Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3ec080dbd3 | |||
| 9bbbbc5a65 | |||
| 198f85b67d | |||
| 15b9702956 | |||
| 34e49c0d9f | |||
| 01a376ae21 | |||
| 154f600fd2 | |||
| 9f107e388c | |||
| 4548f455a3 | |||
| 30ecd60601 |
@@ -7,6 +7,18 @@
|
||||
|
||||
"containerEnv": {
|
||||
"OLLAMA_HOST": "ollama:11434"
|
||||
}
|
||||
},
|
||||
"customizations": {
|
||||
"vscode": {
|
||||
"extensions": [
|
||||
"ms-python.python"
|
||||
]
|
||||
}
|
||||
},
|
||||
|
||||
"forwardPorts": [
|
||||
5000
|
||||
]
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -4,4 +4,3 @@ __pycache__/
|
||||
.venv/
|
||||
venv/
|
||||
*.log
|
||||
.vscode/
|
||||
Vendored
+23
@@ -0,0 +1,23 @@
|
||||
{
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Flask",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"module": "flask",
|
||||
"env": {
|
||||
"FLASK_APP": "app.py",
|
||||
"FLASK_ENV": "development"
|
||||
},
|
||||
"args": ["run", "--host=0.0.0.0", "--port=5000"]
|
||||
},
|
||||
{
|
||||
"name": "Pytest",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"module": "pytest",
|
||||
"args": ["tests/", "-v"]
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -18,7 +18,7 @@ Obviously, we need to temporarily cache yt-dlp sessions for some period to avoid
|
||||
|
||||
## Implementation
|
||||
|
||||
To implement the yt-dlp proxy server, you can use Python and the Flask library to create a web server. You will also need the yt-dlp library to interact with YouTube and other platforms and get HLS streams.
|
||||
To implement the yt-dlp proxy server, you can use Python and the Flask library to create a web server. You will also need the yt-dlp library to interact with YouTube and other platforms and get HLS streams. Examine yt_dlp/YoutubeDL.py in venv you download to understand how to use yt-dlp for getting HLS playlists and segments.
|
||||
|
||||
As an HTML templating engine, you can use Jinja2, which is built into Flask, for dynamically generating the page with the HLS player based on the video URL. Styles: `<link rel="stylesheet" href="https://unpkg.com/mvp.css">` for a simple and clean design.
|
||||
|
||||
@@ -34,17 +34,23 @@ As an HTML templating engine, you can use Jinja2, which is built into Flask, for
|
||||
8. Errors and logs — only practical minimum: understandable HTTP errors and basic structured logging.
|
||||
9. Configuration only through environment variables: port, cache TTL, log level and timeouts.
|
||||
10. HTTPS not in application: TLS terminates at external reverse proxy (Nginx/Caddy/Traefik), Flask runs behind it.
|
||||
11. Tests only on critical path: URL parsing, cache, playlist and segment proxying, error handling.
|
||||
11. TDD: Write a single integration test that will consist of downloading few video urls. It should query these videos over proxy and check if it works properly (yt-dlp is fully capable substitute for a browser that can be configured to output all necessary debug inforation, such as headers and cookies). Also write tests for critical functions like URL parsing, caching, playlist and segment proxying, and error handling. All test should be in `tests/` folder and use `pytest` as a testing framework. All tests should generate maximum debugging output to make it easy to understand what went wrong in case of failure.
|
||||
12. Documentation and license: only `README.md`, `AGENTS.md` and MIT license.
|
||||
|
||||
### Common Pitfalls
|
||||
|
||||
1. Do not disable tests or skip critical paths. If something is not working, fix it instead of skipping tests.
|
||||
2. Do not create workarounds. They are not allowed. If something is not working, fix it instead of creating a workaround.
|
||||
|
||||
### Project Structure
|
||||
|
||||
```
|
||||
- app.py - main Flask application file that handles incoming HTTP requests and interacts with yt-dlp through functions from dlp.py.
|
||||
- dlp.py - module for interacting with yt-dlp, containing functions to get HLS playlists and segments.
|
||||
- dlp.py - module for interacting with yt-dlp, containing functions to get HLS playlists and segments. examine yt_dlp/YoutubeDL.py in venv in order to understand how to use yt-dlp for getting HLS playlists and segments
|
||||
functions:
|
||||
- get_hls_playlist(video_url): gets HLS playlist for the specified video as a string that can be returned to the client. The segment list should be filtered to only include those available for the given video and supported by yt-dlp.
|
||||
- get_hls_segment(video_url, segment_name): gets the specified video segment: downloads it using yt-dlp and returns its content as bytes that can be returned to the client. It should also use yt-dlp to download the segment since only yt-dlp can handle the necessary authentication and access control for the video content.
|
||||
it should also rewrite segment filenames in case if they expire during of before download, so that they can be requested through the proxy using predictable URL structure.
|
||||
- get_hls_segment(video_url, segment_filename): gets the specified video segment for rewritten filename: downloads it using yt-dlp and returns its content as bytes that can be returned to the client. It should also use yt-dlp to download the segment since only yt-dlp can handle the necessary authentication and access control for the video content.
|
||||
|
||||
caching:
|
||||
- Caching of yt-dlp sessions will be implemented using a simple in-memory dictionary that will store video parsing results for each VIDEO_ID. No complex in-memory solutions, just a dictionary with TTL for each key. TTL will be set to 365 days, which will effectively cache results and minimize repeated requests to yt-dlp.
|
||||
@@ -52,8 +58,8 @@ As an HTML templating engine, you can use Jinja2, which is built into Flask, for
|
||||
- tests/ - folder for tests that will check critical application paths such as URL parsing, caching, playlist and segment proxying, and error handling.
|
||||
1. functions tests
|
||||
2. integration tests for the main application flow:
|
||||
signle integration test that will consist of server serving a single test video (use ffmpeg for generating it). it should query that server over proxy and check if it works properly.
|
||||
yt-dlp expects from the server a javascript player that it can recognize. also server should set a cookie on the video page and require that cookie for the HLS playlist and segments requests. this will ensure that only requests coming from the video page can access the HLS content, providing a basic level of security and preventing unauthorized access to the video streams.
|
||||
- test that the proxy can successfully retrieve and return HLS playlists and segments for valid video URLs. http logging should display when type of url is parsed and served, and when cache is hit or missed. test should also print out the headers and parial content of the playlist and segment responses (as hex) to verify that they are correct and contain expected data.
|
||||
- test that the proxy correctly handles invalid URLs, unsupported platforms, and other error scenarios, returning appropriate HTTP error responses.
|
||||
- templates/index.html - simple HTML file with form for video URL input.
|
||||
- templates/player.html - HTML file with HLS player that will be used to play video obtained through proxy.
|
||||
- requirements.txt
|
||||
|
||||
@@ -30,6 +30,7 @@ Visit http://localhost:5000 and enter a video URL.
|
||||
| SOCKET_TIMEOUT | 30 | Socket timeout for requests |
|
||||
| VALIDATION_ENABLED | true | Enable URL validation |
|
||||
| ALLOWED_DOMAINS | youtube.com,youtu.be,pornhub.com,xvideos.com | Allowed video domains |
|
||||
| ALLOW_LOCAL | true | Allow localhost/127.0.0.1 URLs (for testing) |
|
||||
|
||||
## Routes
|
||||
|
||||
|
||||
@@ -35,49 +35,100 @@ def player():
|
||||
try:
|
||||
stream_info = dlp.get_stream_info(video_url)
|
||||
from urllib.parse import quote
|
||||
|
||||
# URL encode for path (use -- as delimiter)
|
||||
encoded_url = quote(video_url, safe="")
|
||||
proxy_hls_url = f"/hls?url={encoded_url}&path=index.m3u8"
|
||||
|
||||
# Only set HLS URL if we actually have HLS
|
||||
hls_url = stream_info.get("hls_url")
|
||||
proxy_hls_url = f"/hls/{encoded_url}--index.m3u8" if hls_url else None
|
||||
|
||||
return render_template(
|
||||
"player.html",
|
||||
video_url=video_url,
|
||||
proxy_hls_url=proxy_hls_url,
|
||||
direct_url=stream_info.get("direct_url"),
|
||||
title=stream_info.get("title", "Video"),
|
||||
thumbnail=stream_info.get("thumbnail")
|
||||
thumbnail=stream_info.get("thumbnail"),
|
||||
# Pass all metadata to template
|
||||
description=stream_info.get("description"),
|
||||
uploader=stream_info.get("uploader"),
|
||||
uploader_url=stream_info.get("uploader_url"),
|
||||
duration=stream_info.get("duration"),
|
||||
duration_string=stream_info.get("duration_string"),
|
||||
upload_date=stream_info.get("upload_date"),
|
||||
view_count=stream_info.get("view_count"),
|
||||
like_count=stream_info.get("like_count"),
|
||||
dislike_count=stream_info.get("dislike_count"),
|
||||
comment_count=stream_info.get("comment_count"),
|
||||
age_limit=stream_info.get("age_limit"),
|
||||
categories=stream_info.get("categories"),
|
||||
tags=stream_info.get("tags"),
|
||||
language=stream_info.get("language"),
|
||||
license=stream_info.get("license"),
|
||||
channel=stream_info.get("channel"),
|
||||
channel_url=stream_info.get("channel_url"),
|
||||
channel_id=stream_info.get("channel_id"),
|
||||
extractor=stream_info.get("extractor"),
|
||||
extractor_key=stream_info.get("extractor_key"),
|
||||
display_id=stream_info.get("display_id"),
|
||||
url=stream_info.get("url"),
|
||||
fulltitle=stream_info.get("fulltitle"),
|
||||
resolution=stream_info.get("resolution"),
|
||||
format=stream_info.get("format"),
|
||||
format_note=stream_info.get("format_note"),
|
||||
filesize=stream_info.get("filesize"),
|
||||
filesize_approx=stream_info.get("filesize_approx"),
|
||||
hls_url=hls_url
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting stream info: {e}")
|
||||
abort(500, description=str(e))
|
||||
|
||||
|
||||
@app.route("/hls")
|
||||
def hls_proxy():
|
||||
@app.route("/hls/<path:full_path>")
|
||||
def hls_proxy(full_path):
|
||||
try:
|
||||
url_param = request.args.get("url", "")
|
||||
if not url_param:
|
||||
abort(400, description="Missing url parameter")
|
||||
|
||||
from urllib.parse import urlparse, unquote
|
||||
from urllib.parse import unquote
|
||||
|
||||
path = request.args.get("path", "")
|
||||
# Split: last part is filename, rest is video URL
|
||||
# Format: /hls/<encoded_video_url>/<filename>
|
||||
# Since / is ambiguous (in URL and in video URL), we use a delimiter
|
||||
# Format: /hls/<encoded_video_url>--<filename>
|
||||
|
||||
if "--" not in full_path:
|
||||
abort(400, description="Invalid path format")
|
||||
|
||||
parts = full_path.rsplit("--", 1)
|
||||
if len(parts) != 2:
|
||||
abort(400, description="Invalid path format")
|
||||
|
||||
encoded_video_url = parts[0]
|
||||
filename = parts[1]
|
||||
|
||||
# Decode the video URL
|
||||
video_url = unquote(encoded_video_url)
|
||||
|
||||
if ".m3u8" in url_param and not path:
|
||||
video_url = url_param
|
||||
elif ".m3u8" in url_param and path:
|
||||
video_url = url_param
|
||||
else:
|
||||
video_url = url_param
|
||||
|
||||
video_url = unquote(video_url)
|
||||
|
||||
if not is_valid_url(video_url):
|
||||
abort(400, description="Invalid URL")
|
||||
|
||||
if path.endswith(".m3u8") or not path:
|
||||
playlist = dlp.get_hls_playlist(video_url)
|
||||
return Response(playlist, mimetype="application/vnd.apple.mpegurl")
|
||||
|
||||
segment_data = dlp.get_hls_segment(video_url, path)
|
||||
return Response(segment_data, mimetype="video/mp2t")
|
||||
# Main playlist request
|
||||
if filename == "index.m3u8":
|
||||
playlist = dlp.get_hls_playlist(video_url)
|
||||
return Response(playlist, mimetype="application/vnd.apple.mpegurl", headers={"Cache-Control": "public, max-age=31536000"})
|
||||
|
||||
# Sub-playlist or segment request
|
||||
segment_url = unquote(filename)
|
||||
|
||||
segment_data = dlp.get_hls_segment_with_retry(video_url, segment_url)
|
||||
|
||||
if segment_data is None:
|
||||
abort(500, description="Failed to fetch segment")
|
||||
|
||||
# Determine content-type by filename extension
|
||||
if filename.endswith(".m3u8"):
|
||||
return Response(segment_data, mimetype="application/vnd.apple.mpegurl", headers={"Cache-Control": "public, max-age=31536000"})
|
||||
return Response(segment_data, mimetype="video/mp2t", headers={"Cache-Control": "public, max-age=31536000"})
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
@@ -86,7 +137,7 @@ def hls_proxy():
|
||||
abort(400, description=str(e))
|
||||
except Exception as e:
|
||||
logger.error(f"HLS proxy error: {e}")
|
||||
abort(500, description="Error fetching stream")
|
||||
return Response(str(e), status=500, mimetype="text/plain")
|
||||
|
||||
|
||||
@app.errorhandler(Exception)
|
||||
|
||||
@@ -1,20 +1,31 @@
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import re
|
||||
from typing import Optional
|
||||
from urllib.parse import unquote
|
||||
import yt_dlp
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CACHE_TTL = int(os.getenv("CACHE_TTL", 31536000))
|
||||
SOCKET_TIMEOUT = int(os.getenv("SOCKET_TIMEOUT", 30))
|
||||
|
||||
_session_cache = {}
|
||||
_cache_timestamps = {}
|
||||
|
||||
_ydl_instance = None
|
||||
|
||||
def _is_hls_url(url: str) -> bool:
|
||||
return url.endswith(".m3u8") or "m3u8" in url
|
||||
|
||||
def _get_ydl():
|
||||
"""Get or create a singleton yt-dlp instance."""
|
||||
global _ydl_instance
|
||||
if _ydl_instance is None:
|
||||
_ydl_instance = yt_dlp.YoutubeDL({
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
"socket_timeout": SOCKET_TIMEOUT,
|
||||
})
|
||||
return _ydl_instance
|
||||
|
||||
|
||||
def _get_cache_key(video_url: str) -> str:
|
||||
@@ -28,110 +39,343 @@ def _is_cache_expired(video_url: str) -> bool:
|
||||
return time.time() - _cache_timestamps[key] > CACHE_TTL
|
||||
|
||||
|
||||
def _get_cached_session(video_url: str) -> Optional[dict]:
|
||||
def _get_cached_info(video_url: str) -> Optional[dict]:
|
||||
key = _get_cache_key(video_url)
|
||||
if key in _session_cache and not _is_cache_expired(video_url):
|
||||
return _session_cache[key]
|
||||
return None
|
||||
|
||||
|
||||
def _set_cached_session(video_url: str, session_data: dict) -> None:
|
||||
def _set_cached_info(video_url: str, info: dict) -> None:
|
||||
key = _get_cache_key(video_url)
|
||||
_session_cache[key] = session_data
|
||||
_session_cache[key] = info
|
||||
_cache_timestamps[key] = time.time()
|
||||
|
||||
|
||||
def clear_expired_cache() -> None:
|
||||
expired_keys = [
|
||||
key for key in _session_cache
|
||||
if _is_cache_expired(key)
|
||||
]
|
||||
for key in expired_keys:
|
||||
del _session_cache[key]
|
||||
del _cache_timestamps[key]
|
||||
def _extract_hls_url(info: dict) -> Optional[str]:
|
||||
"""Extract HLS URL from yt-dlp info dict."""
|
||||
# First check top-level fields (these are set when there's only one format)
|
||||
url = info.get("manifest_url") or info.get("url")
|
||||
if url and ".m3u8" in url:
|
||||
return url
|
||||
|
||||
# Check requested_formats (post-processed by yt-dlp)
|
||||
if info.get("requested_formats"):
|
||||
for f in info["requested_formats"]:
|
||||
url = f.get("url") or f.get("manifest_url")
|
||||
if url and ".m3u8" in url:
|
||||
return url
|
||||
|
||||
# Check formats for m3u8_native protocol
|
||||
if info.get("formats"):
|
||||
for f in reversed(info["formats"]):
|
||||
if f.get("protocol") == "m3u8_native":
|
||||
url = f.get("manifest_url") or f.get("url")
|
||||
if url and ".m3u8" in url:
|
||||
return url
|
||||
|
||||
# Try to find any m3u8 URL in formats
|
||||
if info.get("formats"):
|
||||
for f in info["formats"]:
|
||||
url = f.get("url", "")
|
||||
if ".m3u8" in url:
|
||||
return url
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_hls_playlist(video_url: str) -> str:
|
||||
cached = _get_cached_session(video_url)
|
||||
if cached and "hls_playlist" in cached:
|
||||
return cached["hls_playlist"]
|
||||
|
||||
if _is_hls_url(video_url):
|
||||
hls_url = video_url
|
||||
else:
|
||||
ydl_opts = {
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
"socket_timeout": int(os.getenv("SOCKET_TIMEOUT", 30)),
|
||||
}
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(video_url, download=False)
|
||||
|
||||
if not info or "hls" not in info or not info["hls"]:
|
||||
raise ValueError("No HLS stream available for this video")
|
||||
|
||||
hls_url = info["hls"]
|
||||
|
||||
import urllib.request
|
||||
with urllib.request.urlopen(hls_url, timeout=30) as response:
|
||||
playlist_content = response.read().decode("utf-8")
|
||||
|
||||
session_data = {
|
||||
"hls_playlist": playlist_content,
|
||||
"hls_url": hls_url,
|
||||
"video_url": video_url,
|
||||
}
|
||||
_set_cached_session(video_url, session_data)
|
||||
|
||||
return playlist_content
|
||||
def _extract_direct_url(info: dict) -> Optional[str]:
|
||||
"""Extract direct video URL when HLS is not available."""
|
||||
# Check url field first
|
||||
url = info.get("url")
|
||||
if url:
|
||||
return url
|
||||
|
||||
# Check requested_formats
|
||||
if info.get("requested_formats"):
|
||||
for f in info["requested_formats"]:
|
||||
url = f.get("url")
|
||||
if url:
|
||||
return url
|
||||
|
||||
# Check formats for best quality https format
|
||||
if info.get("formats"):
|
||||
for f in reversed(info["formats"]):
|
||||
if f.get("protocol") in ("https", "http"):
|
||||
url = f.get("url")
|
||||
if url:
|
||||
return url
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_hls_segment(video_url: str, segment_name: str) -> bytes:
|
||||
cached = _get_cached_session(video_url)
|
||||
if not cached or "hls_url" not in cached:
|
||||
get_hls_playlist(video_url)
|
||||
cached = _get_cached_session(video_url)
|
||||
|
||||
hls_url = cached["hls_url"]
|
||||
base_url = hls_url.rsplit("/", 1)[0]
|
||||
|
||||
if segment_name.startswith("/"):
|
||||
segment_name = segment_name[1:]
|
||||
|
||||
segment_url = f"{base_url}/{segment_name}"
|
||||
|
||||
import urllib.request
|
||||
with urllib.request.urlopen(segment_url, timeout=30) as response:
|
||||
return response.read()
|
||||
|
||||
|
||||
def get_stream_info(video_url: str) -> dict:
|
||||
cached = _get_cached_session(video_url)
|
||||
def _get_video_info(video_url: str) -> dict:
|
||||
"""Get video info using yt-dlp."""
|
||||
cached = _get_cached_info(video_url)
|
||||
if cached:
|
||||
return cached
|
||||
|
||||
if _is_hls_url(video_url):
|
||||
return {
|
||||
"title": "Test Video",
|
||||
"hls_url": video_url,
|
||||
"thumbnail": None,
|
||||
}
|
||||
import shutil
|
||||
if not shutil.which("node"):
|
||||
deno_path = os.path.expanduser("~/.deno/bin/deno")
|
||||
if not os.path.exists(deno_path):
|
||||
logger.warning("No JavaScript runtime (node/deno) found - YouTube may not work properly")
|
||||
|
||||
ydl_opts = {
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
"socket_timeout": int(os.getenv("SOCKET_TIMEOUT", 30)),
|
||||
ydl = _get_ydl()
|
||||
info = ydl.extract_info(video_url, download=False)
|
||||
|
||||
hls_url = _extract_hls_url(info)
|
||||
direct_url = _extract_direct_url(info)
|
||||
result = {
|
||||
"title": info.get("title"),
|
||||
"thumbnail": info.get("thumbnail"),
|
||||
"hls_url": hls_url,
|
||||
"direct_url": direct_url,
|
||||
"raw_info": info,
|
||||
}
|
||||
_set_cached_info(video_url, result)
|
||||
return result
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(video_url, download=False)
|
||||
|
||||
if not info:
|
||||
raise ValueError("Could not extract video info")
|
||||
def get_stream_info(video_url: str) -> dict:
|
||||
"""Get video info with all available metadata."""
|
||||
info = _get_video_info(video_url)
|
||||
|
||||
# Extract useful metadata from raw_info
|
||||
raw = info.get("raw_info", {})
|
||||
metadata = {
|
||||
"title": info["title"],
|
||||
"thumbnail": info["thumbnail"],
|
||||
"hls_url": info.get("hls_url"),
|
||||
"direct_url": info.get("direct_url"),
|
||||
# Additional metadata
|
||||
"description": raw.get("description"),
|
||||
"uploader": raw.get("uploader"),
|
||||
"uploader_url": raw.get("uploader_url"),
|
||||
"duration": raw.get("duration"),
|
||||
"upload_date": raw.get("upload_date"),
|
||||
"view_count": raw.get("view_count"),
|
||||
"like_count": raw.get("like_count"),
|
||||
"dislike_count": raw.get("dislike_count"),
|
||||
"comment_count": raw.get("comment_count"),
|
||||
"age_limit": raw.get("age_limit"),
|
||||
"categories": raw.get("categories"),
|
||||
"tags": raw.get("tags"),
|
||||
"language": raw.get("language"),
|
||||
"license": raw.get("license"),
|
||||
"channel": raw.get("channel"),
|
||||
"channel_url": raw.get("channel_url"),
|
||||
"channel_id": raw.get("channel_id"),
|
||||
"extractor": raw.get("extractor"),
|
||||
"extractor_key": raw.get("extractor_key"),
|
||||
"display_id": raw.get("display_id"),
|
||||
"url": raw.get("url"),
|
||||
"fulltitle": raw.get("fulltitle"),
|
||||
"duration_string": raw.get("duration_string"),
|
||||
"resolution": raw.get("resolution"),
|
||||
"format": raw.get("format"),
|
||||
"format_note": raw.get("format_note"),
|
||||
"filesize": raw.get("filesize"),
|
||||
"filesize_approx": raw.get("filesize_approx"),
|
||||
}
|
||||
return metadata
|
||||
|
||||
return {
|
||||
"title": info.get("title", "Unknown"),
|
||||
"hls_url": info.get("hls"),
|
||||
"thumbnail": info.get("thumbnail"),
|
||||
}
|
||||
|
||||
def get_hls_playlist(video_url: str) -> str:
|
||||
"""Get HLS playlist content with rewritten URLs."""
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
# First call _get_video_info to ensure cache is populated (yt-dlp quirk)
|
||||
info = _get_video_info(video_url)
|
||||
hls_url = info.get("hls_url")
|
||||
if not hls_url:
|
||||
raise ValueError("No HLS stream available for this video")
|
||||
|
||||
# Try to get playlist, retry once if URL expired
|
||||
for attempt in range(2):
|
||||
try:
|
||||
with urllib.request.urlopen(hls_url, timeout=SOCKET_TIMEOUT) as response:
|
||||
playlist_content = response.read().decode("utf-8")
|
||||
return _rewrite_urls(playlist_content, video_url, hls_url)
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 410 and attempt == 0:
|
||||
# Clear cache and fetch fresh HLS URL
|
||||
_session_cache.pop(video_url, None)
|
||||
_cache_timestamps.pop(video_url, None)
|
||||
logger.info("HLS URL expired, fetching fresh HLS URL")
|
||||
info = _get_video_info(video_url)
|
||||
hls_url = info.get("hls_url")
|
||||
if not hls_url:
|
||||
raise ValueError("No HLS stream available for this video")
|
||||
continue
|
||||
raise
|
||||
|
||||
|
||||
def get_direct_video_url(video_url: str) -> str:
|
||||
"""Get direct video URL when HLS is not available."""
|
||||
info = _get_video_info(video_url)
|
||||
if not info.get("direct_url"):
|
||||
raise ValueError("No video URL available for this video")
|
||||
return info["direct_url"]
|
||||
|
||||
|
||||
def _rewrite_urls(content: str, video_url: str, base_url: str) -> str:
|
||||
"""Rewrite relative URLs in HLS playlist to point through proxy."""
|
||||
from urllib.parse import urljoin, quote, urlparse, parse_qs, urlencode
|
||||
|
||||
# URL encode the video URL for safe path usage
|
||||
encoded_video_url = quote(video_url, safe="")
|
||||
|
||||
# Parse base URL to get directory path and query
|
||||
base_parsed = urlparse(base_url)
|
||||
base_path = base_parsed.path
|
||||
base_query = parse_qs(base_parsed.query)
|
||||
|
||||
# Get directory path (remove the .m3u8 filename)
|
||||
dir_path = base_path.rsplit("/", 1)[0]
|
||||
|
||||
lines = content.split("\n")
|
||||
new_lines = []
|
||||
for line in lines:
|
||||
if line and not line.startswith("#"):
|
||||
parsed = urlparse(line)
|
||||
|
||||
if parsed.scheme:
|
||||
# Absolute URL - extract just the path component
|
||||
# e.g., https://example.com/video/segment.ts -> segment.ts
|
||||
filename = quote(parsed.path.split("/")[-1], safe="")
|
||||
if parsed.query:
|
||||
filename += "?" + quote(parsed.query, safe="")
|
||||
else:
|
||||
# Relative URL - use as-is (with query params if any)
|
||||
filename = quote(line, safe="")
|
||||
|
||||
# New format: /hls/<encoded_video_url>--<filename> (-- is delimiter)
|
||||
proxy_url = f"/hls/{encoded_video_url}--{filename}"
|
||||
new_lines.append(proxy_url)
|
||||
continue
|
||||
new_lines.append(line)
|
||||
return "\n".join(new_lines)
|
||||
|
||||
|
||||
def get_hls_segment(video_url: str, segment_url: str) -> bytes:
|
||||
"""Get HLS segment or sub-playlist content."""
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from urllib.parse import unquote, urlparse, parse_qs, urlencode
|
||||
|
||||
# Get the base URL from yt-dlp cache
|
||||
info = _get_video_info(video_url)
|
||||
hls_url = info.get("hls_url")
|
||||
|
||||
if not hls_url:
|
||||
raise ValueError("No HLS URL available")
|
||||
|
||||
# Parse the HLS URL to get base path
|
||||
base_parsed = urlparse(hls_url)
|
||||
base_path = base_parsed.path.rsplit("/", 1)[0]
|
||||
base_query = parse_qs(base_parsed.query)
|
||||
|
||||
# Check if it's a playlist (regardless of query params)
|
||||
is_playlist = unquote(segment_url).split("?")[0].endswith(".m3u8")
|
||||
|
||||
# Reconstruct full URL from filename
|
||||
filename = unquote(segment_url)
|
||||
if "?" in filename:
|
||||
rel_path, rel_query = filename.split("?", 1)
|
||||
rel_qs = parse_qs(rel_query)
|
||||
full_url = f"{base_parsed.scheme}://{base_parsed.netloc}{base_path}/{rel_path}"
|
||||
merged_qs = {**base_query, **rel_qs}
|
||||
if merged_qs:
|
||||
full_url += "?" + urlencode(merged_qs, doseq=True)
|
||||
else:
|
||||
full_url = f"{base_parsed.scheme}://{base_parsed.netloc}{base_path}/{filename}"
|
||||
|
||||
try:
|
||||
response = urllib.request.urlopen(full_url, timeout=SOCKET_TIMEOUT)
|
||||
data = response.read()
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 410:
|
||||
raise ValueError("HLS URL expired (410 Gone)")
|
||||
raise
|
||||
|
||||
if is_playlist:
|
||||
return _rewrite_urls(data.decode("utf-8"), video_url, full_url).encode("utf-8")
|
||||
return data
|
||||
|
||||
|
||||
def get_hls_segment_with_retry(video_url: str, segment_url: str) -> bytes:
|
||||
"""Get HLS segment with retry on 410 error (refetches sub-playlist if needed)."""
|
||||
from urllib.parse import unquote
|
||||
|
||||
# Check if this is a segment (not a playlist)
|
||||
is_segment = not unquote(segment_url).split("?")[0].endswith(".m3u8")
|
||||
|
||||
for attempt in range(2):
|
||||
try:
|
||||
return get_hls_segment(video_url, segment_url)
|
||||
except ValueError as e:
|
||||
if "410 Gone" in str(e) and attempt == 0:
|
||||
if is_segment:
|
||||
# For segments: re-fetch the sub-playlist (which has fresh segment URLs)
|
||||
logger.info("Segment URL expired, re-fetching sub-playlist")
|
||||
|
||||
# Get fresh HLS URL
|
||||
info = _get_video_info(video_url)
|
||||
hls_url = info.get("hls_url")
|
||||
if not hls_url:
|
||||
raise ValueError("No HLS stream available")
|
||||
|
||||
# Fetch the sub-playlist from the fresh HLS URL
|
||||
import urllib.request
|
||||
from urllib.parse import urlparse, parse_qs, urlencode
|
||||
|
||||
# Get base path from HLS URL
|
||||
parsed = urlparse(hls_url)
|
||||
base_path = parsed.path.rsplit("/", 1)[0]
|
||||
base_query = parse_qs(parsed.query)
|
||||
|
||||
# Find sub-playlist in main playlist
|
||||
with urllib.request.urlopen(hls_url, timeout=SOCKET_TIMEOUT) as response:
|
||||
playlist_content = response.read().decode("utf-8")
|
||||
|
||||
# Extract sub-playlist filename from first #EXT-X-STREAM-INF
|
||||
sub_playlist_path = None
|
||||
for line in playlist_content.split("\n"):
|
||||
if line.startswith("#EXT-X-STREAM-INF:"):
|
||||
continue
|
||||
elif line and not line.startswith("#"):
|
||||
sub_playlist_path = line
|
||||
break
|
||||
|
||||
if not sub_playlist_path:
|
||||
raise ValueError("Could not find sub-playlist URL")
|
||||
|
||||
# Build full sub-playlist URL with fresh tokens
|
||||
if "?" in sub_playlist_path:
|
||||
rel_path, rel_query = sub_playlist_path.split("?", 1)
|
||||
rel_qs = parse_qs(rel_query)
|
||||
full_url = f"{parsed.scheme}://{parsed.netloc}{base_path}/{rel_path}"
|
||||
merged_qs = {**base_query, **rel_qs}
|
||||
full_url += "?" + urlencode(merged_qs, doseq=True)
|
||||
else:
|
||||
full_url = f"{parsed.scheme}://{parsed.netloc}{base_path}/{sub_playlist_path}"
|
||||
|
||||
logger.info(f"Fetching fresh sub-playlist: {full_url[:100]}...")
|
||||
|
||||
# Fetch sub-playlist content
|
||||
with urllib.request.urlopen(full_url, timeout=SOCKET_TIMEOUT) as response:
|
||||
sub_content = response.read().decode("utf-8")
|
||||
|
||||
# Rewrite URLs in sub-playlist
|
||||
rewritten = _rewrite_urls(sub_content, video_url, full_url)
|
||||
logger.info(f"Rewritten sub-playlist (first 200 chars): {rewritten[:200]}...")
|
||||
return rewritten.encode("utf-8")
|
||||
else:
|
||||
# For sub-playlist: clear cache and retry
|
||||
_session_cache.pop(video_url, None)
|
||||
_cache_timestamps.pop(video_url, None)
|
||||
logger.info("Sub-playlist expired, refetching")
|
||||
continue
|
||||
raise
|
||||
|
||||
+205
-13
@@ -7,17 +7,18 @@
|
||||
<link rel="stylesheet" href="https://unpkg.com/mvp.css">
|
||||
<style>
|
||||
body {
|
||||
max-width: 900px;
|
||||
max-width: 1100px;
|
||||
margin: 0 auto;
|
||||
padding: 1rem;
|
||||
}
|
||||
h1 {
|
||||
margin-bottom: 1rem;
|
||||
margin-bottom: 0.5rem;
|
||||
}
|
||||
.video-container {
|
||||
width: 100%;
|
||||
background: #000;
|
||||
aspect-ratio: 16 / 9;
|
||||
margin-bottom: 1.5rem;
|
||||
}
|
||||
video {
|
||||
width: 100%;
|
||||
@@ -27,28 +28,219 @@
|
||||
display: inline-block;
|
||||
margin-bottom: 1rem;
|
||||
}
|
||||
.metadata {
|
||||
background: #f5f5f5;
|
||||
padding: 1rem;
|
||||
border-radius: 8px;
|
||||
margin-bottom: 1rem;
|
||||
}
|
||||
.metadata h2 {
|
||||
margin-top: 0;
|
||||
font-size: 1.2rem;
|
||||
}
|
||||
.metadata-grid {
|
||||
display: grid;
|
||||
grid-template-columns: repeat(auto-fill, minmax(200px, 1fr));
|
||||
gap: 0.75rem;
|
||||
}
|
||||
.metadata-item {
|
||||
word-break: break-word;
|
||||
}
|
||||
.metadata-label {
|
||||
font-weight: bold;
|
||||
color: #666;
|
||||
font-size: 0.85rem;
|
||||
}
|
||||
.metadata-value {
|
||||
color: #333;
|
||||
}
|
||||
.thumbnail {
|
||||
max-width: 100%;
|
||||
max-height: 200px;
|
||||
margin: 1rem 0;
|
||||
border-radius: 4px;
|
||||
}
|
||||
.description {
|
||||
background: #f9f9f9;
|
||||
padding: 1rem;
|
||||
border-radius: 8px;
|
||||
margin-bottom: 1rem;
|
||||
white-space: pre-wrap;
|
||||
max-height: 200px;
|
||||
overflow-y: auto;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<a href="/" class="back-link">← Back</a>
|
||||
|
||||
{% if thumbnail %}
|
||||
<img src="{{ thumbnail }}" alt="{{ title }}" class="thumbnail">
|
||||
{% endif %}
|
||||
|
||||
<h1>{{ title }}</h1>
|
||||
|
||||
<div class="video-container">
|
||||
<video controls>
|
||||
Your browser does not support HLS.
|
||||
<video controls id="video">
|
||||
Your browser does not support video playback.
|
||||
</video>
|
||||
</div>
|
||||
|
||||
{% if description %}
|
||||
<div class="description">
|
||||
<h3>Description</h3>
|
||||
{{ description }}
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<div class="metadata">
|
||||
<h2>Video Information</h2>
|
||||
<div class="metadata-grid">
|
||||
{% if uploader %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Uploader</div>
|
||||
<div class="metadata-value">{{ uploader }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if channel %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Channel</div>
|
||||
<div class="metadata-value">{{ channel }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if duration_string %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Duration</div>
|
||||
<div class="metadata-value">{{ duration_string }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if upload_date %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Upload Date</div>
|
||||
<div class="metadata-value">{{ upload_date }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if view_count %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Views</div>
|
||||
<div class="metadata-value">{{ "{:,}".format(view_count) }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if like_count %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Likes</div>
|
||||
<div class="metadata-value">{{ "{:,}".format(like_count) }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if comment_count %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Comments</div>
|
||||
<div class="metadata-value">{{ "{:,}".format(comment_count) }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if categories %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Categories</div>
|
||||
<div class="metadata-value">{% for cat in categories %}{{ cat }}{% if not loop.last %}, {% endif %}{% endfor %}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if language %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Language</div>
|
||||
<div class="metadata-value">{{ language }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if extractor %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Source</div>
|
||||
<div class="metadata-value">{{ extractor }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if resolution %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Resolution</div>
|
||||
<div class="metadata-value">{{ resolution }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if format %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Format</div>
|
||||
<div class="metadata-value">{{ format }}{% if format_note %} ({{ format_note }}){% endif %}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if filesize_approx %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Size (approx)</div>
|
||||
<div class="metadata-value">{{ filesize_approx }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{% if tags %}
|
||||
<div class="metadata">
|
||||
<h2>Tags</h2>
|
||||
<div class="metadata-value">
|
||||
{% for tag in tags %}
|
||||
<span style="display: inline-block; background: #e0e0e0; padding: 2px 8px; border-radius: 4px; margin: 2px;">{{ tag }}</span>
|
||||
{% endfor %}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if hls_url %}
|
||||
<div class="metadata">
|
||||
<h2>Stream URLs</h2>
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">HLS URL</div>
|
||||
<div class="metadata-value" style="word-break: break-all; font-size: 0.85rem;">{{ hls_url[:200] }}{% if hls_url|length > 200 %}...{% endif %}</div>
|
||||
</div>
|
||||
{% if direct_url %}
|
||||
<div class="metadata-item">
|
||||
<div class="metadata-label">Direct URL</div>
|
||||
<div class="metadata-value" style="word-break: break-all; font-size: 0.85rem;">{{ direct_url[:200] }}{% if direct_url|length > 200 %}...{% endif %}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<script src="https://cdn.jsdelivr.net/npm/hls.js@latest"></script>
|
||||
<script>
|
||||
const video = document.querySelector('video');
|
||||
const hlsUrl = '{{ proxy_hls_url }}';
|
||||
const video = document.getElementById('video');
|
||||
const hlsUrl = {{ proxy_hls_url | tojson }};
|
||||
const directUrl = {{ direct_url | tojson }};
|
||||
|
||||
if (Hls.isSupported()) {
|
||||
const hls = new Hls();
|
||||
hls.loadSource(hlsUrl);
|
||||
hls.attachMedia(video);
|
||||
} else if (video.canPlayType('application/vnd.apple.mpegurl')) {
|
||||
video.src = hlsUrl;
|
||||
if (hlsUrl && hlsUrl !== 'null') {
|
||||
if (Hls.isSupported()) {
|
||||
const hls = new Hls();
|
||||
hls.loadSource(hlsUrl);
|
||||
hls.attachMedia(video);
|
||||
} else if (video.canPlayType('application/vnd.apple.mpegurl')) {
|
||||
video.src = hlsUrl;
|
||||
} else {
|
||||
loadDirectUrl();
|
||||
}
|
||||
} else if (directUrl && directUrl !== 'null') {
|
||||
loadDirectUrl();
|
||||
}
|
||||
|
||||
function loadDirectUrl() {
|
||||
if (directUrl && directUrl !== 'null') {
|
||||
video.src = directUrl;
|
||||
}
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
</html>
|
||||
@@ -1,139 +0,0 @@
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
import threading
|
||||
import requests
|
||||
import pytest
|
||||
import sys
|
||||
import urllib.parse
|
||||
import http.server
|
||||
import socketserver
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
|
||||
TEST_VIDEO_DIR = "/tmp/yt-dlp-test-video"
|
||||
TEST_VIDEO_M3U8 = f"{TEST_VIDEO_DIR}/index.m3u8"
|
||||
SERVER_PORT = 5002
|
||||
TEST_HTTP_PORT = 8898
|
||||
|
||||
|
||||
def generate_test_video():
|
||||
os.makedirs(TEST_VIDEO_DIR, exist_ok=True)
|
||||
|
||||
cmd = [
|
||||
"ffmpeg", "-y", "-f", "lavfi", "-i", "testsrc=duration=5:size=320x240:rate=24",
|
||||
"-f", "lavfi", "-i", "sine=frequency=440:duration=5",
|
||||
"-c:v", "libx264", "-c:a", "aac", "-strict", "experimental",
|
||||
"-hls_time", "1", "-hls_list_size", "0",
|
||||
"-hls_segment_filename", f"{TEST_VIDEO_DIR}/segment%03d.ts",
|
||||
TEST_VIDEO_M3U8
|
||||
]
|
||||
subprocess.run(cmd, capture_output=True, timeout=60)
|
||||
|
||||
assert os.path.exists(TEST_VIDEO_M3U8), "HLS manifest not generated"
|
||||
segments = [f for f in os.listdir(TEST_VIDEO_DIR) if f.endswith(".ts")]
|
||||
assert len(segments) > 0, "No segments generated"
|
||||
|
||||
|
||||
class QuietHTTPHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def log_message(self, format, *args):
|
||||
pass
|
||||
|
||||
|
||||
class ReusableTCPServer(socketserver.TCPServer):
|
||||
allow_reuse_address = True
|
||||
|
||||
|
||||
def serve_test_video():
|
||||
os.chdir(TEST_VIDEO_DIR)
|
||||
with ReusableTCPServer(("127.0.0.1", TEST_HTTP_PORT), QuietHTTPHandler) as httpd:
|
||||
httpd.serve_forever()
|
||||
|
||||
|
||||
def start_flask_app():
|
||||
import app as flask_app
|
||||
flask_app.app.run(host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def test_servers():
|
||||
print("\nGenerating test video...")
|
||||
generate_test_video()
|
||||
|
||||
print(f"Starting HTTP server for test video on port {TEST_HTTP_PORT}...")
|
||||
http_thread = threading.Thread(target=serve_test_video, daemon=True)
|
||||
http_thread.start()
|
||||
time.sleep(1)
|
||||
|
||||
for _ in range(10):
|
||||
try:
|
||||
requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/", timeout=1)
|
||||
break
|
||||
except:
|
||||
time.sleep(0.5)
|
||||
print("HTTP server ready")
|
||||
|
||||
print(f"Starting Flask proxy server on port {SERVER_PORT}...")
|
||||
flask_thread = threading.Thread(target=start_flask_app, daemon=True)
|
||||
flask_thread.start()
|
||||
time.sleep(2)
|
||||
print("Flask server ready")
|
||||
|
||||
yield
|
||||
|
||||
print("\nCleaning up...")
|
||||
|
||||
|
||||
def test_direct_hls_access(test_servers):
|
||||
"""Test that we can access the test HLS video directly"""
|
||||
response = requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8", timeout=5)
|
||||
assert response.status_code == 200
|
||||
assert "#EXTM3U" in response.text
|
||||
print("Direct HLS access: OK")
|
||||
|
||||
|
||||
def test_hls_playlist_proxy(test_servers):
|
||||
"""Test proxying HLS playlist"""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls?url={urllib.parse.quote(video_url, safe='')}"
|
||||
|
||||
response = requests.get(proxy_url, timeout=10)
|
||||
assert response.status_code == 200
|
||||
assert "#EXTM3U" in response.text
|
||||
assert ".ts" in response.text
|
||||
print("HLS playlist proxy: OK")
|
||||
|
||||
|
||||
def test_hls_segment_proxy(test_servers):
|
||||
"""Test proxying HLS segment"""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls?url={urllib.parse.quote(video_url, safe='')}&path=segment000.ts"
|
||||
|
||||
response = requests.get(proxy_url, timeout=10)
|
||||
assert response.status_code == 200
|
||||
assert len(response.content) > 0
|
||||
print("HLS segment proxy: OK")
|
||||
|
||||
|
||||
def test_player_page(test_servers):
|
||||
"""Test player page renders"""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
player_url = f"http://127.0.0.1:{SERVER_PORT}/player?url={urllib.parse.quote(video_url, safe='')}"
|
||||
|
||||
response = requests.get(player_url, timeout=10)
|
||||
assert response.status_code == 200
|
||||
assert "video" in response.text.lower()
|
||||
print("Player page: OK")
|
||||
|
||||
|
||||
def test_index_page(test_servers):
|
||||
"""Test index page renders"""
|
||||
response = requests.get(f"http://127.0.0.1:{SERVER_PORT}/", timeout=10)
|
||||
assert response.status_code == 200
|
||||
assert "video" in response.text.lower()
|
||||
print("Index page: OK")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v", "-s"])
|
||||
+377
-74
@@ -1,113 +1,416 @@
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import time
|
||||
import threading
|
||||
import requests
|
||||
import urllib.parse
|
||||
import http.server
|
||||
import socketserver
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from utils import is_valid_url, extract_video_id, sanitize_path, get_error_message
|
||||
import dlp
|
||||
|
||||
TEST_VIDEO_DIR = "/tmp/yt-dlp-test-video"
|
||||
TEST_VIDEO_M3U8 = f"{TEST_VIDEO_DIR}/index.m3u8"
|
||||
SERVER_PORT = 5005
|
||||
TEST_HTTP_PORT = 8890
|
||||
|
||||
|
||||
class TestURLValidation:
|
||||
def test_valid_youtube_url(self):
|
||||
assert is_valid_url("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
|
||||
assert is_valid_url("https://youtu.be/dQw4w9WgXcQ")
|
||||
|
||||
def test_valid_youtu_be(self):
|
||||
assert is_valid_url("https://youtu.be/abc123")
|
||||
|
||||
def test_valid_pornhub_url(self):
|
||||
assert is_valid_url("https://www.pornhub.com/view_video.php?viewkey=abc123")
|
||||
|
||||
def test_invalid_url(self):
|
||||
assert not is_valid_url("")
|
||||
assert not is_valid_url("not-a-url")
|
||||
|
||||
def test_disallowed_domain(self):
|
||||
os.environ["VALIDATION_ENABLED"] = "true"
|
||||
assert not is_valid_url("https://evil.com/video")
|
||||
def print_hex(data, max_len=200):
|
||||
"""Print data as hex for debugging."""
|
||||
if isinstance(data, bytes):
|
||||
print(f"[HEX] {data[:max_len].hex()}")
|
||||
else:
|
||||
print(f"[HEX] {data[:max_len].encode().hex()}")
|
||||
|
||||
|
||||
class TestVideoIDExtraction:
|
||||
def test_extract_youtube_id(self):
|
||||
assert extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ") == "dQw4w9WgXcQ"
|
||||
assert extract_video_id("https://youtu.be/dQw4w9WgXcQ") == "dQw4w9WgXcQ"
|
||||
|
||||
def test_extract_pornhub_id(self):
|
||||
result = extract_video_id("https://www.pornhub.com/view_video.php?viewkey=ph123456")
|
||||
assert result == "ph123456"
|
||||
|
||||
def test_extract_invalid(self):
|
||||
assert extract_video_id("https://example.com/video") == ""
|
||||
def print_headers(headers):
|
||||
"""Print response headers."""
|
||||
print(f"[HEADERS] {dict(headers)}")
|
||||
|
||||
|
||||
class TestPathSanitization:
|
||||
def test_sanitize_normal_path(self):
|
||||
assert sanitize_path("path/to/file") == "path/to/file"
|
||||
|
||||
def test_sanitize_prevents_traversal(self):
|
||||
assert sanitize_path("../etc/passwd") == "etc/passwd"
|
||||
assert sanitize_path("path/../etc/passwd") == "path/etc/passwd"
|
||||
def generate_test_video():
|
||||
"""Generate test HLS video using ffmpeg."""
|
||||
print(f"\n[SETUP] Generating test video in {TEST_VIDEO_DIR}")
|
||||
os.makedirs(TEST_VIDEO_DIR, exist_ok=True)
|
||||
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-f", "lavfi", "-i", "testsrc=duration=10:size=320x240:rate=24",
|
||||
"-f", "lavfi", "-i", "sine=frequency=440:duration=10",
|
||||
"-c:v", "libx264", "-c:a", "aac", "-strict", "experimental",
|
||||
"-hls_time", "2", "-hls_list_size", "0",
|
||||
"-hls_segment_filename", f"{TEST_VIDEO_DIR}/segment%03d.ts",
|
||||
TEST_VIDEO_M3U8
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
|
||||
if result.returncode != 0:
|
||||
print(f"[ERROR] ffmpeg failed: {result.stderr}")
|
||||
segments = [f for f in os.listdir(TEST_VIDEO_DIR) if f.endswith(".ts")]
|
||||
print(f"[SETUP] Generated {len(segments)} segments")
|
||||
return result.returncode == 0 and len(segments) > 0
|
||||
|
||||
|
||||
class TestCacheMechanics:
|
||||
def test_cache_basic(self):
|
||||
class QuietHTTPHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def log_message(self, format, *args):
|
||||
print(f"[HTTP] {self.address_string()} - {format % args}")
|
||||
|
||||
|
||||
class ReusableTCPServer(socketserver.TCPServer):
|
||||
allow_reuse_address = True
|
||||
|
||||
|
||||
def serve_test_video():
|
||||
print(f"[SETUP] Starting test HTTP server on port {TEST_HTTP_PORT}")
|
||||
os.chdir(TEST_VIDEO_DIR)
|
||||
with ReusableTCPServer(("127.0.0.1", TEST_HTTP_PORT), QuietHTTPHandler) as httpd:
|
||||
httpd.serve_forever()
|
||||
|
||||
|
||||
def start_flask_app():
|
||||
print(f"[SETUP] Starting Flask server on port {SERVER_PORT}")
|
||||
import app as flask_app
|
||||
flask_app.app.run(host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def test_servers():
|
||||
print("\n" + "="*60)
|
||||
print("INTEGRATION TEST SETUP")
|
||||
print("="*60)
|
||||
|
||||
generate_test_video()
|
||||
|
||||
http_thread = threading.Thread(target=serve_test_video, daemon=True)
|
||||
http_thread.start()
|
||||
time.sleep(1)
|
||||
|
||||
for _ in range(10):
|
||||
try:
|
||||
requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/", timeout=1)
|
||||
break
|
||||
except:
|
||||
time.sleep(0.5)
|
||||
print("[SETUP] Test HTTP server ready")
|
||||
|
||||
flask_thread = threading.Thread(target=start_flask_app, daemon=True)
|
||||
flask_thread.start()
|
||||
time.sleep(2)
|
||||
|
||||
for _ in range(10):
|
||||
try:
|
||||
requests.get(f"http://127.0.0.1:{SERVER_PORT}/", timeout=1)
|
||||
break
|
||||
except:
|
||||
time.sleep(0.5)
|
||||
print("[SETUP] Flask server ready")
|
||||
print("="*60 + "\n")
|
||||
|
||||
yield
|
||||
|
||||
print("\n[TEARDOWN] Tests complete")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test URL parsing - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestURLParsing:
|
||||
"""Test URL parsing functions as per AGENTS.md."""
|
||||
|
||||
def test_url_validation_youtube(self):
|
||||
"""Test YouTube URL validation."""
|
||||
from utils import is_valid_url
|
||||
url = "https://www.youtube.com/watch?v=abc123"
|
||||
print(f"[TEST] Validating: {url}")
|
||||
result = is_valid_url(url)
|
||||
print(f"[TEST] Result: {result}")
|
||||
assert result is True, f"YouTube URL should be valid: {url}"
|
||||
|
||||
def test_url_validation_pornhub(self):
|
||||
"""Test PornHub URL validation."""
|
||||
from utils import is_valid_url
|
||||
url = "https://rt.pornhub.com/view_video.php?viewkey=abc123"
|
||||
print(f"[TEST] Validating: {url}")
|
||||
result = is_valid_url(url)
|
||||
print(f"[TEST] Result: {result}")
|
||||
assert result is True, f"PornHub URL should be valid: {url}"
|
||||
|
||||
def test_url_validation_invalid(self):
|
||||
"""Test invalid URL rejection."""
|
||||
from utils import is_valid_url
|
||||
url = "not-a-url"
|
||||
print(f"[TEST] Validating: {url}")
|
||||
result = is_valid_url(url)
|
||||
print(f"[TEST] Result: {result}")
|
||||
assert result is False, f"Invalid URL should be rejected: {url}"
|
||||
|
||||
def test_url_validation_disallowed(self):
|
||||
"""Test disallowed domain rejection."""
|
||||
from utils import is_valid_url
|
||||
url = "https://evil.com/video"
|
||||
print(f"[TEST] Validating: {url}")
|
||||
result = is_valid_url(url)
|
||||
print(f"[TEST] Result: {result}")
|
||||
assert result is False, f"Disallowed domain should be rejected: {url}"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test caching - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestCaching:
|
||||
"""Test caching mechanics as per AGENTS.md."""
|
||||
|
||||
def test_cache_store_and_retrieve(self):
|
||||
"""Test cache can store and retrieve data."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
test_data = {"test": "data"}
|
||||
dlp._set_cached_session("http://test.com/video", test_data)
|
||||
url = "https://test.com/video"
|
||||
data = {"title": "Test", "hls_url": "http://example.com/playlist.m3u8"}
|
||||
|
||||
cached = dlp._get_cached_session("http://test.com/video")
|
||||
assert cached == test_data
|
||||
|
||||
def test_cache_expiry(self):
|
||||
dlp.CACHE_TTL = 1
|
||||
print(f"[TEST] Storing in cache: {url}")
|
||||
dlp._session_cache[url] = data
|
||||
dlp._cache_timestamps[url] = time.time()
|
||||
|
||||
print(f"[TEST] Cache contents: {dlp._session_cache}")
|
||||
assert url in dlp._session_cache
|
||||
assert dlp._session_cache[url]["title"] == "Test"
|
||||
|
||||
def test_cache_hit_detection(self):
|
||||
"""Test cache hit is detected."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
dlp._set_cached_session("http://test.com/video", {"data": "test"})
|
||||
import time
|
||||
time.sleep(1.1)
|
||||
url = "https://test.com/video"
|
||||
dlp._session_cache[url] = {"title": "Test"}
|
||||
dlp._cache_timestamps[url] = time.time()
|
||||
|
||||
assert dlp._is_cache_expired("http://test.com/video") is True
|
||||
print(f"[TEST] Checking cache for: {url}")
|
||||
if url in dlp._session_cache:
|
||||
print(f"[TEST] Cache HIT!")
|
||||
else:
|
||||
print(f"[TEST] Cache MISS!")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test playlist proxying - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestPlaylistProxying:
|
||||
"""Test playlist proxying as per AGENTS.md."""
|
||||
|
||||
def test_main_playlist_returns_valid_hls(self, test_servers):
|
||||
"""Test main playlist returns valid HLS content."""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
encoded = urllib.parse.quote(video_url, safe="")
|
||||
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
|
||||
|
||||
dlp.CACHE_TTL = 31536000
|
||||
print(f"[TEST] Requesting main playlist: {proxy_url}")
|
||||
response = requests.get(proxy_url, timeout=10)
|
||||
|
||||
print(f"[TEST] Status: {response.status_code}")
|
||||
print_headers(response.headers)
|
||||
print(f"[TEST] Content preview: {response.text[:200]}")
|
||||
|
||||
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
|
||||
assert "#EXTM3U" in response.text, "Should contain #EXTM3U"
|
||||
assert ".ts" in response.text, "Should contain segment references"
|
||||
print("[TEST] Main playlist returns valid HLS: PASS")
|
||||
|
||||
def test_playlist_contains_proxy_urls(self, test_servers):
|
||||
"""Test playlist URLs are rewritten to proxy."""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
encoded = urllib.parse.quote(video_url, safe="")
|
||||
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
|
||||
|
||||
print(f"[TEST] Requesting playlist: {proxy_url}")
|
||||
response = requests.get(proxy_url, timeout=10)
|
||||
|
||||
print(f"[TEST] Content: {response.text}")
|
||||
assert "/hls/" in response.text, "Playlist should contain proxy URLs"
|
||||
print("[TEST] Playlist contains proxy URLs: PASS")
|
||||
|
||||
def test_playlist_content_type_correct(self, test_servers):
|
||||
"""Test playlist returns correct content-type."""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
encoded = urllib.parse.quote(video_url, safe="")
|
||||
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
|
||||
|
||||
print(f"[TEST] Requesting: {proxy_url}")
|
||||
response = requests.get(proxy_url, timeout=10)
|
||||
|
||||
print(f"[TEST] Content-Type: {response.headers.get('Content-Type')}")
|
||||
assert "application/vnd.apple.mpegurl" in response.headers.get("Content-Type", "")
|
||||
assert "video/mp2t" not in response.headers.get("Content-Type", "")
|
||||
print("[TEST] Playlist content-type correct: PASS")
|
||||
|
||||
|
||||
class TestErrorMessages:
|
||||
def test_get_error_message(self):
|
||||
assert "Bad Request" in get_error_message(400)
|
||||
assert "Forbidden" in get_error_message(403)
|
||||
assert "Not Found" in get_error_message(404)
|
||||
assert "Internal Server Error" in get_error_message(500)
|
||||
# ============================================================================
|
||||
# Test segment proxying - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestSegmentProxying:
|
||||
"""Test segment proxying as per AGENTS.md."""
|
||||
|
||||
def test_segment_returns_video_data(self, test_servers):
|
||||
"""Test segment returns video data."""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
encoded = urllib.parse.quote(video_url, safe="")
|
||||
playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
|
||||
|
||||
print(f"[TEST] Getting main playlist: {playlist_url}")
|
||||
playlist_resp = requests.get(playlist_url, timeout=10)
|
||||
|
||||
# Find segment filename
|
||||
segment_filename = None
|
||||
for line in playlist_resp.text.split("\n"):
|
||||
if line.startswith("/hls/") and "--" in line and ".ts" in line:
|
||||
parts = line.rsplit("--", 1)
|
||||
if len(parts) >= 2:
|
||||
segment_filename = parts[-1]
|
||||
print(f"[TEST] Found segment: {segment_filename}")
|
||||
break
|
||||
|
||||
assert segment_filename is not None, "Should find segment in playlist"
|
||||
|
||||
seg_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--{segment_filename}"
|
||||
print(f"[TEST] Requesting segment: {seg_url}")
|
||||
|
||||
seg_resp = requests.get(seg_url, timeout=10)
|
||||
|
||||
print(f"[TEST] Segment status: {seg_resp.status_code}")
|
||||
print_headers(seg_resp.headers)
|
||||
print(f"[TEST] Segment size: {len(seg_resp.content)} bytes")
|
||||
|
||||
assert seg_resp.status_code == 200
|
||||
assert "video/mp2t" in seg_resp.headers.get("Content-Type", "")
|
||||
assert len(seg_resp.content) > 1000, "Segment should have substantial data"
|
||||
assert b"#EXTM3U" not in seg_resp.content[:100], "Segment should NOT be a playlist"
|
||||
|
||||
print("[TEST] Segment returns video data: PASS")
|
||||
|
||||
|
||||
class TestFlaskApp:
|
||||
def test_index_route(self):
|
||||
from app import app
|
||||
with app.test_client() as client:
|
||||
response = client.get("/")
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_player_route_missing_url(self):
|
||||
# ============================================================================
|
||||
# Test error handling - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestErrorHandling:
|
||||
"""Test error handling as per AGENTS.md."""
|
||||
|
||||
def test_player_missing_url_returns_400(self):
|
||||
"""Test player route with missing URL returns 400."""
|
||||
from app import app
|
||||
with app.test_client() as client:
|
||||
print("[TEST] Testing /player with no URL")
|
||||
response = client.get("/player")
|
||||
print(f"[TEST] Status: {response.status_code}")
|
||||
assert response.status_code == 400
|
||||
|
||||
def test_player_route_invalid_url(self):
|
||||
|
||||
def test_player_invalid_url_returns_400(self):
|
||||
"""Test player route with invalid URL returns 400."""
|
||||
from app import app
|
||||
with app.test_client() as client:
|
||||
response = client.get("/player?url=https://evil.com/video")
|
||||
print("[TEST] Testing /player with invalid URL")
|
||||
response = client.get("/player?url=not-valid")
|
||||
print(f"[TEST] Status: {response.status_code}")
|
||||
assert response.status_code == 400
|
||||
|
||||
def test_hls_proxy_invalid_path(self):
|
||||
|
||||
def test_hls_invalid_video_url_returns_400(self):
|
||||
"""Test HLS route with invalid video URL returns 400."""
|
||||
from app import app
|
||||
with app.test_client() as client:
|
||||
response = client.get("/hls")
|
||||
print("[TEST] Testing /hls with invalid video URL")
|
||||
response = client.get("/hls/evil.com--index.m3u8")
|
||||
print(f"[TEST] Status: {response.status_code}")
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Integration tests - main application flow as per AGENTS.md
|
||||
# ============================================================================
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests for main application flow as per AGENTS.md."""
|
||||
|
||||
def test_pornhub_video_full_flow(self):
|
||||
"""Test PornHub video with full debug output."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690"
|
||||
|
||||
print(f"\n[TEST] PornHub video: {video_url}")
|
||||
|
||||
# Get stream info
|
||||
info = dlp.get_stream_info(video_url)
|
||||
print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}")
|
||||
print(f"[TEST] HLS URL: {info.get('hls_url', 'N/A')[:80] if info.get('hls_url') else 'N/A'}")
|
||||
|
||||
# Get playlist
|
||||
playlist = dlp.get_hls_playlist(video_url)
|
||||
print(f"[TEST] Playlist content (first 300 chars): {playlist[:300]}")
|
||||
print_hex(playlist[:100])
|
||||
|
||||
assert "#EXTM3U" in playlist
|
||||
assert "/hls/" in playlist
|
||||
print("[TEST] PornHub full flow: PASS")
|
||||
|
||||
def test_youtube_video_fallback(self):
|
||||
"""Test YouTube uses direct URL fallback."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
video_url = "https://www.youtube.com/watch?v=PoV9fS4CnaY"
|
||||
|
||||
print(f"\n[TEST] YouTube video: {video_url}")
|
||||
|
||||
info = dlp.get_stream_info(video_url)
|
||||
print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}")
|
||||
print(f"[TEST] Direct URL: {info.get('direct_url', 'N/A')[:80] if info.get('direct_url') else 'N/A'}")
|
||||
|
||||
assert "title" in info
|
||||
print("[TEST] YouTube fallback: PASS")
|
||||
|
||||
def test_yt_dlp_consumes_proxy_playlist(self):
|
||||
"""Test yt-dlp can consume proxy playlist like browser."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690"
|
||||
encoded_url = urllib.parse.quote(video_url, safe="")
|
||||
playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded_url}--index.m3u8"
|
||||
|
||||
print(f"\n[TEST] yt-dlp proxy URL: {playlist_url}")
|
||||
|
||||
cmd = [
|
||||
"yt-dlp",
|
||||
"--hls-use-mpegts",
|
||||
"--no-download",
|
||||
"--print", "url",
|
||||
playlist_url
|
||||
]
|
||||
|
||||
print(f"[TEST] Running: {' '.join(cmd)}")
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
|
||||
|
||||
print(f"[TEST] yt-dlp return code: {result.returncode}")
|
||||
if result.stdout:
|
||||
print(f"[TEST] yt-dlp output: {result.stdout[:200]}")
|
||||
if result.returncode != 0:
|
||||
print(f"[TEST] yt-dlp stderr: {result.stderr[:500]}")
|
||||
|
||||
assert result.returncode == 0, f"yt-dlp failed: {result.stderr}"
|
||||
print("[TEST] yt-dlp consumes proxy playlist: PASS")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
pytest.main([__file__, "-v", "-s"])
|
||||
Reference in New Issue
Block a user