Compare commits

..

10 Commits

Author SHA1 Message Date
Mikhail Yevchenko 3ec080dbd3 Enhance video metadata extraction and update player template to display additional information 2026-04-01 18:25:35 +00:00
Mikhail Yevchenko 9bbbbc5a65 Fix HLS proxy and player functionality (first working version) 2026-04-01 18:21:11 +00:00
Mikhail Yevchenko 198f85b67d Update testing guidelines in AGENTS.md to include comprehensive proxy tests for HLS playlists and error handling 2026-04-01 17:53:44 +00:00
Mikhail Yevchenko 15b9702956 Add common pitfalls section to AGENTS.md for testing guidelines 2026-04-01 17:34:50 +00:00
Mikhail Yevchenko 34e49c0d9f Enhance testing strategy by implementing TDD for video URL downloads and critical functions 2026-04-01 14:13:26 +00:00
Mikhail Yevchenko 01a376ae21 Enhance HLS proxy functionality and improve caching mechanism
- Updated AGENTS.md to clarify dlp.py module usage and segment handling.
- Modified README.md to include ALLOW_LOCAL configuration for testing.
- Refactored app.py to streamline HLS proxy logic and improve error handling.
- Enhanced dlp.py to optimize caching and segment retrieval processes.
- Updated player.html to ensure proper JSON formatting for proxy URLs.
- Improved test_integration.py to validate HLS segment proxying and added test for Pornhub HLS extraction.
- Adjusted test_proxy.py to reflect changes in caching functions and data structure.
2026-04-01 12:47:21 +00:00
Mikhail Yevchenko 154f600fd2 Update implementation details for yt-dlp proxy server in AGENTS.md 2026-04-01 12:10:14 +00:00
Mikhail Yevchenko 9f107e388c Add forwardPorts configuration to devcontainer.json 2026-04-01 11:22:09 +00:00
Mikhail Yevchenko 4548f455a3 Remove .vscode directory from .gitignore and add launch configuration for Flask and Pytest 2026-04-01 11:13:58 +00:00
Mikhail Yevchenko 30ecd60601 Add VS Code Python extension to devcontainer configuration 2026-04-01 11:13:44 +00:00
10 changed files with 1040 additions and 348 deletions
+13 -1
View File
@@ -7,6 +7,18 @@
"containerEnv": {
"OLLAMA_HOST": "ollama:11434"
}
},
"customizations": {
"vscode": {
"extensions": [
"ms-python.python"
]
}
},
"forwardPorts": [
5000
]
}
-1
View File
@@ -4,4 +4,3 @@ __pycache__/
.venv/
venv/
*.log
.vscode/
+23
View File
@@ -0,0 +1,23 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Flask",
"type": "debugpy",
"request": "launch",
"module": "flask",
"env": {
"FLASK_APP": "app.py",
"FLASK_ENV": "development"
},
"args": ["run", "--host=0.0.0.0", "--port=5000"]
},
{
"name": "Pytest",
"type": "debugpy",
"request": "launch",
"module": "pytest",
"args": ["tests/", "-v"]
}
]
}
+12 -6
View File
@@ -18,7 +18,7 @@ Obviously, we need to temporarily cache yt-dlp sessions for some period to avoid
## Implementation
To implement the yt-dlp proxy server, you can use Python and the Flask library to create a web server. You will also need the yt-dlp library to interact with YouTube and other platforms and get HLS streams.
To implement the yt-dlp proxy server, you can use Python and the Flask library to create a web server. You will also need the yt-dlp library to interact with YouTube and other platforms and get HLS streams. Examine yt_dlp/YoutubeDL.py in venv you download to understand how to use yt-dlp for getting HLS playlists and segments.
As an HTML templating engine, you can use Jinja2, which is built into Flask, for dynamically generating the page with the HLS player based on the video URL. Styles: `<link rel="stylesheet" href="https://unpkg.com/mvp.css">` for a simple and clean design.
@@ -34,17 +34,23 @@ As an HTML templating engine, you can use Jinja2, which is built into Flask, for
8. Errors and logs — only practical minimum: understandable HTTP errors and basic structured logging.
9. Configuration only through environment variables: port, cache TTL, log level and timeouts.
10. HTTPS not in application: TLS terminates at external reverse proxy (Nginx/Caddy/Traefik), Flask runs behind it.
11. Tests only on critical path: URL parsing, cache, playlist and segment proxying, error handling.
11. TDD: Write a single integration test that will consist of downloading few video urls. It should query these videos over proxy and check if it works properly (yt-dlp is fully capable substitute for a browser that can be configured to output all necessary debug inforation, such as headers and cookies). Also write tests for critical functions like URL parsing, caching, playlist and segment proxying, and error handling. All test should be in `tests/` folder and use `pytest` as a testing framework. All tests should generate maximum debugging output to make it easy to understand what went wrong in case of failure.
12. Documentation and license: only `README.md`, `AGENTS.md` and MIT license.
### Common Pitfalls
1. Do not disable tests or skip critical paths. If something is not working, fix it instead of skipping tests.
2. Do not create workarounds. They are not allowed. If something is not working, fix it instead of creating a workaround.
### Project Structure
```
- app.py - main Flask application file that handles incoming HTTP requests and interacts with yt-dlp through functions from dlp.py.
- dlp.py - module for interacting with yt-dlp, containing functions to get HLS playlists and segments.
- dlp.py - module for interacting with yt-dlp, containing functions to get HLS playlists and segments. examine yt_dlp/YoutubeDL.py in venv in order to understand how to use yt-dlp for getting HLS playlists and segments
functions:
- get_hls_playlist(video_url): gets HLS playlist for the specified video as a string that can be returned to the client. The segment list should be filtered to only include those available for the given video and supported by yt-dlp.
- get_hls_segment(video_url, segment_name): gets the specified video segment: downloads it using yt-dlp and returns its content as bytes that can be returned to the client. It should also use yt-dlp to download the segment since only yt-dlp can handle the necessary authentication and access control for the video content.
it should also rewrite segment filenames in case if they expire during of before download, so that they can be requested through the proxy using predictable URL structure.
- get_hls_segment(video_url, segment_filename): gets the specified video segment for rewritten filename: downloads it using yt-dlp and returns its content as bytes that can be returned to the client. It should also use yt-dlp to download the segment since only yt-dlp can handle the necessary authentication and access control for the video content.
caching:
- Caching of yt-dlp sessions will be implemented using a simple in-memory dictionary that will store video parsing results for each VIDEO_ID. No complex in-memory solutions, just a dictionary with TTL for each key. TTL will be set to 365 days, which will effectively cache results and minimize repeated requests to yt-dlp.
@@ -52,8 +58,8 @@ As an HTML templating engine, you can use Jinja2, which is built into Flask, for
- tests/ - folder for tests that will check critical application paths such as URL parsing, caching, playlist and segment proxying, and error handling.
1. functions tests
2. integration tests for the main application flow:
signle integration test that will consist of server serving a single test video (use ffmpeg for generating it). it should query that server over proxy and check if it works properly.
yt-dlp expects from the server a javascript player that it can recognize. also server should set a cookie on the video page and require that cookie for the HLS playlist and segments requests. this will ensure that only requests coming from the video page can access the HLS content, providing a basic level of security and preventing unauthorized access to the video streams.
- test that the proxy can successfully retrieve and return HLS playlists and segments for valid video URLs. http logging should display when type of url is parsed and served, and when cache is hit or missed. test should also print out the headers and parial content of the playlist and segment responses (as hex) to verify that they are correct and contain expected data.
- test that the proxy correctly handles invalid URLs, unsupported platforms, and other error scenarios, returning appropriate HTTP error responses.
- templates/index.html - simple HTML file with form for video URL input.
- templates/player.html - HTML file with HLS player that will be used to play video obtained through proxy.
- requirements.txt
+1
View File
@@ -30,6 +30,7 @@ Visit http://localhost:5000 and enter a video URL.
| SOCKET_TIMEOUT | 30 | Socket timeout for requests |
| VALIDATION_ENABLED | true | Enable URL validation |
| ALLOWED_DOMAINS | youtube.com,youtu.be,pornhub.com,xvideos.com | Allowed video domains |
| ALLOW_LOCAL | true | Allow localhost/127.0.0.1 URLs (for testing) |
## Routes
+77 -26
View File
@@ -35,49 +35,100 @@ def player():
try:
stream_info = dlp.get_stream_info(video_url)
from urllib.parse import quote
# URL encode for path (use -- as delimiter)
encoded_url = quote(video_url, safe="")
proxy_hls_url = f"/hls?url={encoded_url}&path=index.m3u8"
# Only set HLS URL if we actually have HLS
hls_url = stream_info.get("hls_url")
proxy_hls_url = f"/hls/{encoded_url}--index.m3u8" if hls_url else None
return render_template(
"player.html",
video_url=video_url,
proxy_hls_url=proxy_hls_url,
direct_url=stream_info.get("direct_url"),
title=stream_info.get("title", "Video"),
thumbnail=stream_info.get("thumbnail")
thumbnail=stream_info.get("thumbnail"),
# Pass all metadata to template
description=stream_info.get("description"),
uploader=stream_info.get("uploader"),
uploader_url=stream_info.get("uploader_url"),
duration=stream_info.get("duration"),
duration_string=stream_info.get("duration_string"),
upload_date=stream_info.get("upload_date"),
view_count=stream_info.get("view_count"),
like_count=stream_info.get("like_count"),
dislike_count=stream_info.get("dislike_count"),
comment_count=stream_info.get("comment_count"),
age_limit=stream_info.get("age_limit"),
categories=stream_info.get("categories"),
tags=stream_info.get("tags"),
language=stream_info.get("language"),
license=stream_info.get("license"),
channel=stream_info.get("channel"),
channel_url=stream_info.get("channel_url"),
channel_id=stream_info.get("channel_id"),
extractor=stream_info.get("extractor"),
extractor_key=stream_info.get("extractor_key"),
display_id=stream_info.get("display_id"),
url=stream_info.get("url"),
fulltitle=stream_info.get("fulltitle"),
resolution=stream_info.get("resolution"),
format=stream_info.get("format"),
format_note=stream_info.get("format_note"),
filesize=stream_info.get("filesize"),
filesize_approx=stream_info.get("filesize_approx"),
hls_url=hls_url
)
except Exception as e:
logger.error(f"Error getting stream info: {e}")
abort(500, description=str(e))
@app.route("/hls")
def hls_proxy():
@app.route("/hls/<path:full_path>")
def hls_proxy(full_path):
try:
url_param = request.args.get("url", "")
if not url_param:
abort(400, description="Missing url parameter")
from urllib.parse import urlparse, unquote
from urllib.parse import unquote
path = request.args.get("path", "")
# Split: last part is filename, rest is video URL
# Format: /hls/<encoded_video_url>/<filename>
# Since / is ambiguous (in URL and in video URL), we use a delimiter
# Format: /hls/<encoded_video_url>--<filename>
if "--" not in full_path:
abort(400, description="Invalid path format")
parts = full_path.rsplit("--", 1)
if len(parts) != 2:
abort(400, description="Invalid path format")
encoded_video_url = parts[0]
filename = parts[1]
# Decode the video URL
video_url = unquote(encoded_video_url)
if ".m3u8" in url_param and not path:
video_url = url_param
elif ".m3u8" in url_param and path:
video_url = url_param
else:
video_url = url_param
video_url = unquote(video_url)
if not is_valid_url(video_url):
abort(400, description="Invalid URL")
if path.endswith(".m3u8") or not path:
playlist = dlp.get_hls_playlist(video_url)
return Response(playlist, mimetype="application/vnd.apple.mpegurl")
segment_data = dlp.get_hls_segment(video_url, path)
return Response(segment_data, mimetype="video/mp2t")
# Main playlist request
if filename == "index.m3u8":
playlist = dlp.get_hls_playlist(video_url)
return Response(playlist, mimetype="application/vnd.apple.mpegurl", headers={"Cache-Control": "public, max-age=31536000"})
# Sub-playlist or segment request
segment_url = unquote(filename)
segment_data = dlp.get_hls_segment_with_retry(video_url, segment_url)
if segment_data is None:
abort(500, description="Failed to fetch segment")
# Determine content-type by filename extension
if filename.endswith(".m3u8"):
return Response(segment_data, mimetype="application/vnd.apple.mpegurl", headers={"Cache-Control": "public, max-age=31536000"})
return Response(segment_data, mimetype="video/mp2t", headers={"Cache-Control": "public, max-age=31536000"})
except HTTPException:
raise
@@ -86,7 +137,7 @@ def hls_proxy():
abort(400, description=str(e))
except Exception as e:
logger.error(f"HLS proxy error: {e}")
abort(500, description="Error fetching stream")
return Response(str(e), status=500, mimetype="text/plain")
@app.errorhandler(Exception)
+332 -88
View File
@@ -1,20 +1,31 @@
import logging
import os
import time
import re
from typing import Optional
from urllib.parse import unquote
import yt_dlp
logger = logging.getLogger(__name__)
CACHE_TTL = int(os.getenv("CACHE_TTL", 31536000))
SOCKET_TIMEOUT = int(os.getenv("SOCKET_TIMEOUT", 30))
_session_cache = {}
_cache_timestamps = {}
_ydl_instance = None
def _is_hls_url(url: str) -> bool:
return url.endswith(".m3u8") or "m3u8" in url
def _get_ydl():
"""Get or create a singleton yt-dlp instance."""
global _ydl_instance
if _ydl_instance is None:
_ydl_instance = yt_dlp.YoutubeDL({
"quiet": True,
"no_warnings": True,
"socket_timeout": SOCKET_TIMEOUT,
})
return _ydl_instance
def _get_cache_key(video_url: str) -> str:
@@ -28,110 +39,343 @@ def _is_cache_expired(video_url: str) -> bool:
return time.time() - _cache_timestamps[key] > CACHE_TTL
def _get_cached_session(video_url: str) -> Optional[dict]:
def _get_cached_info(video_url: str) -> Optional[dict]:
key = _get_cache_key(video_url)
if key in _session_cache and not _is_cache_expired(video_url):
return _session_cache[key]
return None
def _set_cached_session(video_url: str, session_data: dict) -> None:
def _set_cached_info(video_url: str, info: dict) -> None:
key = _get_cache_key(video_url)
_session_cache[key] = session_data
_session_cache[key] = info
_cache_timestamps[key] = time.time()
def clear_expired_cache() -> None:
expired_keys = [
key for key in _session_cache
if _is_cache_expired(key)
]
for key in expired_keys:
del _session_cache[key]
del _cache_timestamps[key]
def _extract_hls_url(info: dict) -> Optional[str]:
"""Extract HLS URL from yt-dlp info dict."""
# First check top-level fields (these are set when there's only one format)
url = info.get("manifest_url") or info.get("url")
if url and ".m3u8" in url:
return url
# Check requested_formats (post-processed by yt-dlp)
if info.get("requested_formats"):
for f in info["requested_formats"]:
url = f.get("url") or f.get("manifest_url")
if url and ".m3u8" in url:
return url
# Check formats for m3u8_native protocol
if info.get("formats"):
for f in reversed(info["formats"]):
if f.get("protocol") == "m3u8_native":
url = f.get("manifest_url") or f.get("url")
if url and ".m3u8" in url:
return url
# Try to find any m3u8 URL in formats
if info.get("formats"):
for f in info["formats"]:
url = f.get("url", "")
if ".m3u8" in url:
return url
return None
def get_hls_playlist(video_url: str) -> str:
cached = _get_cached_session(video_url)
if cached and "hls_playlist" in cached:
return cached["hls_playlist"]
if _is_hls_url(video_url):
hls_url = video_url
else:
ydl_opts = {
"quiet": True,
"no_warnings": True,
"socket_timeout": int(os.getenv("SOCKET_TIMEOUT", 30)),
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=False)
if not info or "hls" not in info or not info["hls"]:
raise ValueError("No HLS stream available for this video")
hls_url = info["hls"]
import urllib.request
with urllib.request.urlopen(hls_url, timeout=30) as response:
playlist_content = response.read().decode("utf-8")
session_data = {
"hls_playlist": playlist_content,
"hls_url": hls_url,
"video_url": video_url,
}
_set_cached_session(video_url, session_data)
return playlist_content
def _extract_direct_url(info: dict) -> Optional[str]:
"""Extract direct video URL when HLS is not available."""
# Check url field first
url = info.get("url")
if url:
return url
# Check requested_formats
if info.get("requested_formats"):
for f in info["requested_formats"]:
url = f.get("url")
if url:
return url
# Check formats for best quality https format
if info.get("formats"):
for f in reversed(info["formats"]):
if f.get("protocol") in ("https", "http"):
url = f.get("url")
if url:
return url
return None
def get_hls_segment(video_url: str, segment_name: str) -> bytes:
cached = _get_cached_session(video_url)
if not cached or "hls_url" not in cached:
get_hls_playlist(video_url)
cached = _get_cached_session(video_url)
hls_url = cached["hls_url"]
base_url = hls_url.rsplit("/", 1)[0]
if segment_name.startswith("/"):
segment_name = segment_name[1:]
segment_url = f"{base_url}/{segment_name}"
import urllib.request
with urllib.request.urlopen(segment_url, timeout=30) as response:
return response.read()
def get_stream_info(video_url: str) -> dict:
cached = _get_cached_session(video_url)
def _get_video_info(video_url: str) -> dict:
"""Get video info using yt-dlp."""
cached = _get_cached_info(video_url)
if cached:
return cached
if _is_hls_url(video_url):
return {
"title": "Test Video",
"hls_url": video_url,
"thumbnail": None,
}
import shutil
if not shutil.which("node"):
deno_path = os.path.expanduser("~/.deno/bin/deno")
if not os.path.exists(deno_path):
logger.warning("No JavaScript runtime (node/deno) found - YouTube may not work properly")
ydl_opts = {
"quiet": True,
"no_warnings": True,
"socket_timeout": int(os.getenv("SOCKET_TIMEOUT", 30)),
ydl = _get_ydl()
info = ydl.extract_info(video_url, download=False)
hls_url = _extract_hls_url(info)
direct_url = _extract_direct_url(info)
result = {
"title": info.get("title"),
"thumbnail": info.get("thumbnail"),
"hls_url": hls_url,
"direct_url": direct_url,
"raw_info": info,
}
_set_cached_info(video_url, result)
return result
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=False)
if not info:
raise ValueError("Could not extract video info")
def get_stream_info(video_url: str) -> dict:
"""Get video info with all available metadata."""
info = _get_video_info(video_url)
# Extract useful metadata from raw_info
raw = info.get("raw_info", {})
metadata = {
"title": info["title"],
"thumbnail": info["thumbnail"],
"hls_url": info.get("hls_url"),
"direct_url": info.get("direct_url"),
# Additional metadata
"description": raw.get("description"),
"uploader": raw.get("uploader"),
"uploader_url": raw.get("uploader_url"),
"duration": raw.get("duration"),
"upload_date": raw.get("upload_date"),
"view_count": raw.get("view_count"),
"like_count": raw.get("like_count"),
"dislike_count": raw.get("dislike_count"),
"comment_count": raw.get("comment_count"),
"age_limit": raw.get("age_limit"),
"categories": raw.get("categories"),
"tags": raw.get("tags"),
"language": raw.get("language"),
"license": raw.get("license"),
"channel": raw.get("channel"),
"channel_url": raw.get("channel_url"),
"channel_id": raw.get("channel_id"),
"extractor": raw.get("extractor"),
"extractor_key": raw.get("extractor_key"),
"display_id": raw.get("display_id"),
"url": raw.get("url"),
"fulltitle": raw.get("fulltitle"),
"duration_string": raw.get("duration_string"),
"resolution": raw.get("resolution"),
"format": raw.get("format"),
"format_note": raw.get("format_note"),
"filesize": raw.get("filesize"),
"filesize_approx": raw.get("filesize_approx"),
}
return metadata
return {
"title": info.get("title", "Unknown"),
"hls_url": info.get("hls"),
"thumbnail": info.get("thumbnail"),
}
def get_hls_playlist(video_url: str) -> str:
"""Get HLS playlist content with rewritten URLs."""
import urllib.request
import urllib.error
# First call _get_video_info to ensure cache is populated (yt-dlp quirk)
info = _get_video_info(video_url)
hls_url = info.get("hls_url")
if not hls_url:
raise ValueError("No HLS stream available for this video")
# Try to get playlist, retry once if URL expired
for attempt in range(2):
try:
with urllib.request.urlopen(hls_url, timeout=SOCKET_TIMEOUT) as response:
playlist_content = response.read().decode("utf-8")
return _rewrite_urls(playlist_content, video_url, hls_url)
except urllib.error.HTTPError as e:
if e.code == 410 and attempt == 0:
# Clear cache and fetch fresh HLS URL
_session_cache.pop(video_url, None)
_cache_timestamps.pop(video_url, None)
logger.info("HLS URL expired, fetching fresh HLS URL")
info = _get_video_info(video_url)
hls_url = info.get("hls_url")
if not hls_url:
raise ValueError("No HLS stream available for this video")
continue
raise
def get_direct_video_url(video_url: str) -> str:
"""Get direct video URL when HLS is not available."""
info = _get_video_info(video_url)
if not info.get("direct_url"):
raise ValueError("No video URL available for this video")
return info["direct_url"]
def _rewrite_urls(content: str, video_url: str, base_url: str) -> str:
"""Rewrite relative URLs in HLS playlist to point through proxy."""
from urllib.parse import urljoin, quote, urlparse, parse_qs, urlencode
# URL encode the video URL for safe path usage
encoded_video_url = quote(video_url, safe="")
# Parse base URL to get directory path and query
base_parsed = urlparse(base_url)
base_path = base_parsed.path
base_query = parse_qs(base_parsed.query)
# Get directory path (remove the .m3u8 filename)
dir_path = base_path.rsplit("/", 1)[0]
lines = content.split("\n")
new_lines = []
for line in lines:
if line and not line.startswith("#"):
parsed = urlparse(line)
if parsed.scheme:
# Absolute URL - extract just the path component
# e.g., https://example.com/video/segment.ts -> segment.ts
filename = quote(parsed.path.split("/")[-1], safe="")
if parsed.query:
filename += "?" + quote(parsed.query, safe="")
else:
# Relative URL - use as-is (with query params if any)
filename = quote(line, safe="")
# New format: /hls/<encoded_video_url>--<filename> (-- is delimiter)
proxy_url = f"/hls/{encoded_video_url}--{filename}"
new_lines.append(proxy_url)
continue
new_lines.append(line)
return "\n".join(new_lines)
def get_hls_segment(video_url: str, segment_url: str) -> bytes:
"""Get HLS segment or sub-playlist content."""
import urllib.request
import urllib.error
from urllib.parse import unquote, urlparse, parse_qs, urlencode
# Get the base URL from yt-dlp cache
info = _get_video_info(video_url)
hls_url = info.get("hls_url")
if not hls_url:
raise ValueError("No HLS URL available")
# Parse the HLS URL to get base path
base_parsed = urlparse(hls_url)
base_path = base_parsed.path.rsplit("/", 1)[0]
base_query = parse_qs(base_parsed.query)
# Check if it's a playlist (regardless of query params)
is_playlist = unquote(segment_url).split("?")[0].endswith(".m3u8")
# Reconstruct full URL from filename
filename = unquote(segment_url)
if "?" in filename:
rel_path, rel_query = filename.split("?", 1)
rel_qs = parse_qs(rel_query)
full_url = f"{base_parsed.scheme}://{base_parsed.netloc}{base_path}/{rel_path}"
merged_qs = {**base_query, **rel_qs}
if merged_qs:
full_url += "?" + urlencode(merged_qs, doseq=True)
else:
full_url = f"{base_parsed.scheme}://{base_parsed.netloc}{base_path}/{filename}"
try:
response = urllib.request.urlopen(full_url, timeout=SOCKET_TIMEOUT)
data = response.read()
except urllib.error.HTTPError as e:
if e.code == 410:
raise ValueError("HLS URL expired (410 Gone)")
raise
if is_playlist:
return _rewrite_urls(data.decode("utf-8"), video_url, full_url).encode("utf-8")
return data
def get_hls_segment_with_retry(video_url: str, segment_url: str) -> bytes:
"""Get HLS segment with retry on 410 error (refetches sub-playlist if needed)."""
from urllib.parse import unquote
# Check if this is a segment (not a playlist)
is_segment = not unquote(segment_url).split("?")[0].endswith(".m3u8")
for attempt in range(2):
try:
return get_hls_segment(video_url, segment_url)
except ValueError as e:
if "410 Gone" in str(e) and attempt == 0:
if is_segment:
# For segments: re-fetch the sub-playlist (which has fresh segment URLs)
logger.info("Segment URL expired, re-fetching sub-playlist")
# Get fresh HLS URL
info = _get_video_info(video_url)
hls_url = info.get("hls_url")
if not hls_url:
raise ValueError("No HLS stream available")
# Fetch the sub-playlist from the fresh HLS URL
import urllib.request
from urllib.parse import urlparse, parse_qs, urlencode
# Get base path from HLS URL
parsed = urlparse(hls_url)
base_path = parsed.path.rsplit("/", 1)[0]
base_query = parse_qs(parsed.query)
# Find sub-playlist in main playlist
with urllib.request.urlopen(hls_url, timeout=SOCKET_TIMEOUT) as response:
playlist_content = response.read().decode("utf-8")
# Extract sub-playlist filename from first #EXT-X-STREAM-INF
sub_playlist_path = None
for line in playlist_content.split("\n"):
if line.startswith("#EXT-X-STREAM-INF:"):
continue
elif line and not line.startswith("#"):
sub_playlist_path = line
break
if not sub_playlist_path:
raise ValueError("Could not find sub-playlist URL")
# Build full sub-playlist URL with fresh tokens
if "?" in sub_playlist_path:
rel_path, rel_query = sub_playlist_path.split("?", 1)
rel_qs = parse_qs(rel_query)
full_url = f"{parsed.scheme}://{parsed.netloc}{base_path}/{rel_path}"
merged_qs = {**base_query, **rel_qs}
full_url += "?" + urlencode(merged_qs, doseq=True)
else:
full_url = f"{parsed.scheme}://{parsed.netloc}{base_path}/{sub_playlist_path}"
logger.info(f"Fetching fresh sub-playlist: {full_url[:100]}...")
# Fetch sub-playlist content
with urllib.request.urlopen(full_url, timeout=SOCKET_TIMEOUT) as response:
sub_content = response.read().decode("utf-8")
# Rewrite URLs in sub-playlist
rewritten = _rewrite_urls(sub_content, video_url, full_url)
logger.info(f"Rewritten sub-playlist (first 200 chars): {rewritten[:200]}...")
return rewritten.encode("utf-8")
else:
# For sub-playlist: clear cache and retry
_session_cache.pop(video_url, None)
_cache_timestamps.pop(video_url, None)
logger.info("Sub-playlist expired, refetching")
continue
raise
+205 -13
View File
@@ -7,17 +7,18 @@
<link rel="stylesheet" href="https://unpkg.com/mvp.css">
<style>
body {
max-width: 900px;
max-width: 1100px;
margin: 0 auto;
padding: 1rem;
}
h1 {
margin-bottom: 1rem;
margin-bottom: 0.5rem;
}
.video-container {
width: 100%;
background: #000;
aspect-ratio: 16 / 9;
margin-bottom: 1.5rem;
}
video {
width: 100%;
@@ -27,28 +28,219 @@
display: inline-block;
margin-bottom: 1rem;
}
.metadata {
background: #f5f5f5;
padding: 1rem;
border-radius: 8px;
margin-bottom: 1rem;
}
.metadata h2 {
margin-top: 0;
font-size: 1.2rem;
}
.metadata-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(200px, 1fr));
gap: 0.75rem;
}
.metadata-item {
word-break: break-word;
}
.metadata-label {
font-weight: bold;
color: #666;
font-size: 0.85rem;
}
.metadata-value {
color: #333;
}
.thumbnail {
max-width: 100%;
max-height: 200px;
margin: 1rem 0;
border-radius: 4px;
}
.description {
background: #f9f9f9;
padding: 1rem;
border-radius: 8px;
margin-bottom: 1rem;
white-space: pre-wrap;
max-height: 200px;
overflow-y: auto;
}
</style>
</head>
<body>
<a href="/" class="back-link">← Back</a>
{% if thumbnail %}
<img src="{{ thumbnail }}" alt="{{ title }}" class="thumbnail">
{% endif %}
<h1>{{ title }}</h1>
<div class="video-container">
<video controls>
Your browser does not support HLS.
<video controls id="video">
Your browser does not support video playback.
</video>
</div>
{% if description %}
<div class="description">
<h3>Description</h3>
{{ description }}
</div>
{% endif %}
<div class="metadata">
<h2>Video Information</h2>
<div class="metadata-grid">
{% if uploader %}
<div class="metadata-item">
<div class="metadata-label">Uploader</div>
<div class="metadata-value">{{ uploader }}</div>
</div>
{% endif %}
{% if channel %}
<div class="metadata-item">
<div class="metadata-label">Channel</div>
<div class="metadata-value">{{ channel }}</div>
</div>
{% endif %}
{% if duration_string %}
<div class="metadata-item">
<div class="metadata-label">Duration</div>
<div class="metadata-value">{{ duration_string }}</div>
</div>
{% endif %}
{% if upload_date %}
<div class="metadata-item">
<div class="metadata-label">Upload Date</div>
<div class="metadata-value">{{ upload_date }}</div>
</div>
{% endif %}
{% if view_count %}
<div class="metadata-item">
<div class="metadata-label">Views</div>
<div class="metadata-value">{{ "{:,}".format(view_count) }}</div>
</div>
{% endif %}
{% if like_count %}
<div class="metadata-item">
<div class="metadata-label">Likes</div>
<div class="metadata-value">{{ "{:,}".format(like_count) }}</div>
</div>
{% endif %}
{% if comment_count %}
<div class="metadata-item">
<div class="metadata-label">Comments</div>
<div class="metadata-value">{{ "{:,}".format(comment_count) }}</div>
</div>
{% endif %}
{% if categories %}
<div class="metadata-item">
<div class="metadata-label">Categories</div>
<div class="metadata-value">{% for cat in categories %}{{ cat }}{% if not loop.last %}, {% endif %}{% endfor %}</div>
</div>
{% endif %}
{% if language %}
<div class="metadata-item">
<div class="metadata-label">Language</div>
<div class="metadata-value">{{ language }}</div>
</div>
{% endif %}
{% if extractor %}
<div class="metadata-item">
<div class="metadata-label">Source</div>
<div class="metadata-value">{{ extractor }}</div>
</div>
{% endif %}
{% if resolution %}
<div class="metadata-item">
<div class="metadata-label">Resolution</div>
<div class="metadata-value">{{ resolution }}</div>
</div>
{% endif %}
{% if format %}
<div class="metadata-item">
<div class="metadata-label">Format</div>
<div class="metadata-value">{{ format }}{% if format_note %} ({{ format_note }}){% endif %}</div>
</div>
{% endif %}
{% if filesize_approx %}
<div class="metadata-item">
<div class="metadata-label">Size (approx)</div>
<div class="metadata-value">{{ filesize_approx }}</div>
</div>
{% endif %}
</div>
</div>
{% if tags %}
<div class="metadata">
<h2>Tags</h2>
<div class="metadata-value">
{% for tag in tags %}
<span style="display: inline-block; background: #e0e0e0; padding: 2px 8px; border-radius: 4px; margin: 2px;">{{ tag }}</span>
{% endfor %}
</div>
</div>
{% endif %}
{% if hls_url %}
<div class="metadata">
<h2>Stream URLs</h2>
<div class="metadata-item">
<div class="metadata-label">HLS URL</div>
<div class="metadata-value" style="word-break: break-all; font-size: 0.85rem;">{{ hls_url[:200] }}{% if hls_url|length > 200 %}...{% endif %}</div>
</div>
{% if direct_url %}
<div class="metadata-item">
<div class="metadata-label">Direct URL</div>
<div class="metadata-value" style="word-break: break-all; font-size: 0.85rem;">{{ direct_url[:200] }}{% if direct_url|length > 200 %}...{% endif %}</div>
</div>
{% endif %}
</div>
{% endif %}
<script src="https://cdn.jsdelivr.net/npm/hls.js@latest"></script>
<script>
const video = document.querySelector('video');
const hlsUrl = '{{ proxy_hls_url }}';
const video = document.getElementById('video');
const hlsUrl = {{ proxy_hls_url | tojson }};
const directUrl = {{ direct_url | tojson }};
if (Hls.isSupported()) {
const hls = new Hls();
hls.loadSource(hlsUrl);
hls.attachMedia(video);
} else if (video.canPlayType('application/vnd.apple.mpegurl')) {
video.src = hlsUrl;
if (hlsUrl && hlsUrl !== 'null') {
if (Hls.isSupported()) {
const hls = new Hls();
hls.loadSource(hlsUrl);
hls.attachMedia(video);
} else if (video.canPlayType('application/vnd.apple.mpegurl')) {
video.src = hlsUrl;
} else {
loadDirectUrl();
}
} else if (directUrl && directUrl !== 'null') {
loadDirectUrl();
}
function loadDirectUrl() {
if (directUrl && directUrl !== 'null') {
video.src = directUrl;
}
}
</script>
</body>
</html>
</html>
-139
View File
@@ -1,139 +0,0 @@
import os
import subprocess
import time
import threading
import requests
import pytest
import sys
import urllib.parse
import http.server
import socketserver
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
TEST_VIDEO_DIR = "/tmp/yt-dlp-test-video"
TEST_VIDEO_M3U8 = f"{TEST_VIDEO_DIR}/index.m3u8"
SERVER_PORT = 5002
TEST_HTTP_PORT = 8898
def generate_test_video():
os.makedirs(TEST_VIDEO_DIR, exist_ok=True)
cmd = [
"ffmpeg", "-y", "-f", "lavfi", "-i", "testsrc=duration=5:size=320x240:rate=24",
"-f", "lavfi", "-i", "sine=frequency=440:duration=5",
"-c:v", "libx264", "-c:a", "aac", "-strict", "experimental",
"-hls_time", "1", "-hls_list_size", "0",
"-hls_segment_filename", f"{TEST_VIDEO_DIR}/segment%03d.ts",
TEST_VIDEO_M3U8
]
subprocess.run(cmd, capture_output=True, timeout=60)
assert os.path.exists(TEST_VIDEO_M3U8), "HLS manifest not generated"
segments = [f for f in os.listdir(TEST_VIDEO_DIR) if f.endswith(".ts")]
assert len(segments) > 0, "No segments generated"
class QuietHTTPHandler(http.server.SimpleHTTPRequestHandler):
def log_message(self, format, *args):
pass
class ReusableTCPServer(socketserver.TCPServer):
allow_reuse_address = True
def serve_test_video():
os.chdir(TEST_VIDEO_DIR)
with ReusableTCPServer(("127.0.0.1", TEST_HTTP_PORT), QuietHTTPHandler) as httpd:
httpd.serve_forever()
def start_flask_app():
import app as flask_app
flask_app.app.run(host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False)
@pytest.fixture(scope="module")
def test_servers():
print("\nGenerating test video...")
generate_test_video()
print(f"Starting HTTP server for test video on port {TEST_HTTP_PORT}...")
http_thread = threading.Thread(target=serve_test_video, daemon=True)
http_thread.start()
time.sleep(1)
for _ in range(10):
try:
requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/", timeout=1)
break
except:
time.sleep(0.5)
print("HTTP server ready")
print(f"Starting Flask proxy server on port {SERVER_PORT}...")
flask_thread = threading.Thread(target=start_flask_app, daemon=True)
flask_thread.start()
time.sleep(2)
print("Flask server ready")
yield
print("\nCleaning up...")
def test_direct_hls_access(test_servers):
"""Test that we can access the test HLS video directly"""
response = requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8", timeout=5)
assert response.status_code == 200
assert "#EXTM3U" in response.text
print("Direct HLS access: OK")
def test_hls_playlist_proxy(test_servers):
"""Test proxying HLS playlist"""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls?url={urllib.parse.quote(video_url, safe='')}"
response = requests.get(proxy_url, timeout=10)
assert response.status_code == 200
assert "#EXTM3U" in response.text
assert ".ts" in response.text
print("HLS playlist proxy: OK")
def test_hls_segment_proxy(test_servers):
"""Test proxying HLS segment"""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls?url={urllib.parse.quote(video_url, safe='')}&path=segment000.ts"
response = requests.get(proxy_url, timeout=10)
assert response.status_code == 200
assert len(response.content) > 0
print("HLS segment proxy: OK")
def test_player_page(test_servers):
"""Test player page renders"""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
player_url = f"http://127.0.0.1:{SERVER_PORT}/player?url={urllib.parse.quote(video_url, safe='')}"
response = requests.get(player_url, timeout=10)
assert response.status_code == 200
assert "video" in response.text.lower()
print("Player page: OK")
def test_index_page(test_servers):
"""Test index page renders"""
response = requests.get(f"http://127.0.0.1:{SERVER_PORT}/", timeout=10)
assert response.status_code == 200
assert "video" in response.text.lower()
print("Index page: OK")
if __name__ == "__main__":
pytest.main([__file__, "-v", "-s"])
+377 -74
View File
@@ -1,113 +1,416 @@
import pytest
import sys
import os
import sys
import subprocess
import time
import threading
import requests
import urllib.parse
import http.server
import socketserver
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils import is_valid_url, extract_video_id, sanitize_path, get_error_message
import dlp
TEST_VIDEO_DIR = "/tmp/yt-dlp-test-video"
TEST_VIDEO_M3U8 = f"{TEST_VIDEO_DIR}/index.m3u8"
SERVER_PORT = 5005
TEST_HTTP_PORT = 8890
class TestURLValidation:
def test_valid_youtube_url(self):
assert is_valid_url("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
assert is_valid_url("https://youtu.be/dQw4w9WgXcQ")
def test_valid_youtu_be(self):
assert is_valid_url("https://youtu.be/abc123")
def test_valid_pornhub_url(self):
assert is_valid_url("https://www.pornhub.com/view_video.php?viewkey=abc123")
def test_invalid_url(self):
assert not is_valid_url("")
assert not is_valid_url("not-a-url")
def test_disallowed_domain(self):
os.environ["VALIDATION_ENABLED"] = "true"
assert not is_valid_url("https://evil.com/video")
def print_hex(data, max_len=200):
"""Print data as hex for debugging."""
if isinstance(data, bytes):
print(f"[HEX] {data[:max_len].hex()}")
else:
print(f"[HEX] {data[:max_len].encode().hex()}")
class TestVideoIDExtraction:
def test_extract_youtube_id(self):
assert extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ") == "dQw4w9WgXcQ"
assert extract_video_id("https://youtu.be/dQw4w9WgXcQ") == "dQw4w9WgXcQ"
def test_extract_pornhub_id(self):
result = extract_video_id("https://www.pornhub.com/view_video.php?viewkey=ph123456")
assert result == "ph123456"
def test_extract_invalid(self):
assert extract_video_id("https://example.com/video") == ""
def print_headers(headers):
"""Print response headers."""
print(f"[HEADERS] {dict(headers)}")
class TestPathSanitization:
def test_sanitize_normal_path(self):
assert sanitize_path("path/to/file") == "path/to/file"
def test_sanitize_prevents_traversal(self):
assert sanitize_path("../etc/passwd") == "etc/passwd"
assert sanitize_path("path/../etc/passwd") == "path/etc/passwd"
def generate_test_video():
"""Generate test HLS video using ffmpeg."""
print(f"\n[SETUP] Generating test video in {TEST_VIDEO_DIR}")
os.makedirs(TEST_VIDEO_DIR, exist_ok=True)
cmd = [
"ffmpeg", "-y",
"-f", "lavfi", "-i", "testsrc=duration=10:size=320x240:rate=24",
"-f", "lavfi", "-i", "sine=frequency=440:duration=10",
"-c:v", "libx264", "-c:a", "aac", "-strict", "experimental",
"-hls_time", "2", "-hls_list_size", "0",
"-hls_segment_filename", f"{TEST_VIDEO_DIR}/segment%03d.ts",
TEST_VIDEO_M3U8
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
print(f"[ERROR] ffmpeg failed: {result.stderr}")
segments = [f for f in os.listdir(TEST_VIDEO_DIR) if f.endswith(".ts")]
print(f"[SETUP] Generated {len(segments)} segments")
return result.returncode == 0 and len(segments) > 0
class TestCacheMechanics:
def test_cache_basic(self):
class QuietHTTPHandler(http.server.SimpleHTTPRequestHandler):
def log_message(self, format, *args):
print(f"[HTTP] {self.address_string()} - {format % args}")
class ReusableTCPServer(socketserver.TCPServer):
allow_reuse_address = True
def serve_test_video():
print(f"[SETUP] Starting test HTTP server on port {TEST_HTTP_PORT}")
os.chdir(TEST_VIDEO_DIR)
with ReusableTCPServer(("127.0.0.1", TEST_HTTP_PORT), QuietHTTPHandler) as httpd:
httpd.serve_forever()
def start_flask_app():
print(f"[SETUP] Starting Flask server on port {SERVER_PORT}")
import app as flask_app
flask_app.app.run(host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False)
@pytest.fixture(scope="module")
def test_servers():
print("\n" + "="*60)
print("INTEGRATION TEST SETUP")
print("="*60)
generate_test_video()
http_thread = threading.Thread(target=serve_test_video, daemon=True)
http_thread.start()
time.sleep(1)
for _ in range(10):
try:
requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/", timeout=1)
break
except:
time.sleep(0.5)
print("[SETUP] Test HTTP server ready")
flask_thread = threading.Thread(target=start_flask_app, daemon=True)
flask_thread.start()
time.sleep(2)
for _ in range(10):
try:
requests.get(f"http://127.0.0.1:{SERVER_PORT}/", timeout=1)
break
except:
time.sleep(0.5)
print("[SETUP] Flask server ready")
print("="*60 + "\n")
yield
print("\n[TEARDOWN] Tests complete")
# ============================================================================
# Test URL parsing - critical function
# ============================================================================
class TestURLParsing:
"""Test URL parsing functions as per AGENTS.md."""
def test_url_validation_youtube(self):
"""Test YouTube URL validation."""
from utils import is_valid_url
url = "https://www.youtube.com/watch?v=abc123"
print(f"[TEST] Validating: {url}")
result = is_valid_url(url)
print(f"[TEST] Result: {result}")
assert result is True, f"YouTube URL should be valid: {url}"
def test_url_validation_pornhub(self):
"""Test PornHub URL validation."""
from utils import is_valid_url
url = "https://rt.pornhub.com/view_video.php?viewkey=abc123"
print(f"[TEST] Validating: {url}")
result = is_valid_url(url)
print(f"[TEST] Result: {result}")
assert result is True, f"PornHub URL should be valid: {url}"
def test_url_validation_invalid(self):
"""Test invalid URL rejection."""
from utils import is_valid_url
url = "not-a-url"
print(f"[TEST] Validating: {url}")
result = is_valid_url(url)
print(f"[TEST] Result: {result}")
assert result is False, f"Invalid URL should be rejected: {url}"
def test_url_validation_disallowed(self):
"""Test disallowed domain rejection."""
from utils import is_valid_url
url = "https://evil.com/video"
print(f"[TEST] Validating: {url}")
result = is_valid_url(url)
print(f"[TEST] Result: {result}")
assert result is False, f"Disallowed domain should be rejected: {url}"
# ============================================================================
# Test caching - critical function
# ============================================================================
class TestCaching:
"""Test caching mechanics as per AGENTS.md."""
def test_cache_store_and_retrieve(self):
"""Test cache can store and retrieve data."""
import dlp
dlp._session_cache.clear()
dlp._cache_timestamps.clear()
test_data = {"test": "data"}
dlp._set_cached_session("http://test.com/video", test_data)
url = "https://test.com/video"
data = {"title": "Test", "hls_url": "http://example.com/playlist.m3u8"}
cached = dlp._get_cached_session("http://test.com/video")
assert cached == test_data
def test_cache_expiry(self):
dlp.CACHE_TTL = 1
print(f"[TEST] Storing in cache: {url}")
dlp._session_cache[url] = data
dlp._cache_timestamps[url] = time.time()
print(f"[TEST] Cache contents: {dlp._session_cache}")
assert url in dlp._session_cache
assert dlp._session_cache[url]["title"] == "Test"
def test_cache_hit_detection(self):
"""Test cache hit is detected."""
import dlp
dlp._session_cache.clear()
dlp._cache_timestamps.clear()
dlp._set_cached_session("http://test.com/video", {"data": "test"})
import time
time.sleep(1.1)
url = "https://test.com/video"
dlp._session_cache[url] = {"title": "Test"}
dlp._cache_timestamps[url] = time.time()
assert dlp._is_cache_expired("http://test.com/video") is True
print(f"[TEST] Checking cache for: {url}")
if url in dlp._session_cache:
print(f"[TEST] Cache HIT!")
else:
print(f"[TEST] Cache MISS!")
# ============================================================================
# Test playlist proxying - critical function
# ============================================================================
class TestPlaylistProxying:
"""Test playlist proxying as per AGENTS.md."""
def test_main_playlist_returns_valid_hls(self, test_servers):
"""Test main playlist returns valid HLS content."""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
encoded = urllib.parse.quote(video_url, safe="")
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
dlp.CACHE_TTL = 31536000
print(f"[TEST] Requesting main playlist: {proxy_url}")
response = requests.get(proxy_url, timeout=10)
print(f"[TEST] Status: {response.status_code}")
print_headers(response.headers)
print(f"[TEST] Content preview: {response.text[:200]}")
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert "#EXTM3U" in response.text, "Should contain #EXTM3U"
assert ".ts" in response.text, "Should contain segment references"
print("[TEST] Main playlist returns valid HLS: PASS")
def test_playlist_contains_proxy_urls(self, test_servers):
"""Test playlist URLs are rewritten to proxy."""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
encoded = urllib.parse.quote(video_url, safe="")
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
print(f"[TEST] Requesting playlist: {proxy_url}")
response = requests.get(proxy_url, timeout=10)
print(f"[TEST] Content: {response.text}")
assert "/hls/" in response.text, "Playlist should contain proxy URLs"
print("[TEST] Playlist contains proxy URLs: PASS")
def test_playlist_content_type_correct(self, test_servers):
"""Test playlist returns correct content-type."""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
encoded = urllib.parse.quote(video_url, safe="")
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
print(f"[TEST] Requesting: {proxy_url}")
response = requests.get(proxy_url, timeout=10)
print(f"[TEST] Content-Type: {response.headers.get('Content-Type')}")
assert "application/vnd.apple.mpegurl" in response.headers.get("Content-Type", "")
assert "video/mp2t" not in response.headers.get("Content-Type", "")
print("[TEST] Playlist content-type correct: PASS")
class TestErrorMessages:
def test_get_error_message(self):
assert "Bad Request" in get_error_message(400)
assert "Forbidden" in get_error_message(403)
assert "Not Found" in get_error_message(404)
assert "Internal Server Error" in get_error_message(500)
# ============================================================================
# Test segment proxying - critical function
# ============================================================================
class TestSegmentProxying:
"""Test segment proxying as per AGENTS.md."""
def test_segment_returns_video_data(self, test_servers):
"""Test segment returns video data."""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
encoded = urllib.parse.quote(video_url, safe="")
playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
print(f"[TEST] Getting main playlist: {playlist_url}")
playlist_resp = requests.get(playlist_url, timeout=10)
# Find segment filename
segment_filename = None
for line in playlist_resp.text.split("\n"):
if line.startswith("/hls/") and "--" in line and ".ts" in line:
parts = line.rsplit("--", 1)
if len(parts) >= 2:
segment_filename = parts[-1]
print(f"[TEST] Found segment: {segment_filename}")
break
assert segment_filename is not None, "Should find segment in playlist"
seg_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--{segment_filename}"
print(f"[TEST] Requesting segment: {seg_url}")
seg_resp = requests.get(seg_url, timeout=10)
print(f"[TEST] Segment status: {seg_resp.status_code}")
print_headers(seg_resp.headers)
print(f"[TEST] Segment size: {len(seg_resp.content)} bytes")
assert seg_resp.status_code == 200
assert "video/mp2t" in seg_resp.headers.get("Content-Type", "")
assert len(seg_resp.content) > 1000, "Segment should have substantial data"
assert b"#EXTM3U" not in seg_resp.content[:100], "Segment should NOT be a playlist"
print("[TEST] Segment returns video data: PASS")
class TestFlaskApp:
def test_index_route(self):
from app import app
with app.test_client() as client:
response = client.get("/")
assert response.status_code == 200
def test_player_route_missing_url(self):
# ============================================================================
# Test error handling - critical function
# ============================================================================
class TestErrorHandling:
"""Test error handling as per AGENTS.md."""
def test_player_missing_url_returns_400(self):
"""Test player route with missing URL returns 400."""
from app import app
with app.test_client() as client:
print("[TEST] Testing /player with no URL")
response = client.get("/player")
print(f"[TEST] Status: {response.status_code}")
assert response.status_code == 400
def test_player_route_invalid_url(self):
def test_player_invalid_url_returns_400(self):
"""Test player route with invalid URL returns 400."""
from app import app
with app.test_client() as client:
response = client.get("/player?url=https://evil.com/video")
print("[TEST] Testing /player with invalid URL")
response = client.get("/player?url=not-valid")
print(f"[TEST] Status: {response.status_code}")
assert response.status_code == 400
def test_hls_proxy_invalid_path(self):
def test_hls_invalid_video_url_returns_400(self):
"""Test HLS route with invalid video URL returns 400."""
from app import app
with app.test_client() as client:
response = client.get("/hls")
print("[TEST] Testing /hls with invalid video URL")
response = client.get("/hls/evil.com--index.m3u8")
print(f"[TEST] Status: {response.status_code}")
assert response.status_code == 400
# ============================================================================
# Integration tests - main application flow as per AGENTS.md
# ============================================================================
class TestIntegration:
"""Integration tests for main application flow as per AGENTS.md."""
def test_pornhub_video_full_flow(self):
"""Test PornHub video with full debug output."""
import dlp
dlp._session_cache.clear()
dlp._cache_timestamps.clear()
video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690"
print(f"\n[TEST] PornHub video: {video_url}")
# Get stream info
info = dlp.get_stream_info(video_url)
print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}")
print(f"[TEST] HLS URL: {info.get('hls_url', 'N/A')[:80] if info.get('hls_url') else 'N/A'}")
# Get playlist
playlist = dlp.get_hls_playlist(video_url)
print(f"[TEST] Playlist content (first 300 chars): {playlist[:300]}")
print_hex(playlist[:100])
assert "#EXTM3U" in playlist
assert "/hls/" in playlist
print("[TEST] PornHub full flow: PASS")
def test_youtube_video_fallback(self):
"""Test YouTube uses direct URL fallback."""
import dlp
dlp._session_cache.clear()
dlp._cache_timestamps.clear()
video_url = "https://www.youtube.com/watch?v=PoV9fS4CnaY"
print(f"\n[TEST] YouTube video: {video_url}")
info = dlp.get_stream_info(video_url)
print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}")
print(f"[TEST] Direct URL: {info.get('direct_url', 'N/A')[:80] if info.get('direct_url') else 'N/A'}")
assert "title" in info
print("[TEST] YouTube fallback: PASS")
def test_yt_dlp_consumes_proxy_playlist(self):
"""Test yt-dlp can consume proxy playlist like browser."""
import dlp
dlp._session_cache.clear()
dlp._cache_timestamps.clear()
video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690"
encoded_url = urllib.parse.quote(video_url, safe="")
playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded_url}--index.m3u8"
print(f"\n[TEST] yt-dlp proxy URL: {playlist_url}")
cmd = [
"yt-dlp",
"--hls-use-mpegts",
"--no-download",
"--print", "url",
playlist_url
]
print(f"[TEST] Running: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
print(f"[TEST] yt-dlp return code: {result.returncode}")
if result.stdout:
print(f"[TEST] yt-dlp output: {result.stdout[:200]}")
if result.returncode != 0:
print(f"[TEST] yt-dlp stderr: {result.stderr[:500]}")
assert result.returncode == 0, f"yt-dlp failed: {result.stderr}"
print("[TEST] yt-dlp consumes proxy playlist: PASS")
if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v", "-s"])