Files

390 lines
13 KiB
Python
Raw Permalink Normal View History

import logging
import os
import time
from typing import Optional
from urllib.parse import unquote
2026-04-01 20:41:52 +00:00
from urllib.parse import urlparse
import yt_dlp
2026-04-01 20:41:52 +00:00
from yt_dlp.networking import Request
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Lifetime of a cached extraction result, in seconds (it is compared against
# time.time() deltas). Overridable through the CACHE_TTL environment variable;
# the default 31536000 equals 365 days.
CACHE_TTL = int(os.getenv("CACHE_TTL", 31536000))
# Value handed to yt-dlp as its "socket_timeout" option. Overridable through
# the SOCKET_TIMEOUT environment variable.
SOCKET_TIMEOUT = int(os.getenv("SOCKET_TIMEOUT", 30))
# Extraction results, keyed by the value returned from _get_cache_key.
_session_cache = {}
# time.time() at which each cache key was last stored.
_cache_timestamps = {}
# Shared yt_dlp.YoutubeDL instance; created lazily by _get_ydl.
_ydl_instance = None
def _get_ydl():
    """Return the shared yt-dlp client, building it on first use.

    The instance is stored in the module-level ``_ydl_instance`` so that every
    caller reuses the same object.
    """
    global _ydl_instance
    if _ydl_instance is not None:
        return _ydl_instance

    options = {
        "quiet": True,
        "no_warnings": True,
        "socket_timeout": SOCKET_TIMEOUT,
    }
    _ydl_instance = yt_dlp.YoutubeDL(options)
    return _ydl_instance
def _get_cache_key(video_url: str) -> str:
return video_url
def _is_cache_expired(video_url: str) -> bool:
    """Tell whether the cache entry for *video_url* is missing or too old.

    Returns True when no timestamp is recorded for the URL, or when more than
    ``CACHE_TTL`` seconds have passed since it was stored.
    """
    try:
        stored_at = _cache_timestamps[_get_cache_key(video_url)]
    except KeyError:
        # Never cached: treat as expired.
        return True
    return time.time() - stored_at > CACHE_TTL
def _get_cached_info(video_url: str) -> Optional[dict]:
    """Return the cached info for *video_url*, or None if absent or expired."""
    cache_key = _get_cache_key(video_url)
    if cache_key not in _session_cache or _is_cache_expired(video_url):
        return None
    return _session_cache[cache_key]
def _set_cached_info(video_url: str, info: dict) -> None:
    """Store *info* for *video_url* and record the time it was stored."""
    cache_key = _get_cache_key(video_url)
    _session_cache.update({cache_key: info})
    _cache_timestamps.update({cache_key: time.time()})
2026-04-01 20:41:52 +00:00
# Segment mappings per video: video URL -> {segment id -> absolute upstream URL}.
# Entries are written by _rewrite_urls and looked up by get_hls_segment.
_segment_maps = {}
def _get_segment_id(full_url: str) -> str:
"""Build a stable segment id that survives signed query refreshes."""
import hashlib
parsed = urlparse(full_url)
stable_key = parsed.path or full_url.split("?", 1)[0]
return hashlib.md5(stable_key.encode("utf-8")).hexdigest()
def _refresh_hls_url(video_url: str, attempts: int = 3) -> Optional[str]:
    """Re-run extraction until an HLS URL is returned or attempts run out.

    Each attempt first drops the cached entry for *video_url* so that
    ``_get_video_info`` performs a fresh extraction.

    Returns the HLS URL, or None when no attempt produced one.
    """
    latest = None
    for _ in range(attempts):
        # Evict both cache tables so the next lookup cannot be served stale.
        for table in (_session_cache, _cache_timestamps):
            table.pop(video_url, None)
        latest = _get_video_info(video_url)
        fresh_hls = latest.get("hls_url")
        if fresh_hls:
            return fresh_hls

    if latest and latest.get("direct_url"):
        logger.info("Extractor returned direct URL but no HLS URL")
    return None
def _get_request_headers(video_url: str) -> dict:
    """Return a copy of the ``http_headers`` from the extractor's raw info.

    An empty dict is returned when the raw info or its headers are missing.
    """
    extractor_info = _get_video_info(video_url).get("raw_info")
    if not extractor_info:
        return {}
    headers = extractor_info.get("http_headers")
    return dict(headers) if headers else {}
def _fetch_url(video_url: str, url: str) -> bytes:
    """Download *url* through the shared yt-dlp client and return the body.

    The request carries the headers obtained for *video_url* from
    ``_get_request_headers``.
    """
    client = _get_ydl()
    outgoing = Request(url, headers=_get_request_headers(video_url))
    with client.urlopen(outgoing) as response:
        body = response.read()
    return body
def _populate_nested_maps(video_url: str, content: str, base_url: str, video_id: str, visited: Optional[set[str]] = None, depth: int = 0) -> None:
"""Preload nested playlists so segment ids survive rebuilds after 410s."""
from urllib.parse import urljoin, urlparse
if visited is None:
visited = set()
if depth >= 3:
return
for line in content.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
parsed = urlparse(line)
full_url = line if parsed.scheme else urljoin(base_url, line)
if not urlparse(full_url).path.endswith(".m3u8") or full_url in visited:
continue
visited.add(full_url)
try:
nested_content = _fetch_url(video_url, full_url).decode("utf-8")
_rewrite_urls(nested_content, video_url, full_url, video_id)
_populate_nested_maps(video_url, nested_content, full_url, video_id, visited, depth + 1)
except Exception as e:
logger.info("Failed to preload nested playlist: %s", e)
def _extract_hls_url(info: dict) -> Optional[str]:
"""Extract HLS URL from yt-dlp info dict."""
# First check top-level fields (these are set when there's only one format)
url = info.get("manifest_url") or info.get("url")
if url and ".m3u8" in url:
return url
# Check requested_formats (post-processed by yt-dlp)
if info.get("requested_formats"):
for f in info["requested_formats"]:
url = f.get("url") or f.get("manifest_url")
if url and ".m3u8" in url:
return url
# Check formats for m3u8_native protocol
if info.get("formats"):
for f in reversed(info["formats"]):
if f.get("protocol") == "m3u8_native":
url = f.get("manifest_url") or f.get("url")
if url and ".m3u8" in url:
return url
# Try to find any m3u8 URL in formats
if info.get("formats"):
for f in info["formats"]:
url = f.get("url", "")
if ".m3u8" in url:
return url
return None
def _extract_direct_url(info: dict) -> Optional[str]:
"""Extract direct video URL when HLS is not available."""
# Check url field first
url = info.get("url")
if url:
return url
# Check requested_formats
if info.get("requested_formats"):
for f in info["requested_formats"]:
url = f.get("url")
if url:
return url
# Check formats for best quality https format
if info.get("formats"):
for f in reversed(info["formats"]):
if f.get("protocol") in ("https", "http"):
url = f.get("url")
if url:
return url
return None
def _get_video_info(video_url: str) -> dict:
    """Return extraction results for *video_url*, using the cache if possible.

    On a cache miss the URL is extracted with yt-dlp (without downloading),
    and a dict with ``title``, ``thumbnail``, ``hls_url``, ``direct_url`` and
    the full ``raw_info`` is cached and returned.

    A warning is logged when neither ``node`` is on PATH nor a deno binary
    exists at ``~/.deno/bin/deno``.
    """
    hit = _get_cached_info(video_url)
    if hit:
        return hit

    import shutil

    node_missing = not shutil.which("node")
    if node_missing and not os.path.exists(os.path.expanduser("~/.deno/bin/deno")):
        logger.warning("No JavaScript runtime (node/deno) found - YouTube may not work properly")

    extracted = _get_ydl().extract_info(video_url, download=False)

    summary = dict(
        title=extracted.get("title"),
        thumbnail=extracted.get("thumbnail"),
        hls_url=_extract_hls_url(extracted),
        direct_url=_extract_direct_url(extracted),
        raw_info=extracted,
    )
    _set_cached_info(video_url, summary)
    return summary
def get_stream_info(video_url: str) -> dict:
    """Return stream URLs and descriptive metadata for *video_url*.

    ``title``, ``thumbnail``, ``hls_url`` and ``direct_url`` come from the
    extraction summary; every other key is copied from the extractor's raw
    info and is None when the extractor did not provide it.
    """
    info = _get_video_info(video_url)
    raw = info.get("raw_info", {})

    # Keys copied verbatim from the raw extractor output.
    passthrough_fields = (
        "description",
        "uploader",
        "uploader_url",
        "duration",
        "upload_date",
        "view_count",
        "like_count",
        "dislike_count",
        "comment_count",
        "age_limit",
        "categories",
        "tags",
        "language",
        "license",
        "channel",
        "channel_url",
        "channel_id",
        "extractor",
        "extractor_key",
        "display_id",
        "url",
        "fulltitle",
        "duration_string",
        "resolution",
        "format",
        "format_note",
        "filesize",
        "filesize_approx",
    )

    metadata = {
        "title": info["title"],
        "thumbnail": info["thumbnail"],
        "hls_url": info.get("hls_url"),
        "direct_url": info.get("direct_url"),
    }
    metadata.update((field, raw.get(field)) for field in passthrough_fields)
    return metadata
def get_hls_playlist(video_url: str) -> str:
    """Fetch the HLS playlist for *video_url* with its URLs routed via the proxy.

    When the cached info has no HLS URL, extraction is retried through
    ``_refresh_hls_url``. The playlist is downloaded, rewritten, and its
    nested playlists are preloaded. If the download fails with an error whose
    text contains "410", a fresh HLS URL is obtained and the download is
    attempted once more.

    Raises:
        ValueError: no HLS URL could be obtained.
        Exception: any other failure from fetching or rewriting is re-raised.
    """
    playlist_url = _get_video_info(video_url).get("hls_url") or _refresh_hls_url(video_url)
    if not playlist_url:
        raise ValueError("No HLS stream available for this video")

    from utils import get_video_id

    video_id = get_video_id(video_url)

    # Two passes at most: the second only runs after a "410" on the first.
    for attempt in range(2):
        try:
            body = _fetch_url(video_url, playlist_url).decode("utf-8")
            proxied = _rewrite_urls(body, video_url, playlist_url, video_id)
            _populate_nested_maps(video_url, body, playlist_url, video_id)
            return proxied
        except Exception as exc:
            retryable = "410" in str(exc) and attempt == 0
            if not retryable:
                raise
            logger.info("HLS URL expired, fetching fresh HLS URL")
            playlist_url = _refresh_hls_url(video_url)
            if not playlist_url:
                raise ValueError("No HLS stream available for this video")
def get_direct_video_url(video_url: str) -> str:
    """Return the direct media URL for *video_url*.

    Raises:
        ValueError: the extraction produced no direct URL.
    """
    direct_url = _get_video_info(video_url).get("direct_url")
    if direct_url:
        return direct_url
    raise ValueError("No video URL available for this video")
2026-04-01 20:41:52 +00:00
def _rewrite_urls(content: str, video_url: str, base_url: str, video_id: str) -> str:
    """Rewrite the URI lines of an HLS playlist to point through the proxy.

    Every line that is neither blank nor a ``#`` tag/comment is treated as a
    URI: it is resolved against *base_url* when relative, given an id by
    ``_get_segment_id``, recorded in ``_segment_maps[video_url]`` and replaced
    by ``/hls/{video_id}/seg/{seg_id}``. All other lines are returned
    unchanged.

    Lines are stripped before being classified, so playlists with CRLF line
    endings or stray surrounding whitespace are handled: the upstream URL is
    stored without the ``\\r``, whitespace-only lines are not mistaken for
    URIs, and a rewritten line keeps its original ``\\r`` terminator.

    Args:
        content: Playlist text.
        video_url: Key of the per-video segment map to update.
        base_url: URL the playlist was fetched from, used to resolve
            relative URIs.
        video_id: Identifier embedded in the generated proxy paths.

    Returns:
        The playlist text with URI lines replaced by proxy paths.
    """
    from urllib.parse import urljoin, urlparse

    # The map is shared across nested playlists of the same video, so it is
    # updated in place rather than rebuilt.
    segment_map = _segment_maps.setdefault(video_url, {})

    rewritten = []
    for line in content.split("\n"):
        entry = line.strip()
        if not entry or entry.startswith("#"):
            rewritten.append(line)
            continue

        full_url = entry if urlparse(entry).scheme else urljoin(base_url, entry)
        # The id must not depend on expiring signatures in query strings.
        seg_id = _get_segment_id(full_url)
        segment_map[seg_id] = full_url

        carriage_return = "\r" if line.endswith("\r") else ""
        rewritten.append(f"/hls/{video_id}/seg/{seg_id}{carriage_return}")

    return "\n".join(rewritten)
def get_hls_segment(video_url: str, segment_url: str) -> bytes:
    """Return the bytes of an HLS segment or of a rewritten sub-playlist.

    Despite its name, *segment_url* is the segment id that ``_rewrite_urls``
    placed in the proxy path; it is resolved to the upstream URL through
    ``_segment_maps[video_url]``. If the map is missing, or does not contain
    the id, the playlist is rebuilt once via ``get_hls_playlist`` before
    giving up.

    When the downloaded payload starts with ``#EXTM3U`` it is treated as a
    playlist and its URIs are rewritten; otherwise the raw bytes are returned.

    Raises:
        ValueError: no segment map could be built, the id is unknown, or the
            upstream fetch failed. Any fetch failure is reported as
            "HLS URL expired (410 Gone)", which callers match on.
    """
    seg_id = segment_url

    segment_map = _segment_maps.get(video_url)
    if not segment_map:
        # Build the mapping on demand so callers need not fetch the playlist first.
        get_hls_playlist(video_url)
        segment_map = _segment_maps.get(video_url)
        if not segment_map:
            raise ValueError("No segment map available")

    if seg_id not in segment_map:
        # Rebuild once to refresh the mappings (e.g. after expiry).
        get_hls_playlist(video_url)
        segment_map = _segment_maps.get(video_url) or {}
        if seg_id not in segment_map:
            raise ValueError("Segment not found")

    full_url = segment_map[seg_id]
    try:
        data = _fetch_url(video_url, full_url)
    except Exception as e:
        raise ValueError("HLS URL expired (410 Gone)") from e

    # Detect playlists from the payload itself (covers sub-playlists too).
    # Only a bounded prefix is decoded for the check, so binary media
    # segments are not decoded in full on every request.
    try:
        head = data[:4096].decode("utf-8", errors="ignore").lstrip()[:200]
        if "#EXTM3U" in head:
            from utils import get_video_id

            video_id = get_video_id(video_url)
            text = data.decode("utf-8", errors="ignore")
            rewritten = _rewrite_urls(text, video_url, full_url, video_id)
            return rewritten.encode("utf-8")
    except Exception:
        # Best effort: fall back to the raw payload, but leave a trace
        # instead of hiding the failure.
        logger.warning(
            "Playlist detection/rewrite failed for segment %s; returning raw data",
            seg_id,
            exc_info=True,
        )
    return data
def get_hls_segment_with_retry(video_url: str, segment_url: str) -> bytes:
    """Fetch an HLS segment, retrying and then rebuilding on "410 Gone".

    A ValueError whose text contains "410 Gone" triggers one plain retry. If
    the retry fails the same way, the cached info and the segment map for
    *video_url* are dropped, the playlist is rebuilt, and the segment is
    fetched one final time. Any other ValueError is re-raised immediately.
    """
    for attempt in range(2):
        try:
            if video_url not in _segment_maps:
                get_hls_playlist(video_url)
            return get_hls_segment(video_url, segment_url)
        except ValueError as exc:
            if "410 Gone" not in str(exc):
                raise
            if attempt == 0:
                logger.info("Segment 410, retrying")
                continue

            logger.info("Segment still 410, rebuilding playlist and map")
            # Drop every cached trace of this video before rebuilding.
            for store in (_session_cache, _cache_timestamps, _segment_maps):
                store.pop(video_url, None)
            get_hls_playlist(video_url)
            return get_hls_segment(video_url, segment_url)