From 01a376ae2123543656cb65bbf3c6f174687ae7ab Mon Sep 17 00:00:00 2001 From: Mikhail Yevchenko Date: Wed, 1 Apr 2026 12:47:21 +0000 Subject: [PATCH] Enhance HLS proxy functionality and improve caching mechanism - Updated AGENTS.md to clarify dlp.py module usage and segment handling. - Modified README.md to include ALLOW_LOCAL configuration for testing. - Refactored app.py to streamline HLS proxy logic and improve error handling. - Enhanced dlp.py to optimize caching and segment retrieval processes. - Updated player.html to ensure proper JSON formatting for proxy URLs. - Improved test_integration.py to validate HLS segment proxying and added test for Pornhub HLS extraction. - Adjusted test_proxy.py to reflect changes in caching functions and data structure. --- AGENTS.md | 5 +- README.md | 1 + app.py | 22 ++--- dlp.py | 174 +++++++++++++++++++------------------- templates/player.html | 2 +- tests/test_integration.py | 36 +++++++- tests/test_proxy.py | 13 +-- 7 files changed, 143 insertions(+), 110 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index cb1fdd8..013be4a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -41,10 +41,11 @@ As an HTML templating engine, you can use Jinja2, which is built into Flask, for ``` - app.py - main Flask application file that handles incoming HTTP requests and interacts with yt-dlp through functions from dlp.py. -- dlp.py - module for interacting with yt-dlp, containing functions to get HLS playlists and segments. +- dlp.py - module for interacting with yt-dlp, containing functions to get HLS playlists and segments. examine yt_dlp/YoutubeDL.py in venv in order to understand how to use yt-dlp for getting HLS playlists and segments functions: - get_hls_playlist(video_url): gets HLS playlist for the specified video as a string that can be returned to the client. The segment list should be filtered to only include those available for the given video and supported by yt-dlp. 
- - get_hls_segment(video_url, segment_name): gets the specified video segment: downloads it using yt-dlp and returns its content as bytes that can be returned to the client. It should also use yt-dlp to download the segment since only yt-dlp can handle the necessary authentication and access control for the video content. + it should also rewrite segment filenames in case they expire during or before download, so that they can be requested through the proxy using a predictable URL structure. + - get_hls_segment(video_url, segment_filename): gets the specified video segment for rewritten filename: downloads it using yt-dlp and returns its content as bytes that can be returned to the client. It should also use yt-dlp to download the segment since only yt-dlp can handle the necessary authentication and access control for the video content. caching: - Caching of yt-dlp sessions will be implemented using a simple in-memory dictionary that will store video parsing results for each VIDEO_ID. No complex in-memory solutions, just a dictionary with TTL for each key. TTL will be set to 365 days, which will effectively cache results and minimize repeated requests to yt-dlp. diff --git a/README.md b/README.md index 824d631..121e1b3 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ Visit http://localhost:5000 and enter a video URL. 
| SOCKET_TIMEOUT | 30 | Socket timeout for requests | | VALIDATION_ENABLED | true | Enable URL validation | | ALLOWED_DOMAINS | youtube.com,youtu.be,pornhub.com,xvideos.com | Allowed video domains | +| ALLOW_LOCAL | true | Allow localhost/127.0.0.1 URLs (for testing) | ## Routes diff --git a/app.py b/app.py index b46fece..4048abc 100644 --- a/app.py +++ b/app.py @@ -56,27 +56,27 @@ def hls_proxy(): if not url_param: abort(400, description="Missing url parameter") - from urllib.parse import urlparse, unquote + from urllib.parse import unquote path = request.args.get("path", "") - - if ".m3u8" in url_param and not path: - video_url = url_param - elif ".m3u8" in url_param and path: - video_url = url_param - else: - video_url = url_param - - video_url = unquote(video_url) + video_url = unquote(url_param) if not is_valid_url(video_url): abort(400, description="Invalid URL") - if path.endswith(".m3u8") or not path: + # Main playlist request - get from yt-dlp and rewrite URLs + if path == "index.m3u8" or path == "": playlist = dlp.get_hls_playlist(video_url) return Response(playlist, mimetype="application/vnd.apple.mpegurl") + # Sub-playlist or segment request - path is the absolute URL segment_data = dlp.get_hls_segment(video_url, path) + + if segment_data is None: + abort(500, description="Failed to fetch segment") + + if path.endswith(".m3u8"): + return Response(segment_data, mimetype="application/vnd.apple.mpegurl") return Response(segment_data, mimetype="video/mp2t") except HTTPException: diff --git a/dlp.py b/dlp.py index 39e0954..77bc1eb 100644 --- a/dlp.py +++ b/dlp.py @@ -1,22 +1,18 @@ import logging import os import time -import re from typing import Optional import yt_dlp logger = logging.getLogger(__name__) CACHE_TTL = int(os.getenv("CACHE_TTL", 31536000)) +SOCKET_TIMEOUT = int(os.getenv("SOCKET_TIMEOUT", 30)) _session_cache = {} _cache_timestamps = {} -def _is_hls_url(url: str) -> bool: - return url.endswith(".m3u8") or "m3u8" in url - - def 
_get_cache_key(video_url: str) -> str: return video_url @@ -28,110 +24,112 @@ def _is_cache_expired(video_url: str) -> bool: return time.time() - _cache_timestamps[key] > CACHE_TTL -def _get_cached_session(video_url: str) -> Optional[dict]: +def _get_cached_info(video_url: str) -> Optional[dict]: key = _get_cache_key(video_url) if key in _session_cache and not _is_cache_expired(video_url): return _session_cache[key] return None -def _set_cached_session(video_url: str, session_data: dict) -> None: +def _set_cached_info(video_url: str, info: dict) -> None: key = _get_cache_key(video_url) - _session_cache[key] = session_data + _session_cache[key] = info _cache_timestamps[key] = time.time() -def clear_expired_cache() -> None: - expired_keys = [ - key for key in _session_cache - if _is_cache_expired(key) - ] - for key in expired_keys: - del _session_cache[key] - del _cache_timestamps[key] +def _extract_hls_url(info: dict) -> Optional[str]: + """Extract HLS URL from yt-dlp info dict.""" + if info.get("formats"): + for f in reversed(info["formats"]): + if f.get("protocol") == "m3u8_native": + url = f.get("manifest_url") or f.get("url") + if url and ".m3u8" in url: + return url + return None -def get_hls_playlist(video_url: str) -> str: - cached = _get_cached_session(video_url) - if cached and "hls_playlist" in cached: - return cached["hls_playlist"] - - if _is_hls_url(video_url): - hls_url = video_url - else: - ydl_opts = { - "quiet": True, - "no_warnings": True, - "socket_timeout": int(os.getenv("SOCKET_TIMEOUT", 30)), - } - - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - info = ydl.extract_info(video_url, download=False) - - if not info or "hls" not in info or not info["hls"]: - raise ValueError("No HLS stream available for this video") - - hls_url = info["hls"] - - import urllib.request - with urllib.request.urlopen(hls_url, timeout=30) as response: - playlist_content = response.read().decode("utf-8") - - session_data = { - "hls_playlist": playlist_content, - "hls_url": 
hls_url, - "video_url": video_url, - } - _set_cached_session(video_url, session_data) - - return playlist_content - - -def get_hls_segment(video_url: str, segment_name: str) -> bytes: - cached = _get_cached_session(video_url) - if not cached or "hls_url" not in cached: - get_hls_playlist(video_url) - cached = _get_cached_session(video_url) - - hls_url = cached["hls_url"] - base_url = hls_url.rsplit("/", 1)[0] - - if segment_name.startswith("/"): - segment_name = segment_name[1:] - - segment_url = f"{base_url}/{segment_name}" - - import urllib.request - with urllib.request.urlopen(segment_url, timeout=30) as response: - return response.read() - - -def get_stream_info(video_url: str) -> dict: - cached = _get_cached_session(video_url) +def _get_video_info(video_url: str) -> dict: + """Get video info using yt-dlp.""" + cached = _get_cached_info(video_url) if cached: return cached - if _is_hls_url(video_url): - return { - "title": "Test Video", - "hls_url": video_url, - "thumbnail": None, - } - ydl_opts = { "quiet": True, "no_warnings": True, - "socket_timeout": int(os.getenv("SOCKET_TIMEOUT", 30)), + "socket_timeout": SOCKET_TIMEOUT, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(video_url, download=False) - if not info: - raise ValueError("Could not extract video info") + hls_url = _extract_hls_url(info) + result = { + "title": info.get("title"), + "thumbnail": info.get("thumbnail"), + "hls_url": hls_url, + "raw_info": info, + } + _set_cached_info(video_url, result) + return result - return { - "title": info.get("title", "Unknown"), - "hls_url": info.get("hls"), - "thumbnail": info.get("thumbnail"), - } + +def get_stream_info(video_url: str) -> dict: + """Get video info (title, hls_url, thumbnail).""" + info = _get_video_info(video_url) + return { + "title": info["title"], + "hls_url": info["hls_url"], + "thumbnail": info["thumbnail"], + } + + +def get_hls_playlist(video_url: str) -> str: + """Get HLS playlist content with rewritten URLs.""" + info 
= _get_video_info(video_url) + if not info["hls_url"]: + raise ValueError("No HLS stream available for this video") + + import urllib.request + with urllib.request.urlopen(info["hls_url"], timeout=SOCKET_TIMEOUT) as response: + playlist_content = response.read().decode("utf-8") + + return _rewrite_urls(playlist_content, video_url, info["hls_url"]) + + +def _rewrite_urls(content: str, video_url: str, base_url: str) -> str: + """Rewrite relative URLs in HLS playlist to point through proxy.""" + from urllib.parse import urljoin, quote + + lines = content.split("\n") + new_lines = [] + for line in lines: + if line and not line.startswith("#") and line.startswith("http"): + abs_url = line + elif line and not line.startswith("#"): + abs_url = urljoin(base_url, line) + proxy_url = f"/hls?url={quote(video_url, safe='')}&path={quote(abs_url, safe='')}" + new_lines.append(proxy_url) + continue + new_lines.append(line) + return "\n".join(new_lines) + + +def get_hls_segment(video_url: str, segment_url: str) -> bytes: + """Get HLS segment or sub-playlist content.""" + from urllib.parse import unquote + + decoded_url = unquote(segment_url) + + import urllib.request + try: + response = urllib.request.urlopen(decoded_url, timeout=SOCKET_TIMEOUT) + data = response.read() + except urllib.error.HTTPError as e: + if e.code == 410: + raise ValueError("HLS URL expired (410 Gone)") + raise + + if decoded_url.endswith(".m3u8"): + return _rewrite_urls(data.decode("utf-8"), video_url, decoded_url).encode("utf-8") + return data diff --git a/templates/player.html b/templates/player.html index 4582931..471a6f2 100644 --- a/templates/player.html +++ b/templates/player.html @@ -40,7 +40,7 @@