diff --git a/AGENTS.md b/AGENTS.md
index cb1fdd8..013be4a 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -41,10 +41,11 @@ As an HTML templating engine, you can use Jinja2, which is built into Flask, for
```
- app.py - main Flask application file that handles incoming HTTP requests and interacts with yt-dlp through functions from dlp.py.
-- dlp.py - module for interacting with yt-dlp, containing functions to get HLS playlists and segments.
+- dlp.py - module for interacting with yt-dlp, containing functions to get HLS playlists and segments. Examine yt_dlp/YoutubeDL.py in the venv to understand how to use yt-dlp for getting HLS playlists and segments.
functions:
- get_hls_playlist(video_url): gets HLS playlist for the specified video as a string that can be returned to the client. The segment list should be filtered to only include those available for the given video and supported by yt-dlp.
- - get_hls_segment(video_url, segment_name): gets the specified video segment: downloads it using yt-dlp and returns its content as bytes that can be returned to the client. It should also use yt-dlp to download the segment since only yt-dlp can handle the necessary authentication and access control for the video content.
+ It should also rewrite segment filenames, in case they expire during or before download, so that they can be requested through the proxy using a predictable URL structure.
+ - get_hls_segment(video_url, segment_filename): gets the video segment identified by the rewritten filename: downloads it using yt-dlp and returns its content as bytes that can be returned to the client. It should also use yt-dlp to download the segment since only yt-dlp can handle the necessary authentication and access control for the video content.
caching:
- Caching of yt-dlp sessions will be implemented using a simple in-memory dictionary that will store video parsing results for each VIDEO_ID. No complex in-memory solutions, just a dictionary with TTL for each key. TTL will be set to 365 days, which will effectively cache results and minimize repeated requests to yt-dlp.
diff --git a/README.md b/README.md
index 824d631..121e1b3 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,7 @@ Visit http://localhost:5000 and enter a video URL.
| SOCKET_TIMEOUT | 30 | Socket timeout for requests |
| VALIDATION_ENABLED | true | Enable URL validation |
| ALLOWED_DOMAINS | youtube.com,youtu.be,pornhub.com,xvideos.com | Allowed video domains |
+| ALLOW_LOCAL | true | Allow localhost/127.0.0.1 URLs (for testing) |
## Routes
diff --git a/app.py b/app.py
index b46fece..4048abc 100644
--- a/app.py
+++ b/app.py
@@ -56,27 +56,27 @@ def hls_proxy():
if not url_param:
abort(400, description="Missing url parameter")
- from urllib.parse import urlparse, unquote
+ from urllib.parse import unquote
path = request.args.get("path", "")
-
- if ".m3u8" in url_param and not path:
- video_url = url_param
- elif ".m3u8" in url_param and path:
- video_url = url_param
- else:
- video_url = url_param
-
- video_url = unquote(video_url)
+ video_url = unquote(url_param)
if not is_valid_url(video_url):
abort(400, description="Invalid URL")
- if path.endswith(".m3u8") or not path:
+ # Main playlist request - get from yt-dlp and rewrite URLs
+ if path == "index.m3u8" or path == "":
playlist = dlp.get_hls_playlist(video_url)
return Response(playlist, mimetype="application/vnd.apple.mpegurl")
+ # Sub-playlist or segment request - path is the absolute URL
segment_data = dlp.get_hls_segment(video_url, path)
+
+ if segment_data is None:
+ abort(500, description="Failed to fetch segment")
+
+ if path.endswith(".m3u8"):
+ return Response(segment_data, mimetype="application/vnd.apple.mpegurl")
return Response(segment_data, mimetype="video/mp2t")
except HTTPException:
diff --git a/dlp.py b/dlp.py
index 39e0954..77bc1eb 100644
--- a/dlp.py
+++ b/dlp.py
@@ -1,22 +1,18 @@
import logging
import os
import time
-import re
from typing import Optional
import yt_dlp
logger = logging.getLogger(__name__)
CACHE_TTL = int(os.getenv("CACHE_TTL", 31536000))
+SOCKET_TIMEOUT = int(os.getenv("SOCKET_TIMEOUT", 30))
_session_cache = {}
_cache_timestamps = {}
-def _is_hls_url(url: str) -> bool:
- return url.endswith(".m3u8") or "m3u8" in url
-
-
def _get_cache_key(video_url: str) -> str:
return video_url
@@ -28,110 +24,112 @@ def _is_cache_expired(video_url: str) -> bool:
return time.time() - _cache_timestamps[key] > CACHE_TTL
-def _get_cached_session(video_url: str) -> Optional[dict]:
+def _get_cached_info(video_url: str) -> Optional[dict]:
key = _get_cache_key(video_url)
if key in _session_cache and not _is_cache_expired(video_url):
return _session_cache[key]
return None
-def _set_cached_session(video_url: str, session_data: dict) -> None:
+def _set_cached_info(video_url: str, info: dict) -> None:
key = _get_cache_key(video_url)
- _session_cache[key] = session_data
+ _session_cache[key] = info
_cache_timestamps[key] = time.time()
-def clear_expired_cache() -> None:
- expired_keys = [
- key for key in _session_cache
- if _is_cache_expired(key)
- ]
- for key in expired_keys:
- del _session_cache[key]
- del _cache_timestamps[key]
+def _extract_hls_url(info: dict) -> Optional[str]:
+ """Extract HLS URL from yt-dlp info dict."""
+ if info.get("formats"):
+ for f in reversed(info["formats"]):
+ if f.get("protocol") == "m3u8_native":
+ url = f.get("manifest_url") or f.get("url")
+ if url and ".m3u8" in url:
+ return url
+ return None
-def get_hls_playlist(video_url: str) -> str:
- cached = _get_cached_session(video_url)
- if cached and "hls_playlist" in cached:
- return cached["hls_playlist"]
-
- if _is_hls_url(video_url):
- hls_url = video_url
- else:
- ydl_opts = {
- "quiet": True,
- "no_warnings": True,
- "socket_timeout": int(os.getenv("SOCKET_TIMEOUT", 30)),
- }
-
- with yt_dlp.YoutubeDL(ydl_opts) as ydl:
- info = ydl.extract_info(video_url, download=False)
-
- if not info or "hls" not in info or not info["hls"]:
- raise ValueError("No HLS stream available for this video")
-
- hls_url = info["hls"]
-
- import urllib.request
- with urllib.request.urlopen(hls_url, timeout=30) as response:
- playlist_content = response.read().decode("utf-8")
-
- session_data = {
- "hls_playlist": playlist_content,
- "hls_url": hls_url,
- "video_url": video_url,
- }
- _set_cached_session(video_url, session_data)
-
- return playlist_content
-
-
-def get_hls_segment(video_url: str, segment_name: str) -> bytes:
- cached = _get_cached_session(video_url)
- if not cached or "hls_url" not in cached:
- get_hls_playlist(video_url)
- cached = _get_cached_session(video_url)
-
- hls_url = cached["hls_url"]
- base_url = hls_url.rsplit("/", 1)[0]
-
- if segment_name.startswith("/"):
- segment_name = segment_name[1:]
-
- segment_url = f"{base_url}/{segment_name}"
-
- import urllib.request
- with urllib.request.urlopen(segment_url, timeout=30) as response:
- return response.read()
-
-
-def get_stream_info(video_url: str) -> dict:
- cached = _get_cached_session(video_url)
+def _get_video_info(video_url: str) -> dict:
+ """Get video info using yt-dlp."""
+ cached = _get_cached_info(video_url)
if cached:
return cached
- if _is_hls_url(video_url):
- return {
- "title": "Test Video",
- "hls_url": video_url,
- "thumbnail": None,
- }
-
ydl_opts = {
"quiet": True,
"no_warnings": True,
- "socket_timeout": int(os.getenv("SOCKET_TIMEOUT", 30)),
+ "socket_timeout": SOCKET_TIMEOUT,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=False)
- if not info:
- raise ValueError("Could not extract video info")
+ hls_url = _extract_hls_url(info)
+ result = {
+ "title": info.get("title"),
+ "thumbnail": info.get("thumbnail"),
+ "hls_url": hls_url,
+ "raw_info": info,
+ }
+ _set_cached_info(video_url, result)
+ return result
- return {
- "title": info.get("title", "Unknown"),
- "hls_url": info.get("hls"),
- "thumbnail": info.get("thumbnail"),
- }
+
+def get_stream_info(video_url: str) -> dict:
+ """Get video info (title, hls_url, thumbnail)."""
+ info = _get_video_info(video_url)
+ return {
+ "title": info["title"],
+ "hls_url": info["hls_url"],
+ "thumbnail": info["thumbnail"],
+ }
+
+
+def get_hls_playlist(video_url: str) -> str:
+ """Get HLS playlist content with rewritten URLs."""
+ info = _get_video_info(video_url)
+ if not info["hls_url"]:
+ raise ValueError("No HLS stream available for this video")
+
+ import urllib.request
+ with urllib.request.urlopen(info["hls_url"], timeout=SOCKET_TIMEOUT) as response:
+ playlist_content = response.read().decode("utf-8")
+
+ return _rewrite_urls(playlist_content, video_url, info["hls_url"])
+
+
+def _rewrite_urls(content: str, video_url: str, base_url: str) -> str:
+ """Rewrite relative URLs in HLS playlist to point through proxy."""
+ from urllib.parse import urljoin, quote
+
+ lines = content.split("\n")
+ new_lines = []
+ for line in lines:
+ if line and not line.startswith("#") and line.startswith("http"):
+ abs_url = line
+ elif line and not line.startswith("#"):
+ abs_url = urljoin(base_url, line)
+ proxy_url = f"/hls?url={quote(video_url, safe='')}&path={quote(abs_url, safe='')}"
+ new_lines.append(proxy_url)
+ continue
+ new_lines.append(line)
+ return "\n".join(new_lines)
+
+
+def get_hls_segment(video_url: str, segment_url: str) -> bytes:
+ """Get HLS segment or sub-playlist content."""
+ from urllib.parse import unquote
+
+ decoded_url = unquote(segment_url)
+
+ import urllib.request
+ try:
+ response = urllib.request.urlopen(decoded_url, timeout=SOCKET_TIMEOUT)
+ data = response.read()
+ except urllib.error.HTTPError as e:
+ if e.code == 410:
+ raise ValueError("HLS URL expired (410 Gone)")
+ raise
+
+ if decoded_url.endswith(".m3u8"):
+ return _rewrite_urls(data.decode("utf-8"), video_url, decoded_url).encode("utf-8")
+ return data
diff --git a/templates/player.html b/templates/player.html
index 4582931..471a6f2 100644
--- a/templates/player.html
+++ b/templates/player.html
@@ -40,7 +40,7 @@