"""yt-dlp backed helpers for extracting stream info and proxying HLS playlists.

Extraction results are cached in-process per video URL. Every URL found in a
fetched playlist is mapped to a stable segment id so it can be served through
``/hls/<video_id>/seg/<segment_id>`` proxy paths.
"""

import hashlib
import logging
import os
import shutil
import time
from typing import Optional
from urllib.parse import unquote
from urllib.parse import urljoin
from urllib.parse import urlparse

import yt_dlp
from yt_dlp.networking import Request

logger = logging.getLogger(__name__)

# Lifetime of a cached extraction result, in seconds (default: 365 days).
CACHE_TTL = int(os.getenv("CACHE_TTL", 31536000))
# Socket timeout, in seconds, passed to yt-dlp as ``socket_timeout``.
SOCKET_TIMEOUT = int(os.getenv("SOCKET_TIMEOUT", 30))

# Extraction results and their insertion times, keyed by ``_get_cache_key``.
_session_cache = {}
_cache_timestamps = {}
_ydl_instance = None

# Keys copied verbatim from the raw yt-dlp info dict by ``get_stream_info``.
_RAW_METADATA_FIELDS = (
    "description",
    "uploader",
    "uploader_url",
    "duration",
    "upload_date",
    "view_count",
    "like_count",
    "dislike_count",
    "comment_count",
    "age_limit",
    "categories",
    "tags",
    "language",
    "license",
    "channel",
    "channel_url",
    "channel_id",
    "extractor",
    "extractor_key",
    "display_id",
    "url",
    "fulltitle",
    "duration_string",
    "resolution",
    "format",
    "format_note",
    "filesize",
    "filesize_approx",
)


def _get_ydl():
    """Get or create a singleton yt-dlp instance."""
    global _ydl_instance
    if _ydl_instance is None:
        _ydl_instance = yt_dlp.YoutubeDL({
            "quiet": True,
            "no_warnings": True,
            "socket_timeout": SOCKET_TIMEOUT,
        })
    return _ydl_instance


def _get_cache_key(video_url: str) -> str:
    """Return the cache key for *video_url* (currently the URL itself)."""
    return video_url


def _is_cache_expired(video_url: str) -> bool:
    """Return True when no timestamp is stored or it is older than CACHE_TTL."""
    key = _get_cache_key(video_url)
    if key not in _cache_timestamps:
        return True
    return time.time() - _cache_timestamps[key] > CACHE_TTL


def _get_cached_info(video_url: str) -> Optional[dict]:
    """Return the cached extraction result, or None if missing or expired."""
    key = _get_cache_key(video_url)
    if key in _session_cache and not _is_cache_expired(video_url):
        return _session_cache[key]
    return None


def _set_cached_info(video_url: str, info: dict) -> None:
    """Store *info* for *video_url* and stamp it with the current time."""
    key = _get_cache_key(video_url)
    _session_cache[key] = info
    _cache_timestamps[key] = time.time()


def _invalidate_cached_info(video_url: str) -> None:
    """Drop the cached extraction result and timestamp for *video_url*."""
    key = _get_cache_key(video_url)
    _session_cache.pop(key, None)
    _cache_timestamps.pop(key, None)


# Segment mappings per video: {video_url: {segment_id: upstream_url}}.
_segment_maps = {}


def _get_segment_id(full_url: str) -> str:
    """Build a stable segment id that survives signed query refreshes.

    Only the URL path is hashed (falling back to the URL without its query
    string when the path is empty), so a refreshed signature in the query
    string maps to the same id.
    """
    parsed = urlparse(full_url)
    stable_key = parsed.path or full_url.split("?", 1)[0]
    # The digest is only an identifier, not a security primitive; saying so
    # keeps md5 usable on builds that restrict it for security purposes.
    return hashlib.md5(
        stable_key.encode("utf-8"), usedforsecurity=False
    ).hexdigest()


def _refresh_hls_url(video_url: str, attempts: int = 3) -> Optional[str]:
    """Re-extract until yt-dlp returns an HLS URL or we exhaust retries.

    Each attempt drops the cached result first so ``_get_video_info`` performs
    a fresh extraction. Returns the HLS URL, or None if no attempt produced one.
    """
    last_info = None
    for _ in range(attempts):
        _invalidate_cached_info(video_url)
        info = _get_video_info(video_url)
        last_info = info
        if info.get("hls_url"):
            return info["hls_url"]
    if last_info and last_info.get("direct_url"):
        logger.info("Extractor returned direct URL but no HLS URL")
    return None


def _get_request_headers(video_url: str) -> dict:
    """Return a copy of the ``http_headers`` from the raw info dict, or {}."""
    info = _get_video_info(video_url)
    raw_info = info.get("raw_info") or {}
    return dict(raw_info.get("http_headers") or {})


def _fetch_url(video_url: str, url: str) -> bytes:
    """Fetch *url* through the shared yt-dlp instance and return the body.

    The request carries the HTTP headers recorded for *video_url*.
    """
    ydl = _get_ydl()
    request = Request(url, headers=_get_request_headers(video_url))
    with ydl.urlopen(request) as response:
        return response.read()


def _populate_nested_maps(
    video_url: str,
    content: str,
    base_url: str,
    video_id: str,
    visited: Optional[set[str]] = None,
    depth: int = 0,
) -> None:
    """Preload nested playlists so segment ids survive rebuilds after 410s.

    Every non-comment line of *content* whose resolved path ends in ``.m3u8``
    is fetched, run through ``_rewrite_urls`` (for its side effect on the
    segment map) and then scanned recursively. Recursion stops at depth 3 and
    *visited* prevents fetching the same playlist twice. Failures are logged
    and skipped.
    """
    if visited is None:
        visited = set()
    if depth >= 3:
        return

    for line in content.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue

        parsed = urlparse(line)
        full_url = line if parsed.scheme else urljoin(base_url, line)
        if not urlparse(full_url).path.endswith(".m3u8") or full_url in visited:
            continue
        visited.add(full_url)

        try:
            nested_content = _fetch_url(video_url, full_url).decode("utf-8")
            # Return value is discarded; the call registers the segment ids.
            _rewrite_urls(nested_content, video_url, full_url, video_id)
            _populate_nested_maps(
                video_url, nested_content, full_url, video_id, visited, depth + 1
            )
        except Exception as e:
            logger.info("Failed to preload nested playlist: %s", e)


def _extract_hls_url(info: dict) -> Optional[str]:
    """Extract HLS URL from yt-dlp info dict.

    Returns the first URL containing ``.m3u8`` found, in order, in the
    top-level fields, ``requested_formats``, ``m3u8_native`` formats (searched
    last-to-first) and finally any format URL. Returns None if nothing matches.
    """
    # First check top-level fields (these are set when there's only one format)
    url = info.get("manifest_url") or info.get("url")
    if url and ".m3u8" in url:
        return url

    # Check requested_formats (post-processed by yt-dlp)
    if info.get("requested_formats"):
        for f in info["requested_formats"]:
            url = f.get("url") or f.get("manifest_url")
            if url and ".m3u8" in url:
                return url

    # Check formats for m3u8_native protocol
    if info.get("formats"):
        for f in reversed(info["formats"]):
            if f.get("protocol") == "m3u8_native":
                url = f.get("manifest_url") or f.get("url")
                if url and ".m3u8" in url:
                    return url

    # Try to find any m3u8 URL in formats
    if info.get("formats"):
        for f in info["formats"]:
            # ``or ""`` also covers a format whose "url" key is present but
            # None, which would otherwise raise TypeError on the ``in`` test.
            url = f.get("url") or ""
            if ".m3u8" in url:
                return url

    return None


def _extract_direct_url(info: dict) -> Optional[str]:
    """Extract direct video URL when HLS is not available.

    Returns the first non-empty URL from the top-level ``url`` field,
    ``requested_formats``, or the http/https formats (searched last-to-first).
    """
    # Check url field first
    url = info.get("url")
    if url:
        return url

    # Check requested_formats
    if info.get("requested_formats"):
        for f in info["requested_formats"]:
            url = f.get("url")
            if url:
                return url

    # Check formats for best quality https format
    if info.get("formats"):
        for f in reversed(info["formats"]):
            if f.get("protocol") in ("https", "http"):
                url = f.get("url")
                if url:
                    return url

    return None


def _get_video_info(video_url: str) -> dict:
    """Get video info using yt-dlp.

    Returns the cached result when one is available; otherwise extracts
    without downloading, caches and returns a dict with ``title``,
    ``thumbnail``, ``hls_url``, ``direct_url`` and the full ``raw_info``.
    """
    cached = _get_cached_info(video_url)
    if cached:
        return cached

    # Warn (but continue) when neither node on PATH nor deno at its default
    # install location is present.
    if not shutil.which("node"):
        deno_path = os.path.expanduser("~/.deno/bin/deno")
        if not os.path.exists(deno_path):
            logger.warning(
                "No JavaScript runtime (node/deno) found - YouTube may not work properly"
            )

    ydl = _get_ydl()
    info = ydl.extract_info(video_url, download=False)

    hls_url = _extract_hls_url(info)
    direct_url = _extract_direct_url(info)

    result = {
        "title": info.get("title"),
        "thumbnail": info.get("thumbnail"),
        "hls_url": hls_url,
        "direct_url": direct_url,
        "raw_info": info,
    }
    _set_cached_info(video_url, result)
    return result


def get_stream_info(video_url: str) -> dict:
    """Get video info with all available metadata.

    The result holds ``title``, ``thumbnail``, ``hls_url`` and ``direct_url``
    followed by the fields in ``_RAW_METADATA_FIELDS`` copied from the raw
    yt-dlp info dict (None for any that are absent).
    """
    info = _get_video_info(video_url)

    # Same None-tolerant access as _get_request_headers.
    raw = info.get("raw_info") or {}

    metadata = {
        "title": info["title"],
        "thumbnail": info["thumbnail"],
        "hls_url": info.get("hls_url"),
        "direct_url": info.get("direct_url"),
    }
    # Additional metadata
    metadata.update({field: raw.get(field) for field in _RAW_METADATA_FIELDS})
    return metadata


def get_hls_playlist(video_url: str) -> str:
    """Get HLS playlist content with rewritten URLs.

    Raises:
        ValueError: if no HLS URL can be obtained for the video.
        Exception: any fetch error other than a first-attempt 410 propagates.
    """
    info = _get_video_info(video_url)
    hls_url = info.get("hls_url")
    if not hls_url:
        hls_url = _refresh_hls_url(video_url)
    if not hls_url:
        raise ValueError("No HLS stream available for this video")

    # Imported lazily, as in the rest of this module.
    from utils import get_video_id

    video_id = get_video_id(video_url)

    # Try to get playlist, retry once if URL expired. Every path through the
    # loop body returns, raises, or continues into the second attempt.
    for attempt in range(2):
        try:
            playlist_content = _fetch_url(video_url, hls_url).decode("utf-8")
            rewritten = _rewrite_urls(playlist_content, video_url, hls_url, video_id)
            _populate_nested_maps(video_url, playlist_content, hls_url, video_id)
            return rewritten
        except Exception as e:
            if "410" in str(e) and attempt == 0:
                logger.info("HLS URL expired, fetching fresh HLS URL")
                hls_url = _refresh_hls_url(video_url)
                if not hls_url:
                    raise ValueError("No HLS stream available for this video")
                continue
            raise


def get_direct_video_url(video_url: str) -> str:
    """Get direct video URL when HLS is not available.

    Raises:
        ValueError: if the extraction result has no direct URL.
    """
    info = _get_video_info(video_url)
    if not info.get("direct_url"):
        raise ValueError("No video URL available for this video")
    return info["direct_url"]


def _rewrite_urls(content: str, video_url: str, base_url: str, video_id: str) -> str:
    """Rewrite URLs in HLS playlist to point through proxy.

    Each URI line is resolved against *base_url*, recorded in the per-video
    segment map under its stable id, and replaced by
    ``/hls/<video_id>/seg/<segment_id>``. Tag, comment and blank lines pass
    through unchanged.

    NOTE(review): URIs embedded in tag attributes (``URI="..."``) are not
    rewritten here - confirm whether the served playlists rely on them.
    """
    # Persist mapping across nested playlists.
    segment_map = _segment_maps.setdefault(video_url, {})

    new_lines = []
    for line in content.split("\n"):
        # Classify on the stripped text so CRLF endings and whitespace-only
        # lines are not mistaken for URIs (matches _populate_nested_maps).
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            new_lines.append(line)
            continue

        if urlparse(stripped).scheme:
            full_url = stripped
        else:
            full_url = urljoin(base_url, stripped)

        # Stable id must ignore expiring signatures in query strings.
        seg_id = _get_segment_id(full_url)
        segment_map[seg_id] = full_url
        new_lines.append(f"/hls/{video_id}/seg/{seg_id}")

    return "\n".join(new_lines)


def get_hls_segment(video_url: str, segment_url: str) -> bytes:
    """Get HLS segment or sub-playlist content.

    *segment_url* is the segment id produced by ``_rewrite_urls``. The segment
    map is built (or rebuilt once) via ``get_hls_playlist`` when the id cannot
    be resolved. If the fetched body is a playlist, its URLs are rewritten
    before it is returned.

    Raises:
        ValueError: if no map can be built, the id is unknown, or the upstream
            fetch fails.
    """
    seg_id = segment_url
    segment_map = _segment_maps.get(video_url)
    if not segment_map:
        # Build mapping on demand.
        _ = get_hls_playlist(video_url)
        segment_map = _segment_maps.get(video_url)
        if not segment_map:
            raise ValueError("No segment map available")

    if seg_id not in segment_map:
        # Try rebuild once to refresh mappings (e.g., after expiry).
        _ = get_hls_playlist(video_url)
        segment_map = _segment_maps.get(video_url) or {}
        if seg_id not in segment_map:
            raise ValueError("Segment not found")

    full_url = segment_map[seg_id]
    try:
        data = _fetch_url(video_url, full_url)
    except Exception as e:
        # Every fetch failure is reported with the "410 Gone" message because
        # get_hls_segment_with_retry keys its rebuild logic on that text.
        logger.info("Segment fetch failed, reporting as expired: %s", e)
        raise ValueError("HLS URL expired (410 Gone)") from e

    # Detect playlists from a bounded prefix so binary segments are not
    # decoded in full just to look for the header.
    head = data[:1024].decode("utf-8", errors="ignore").lstrip()[:200]
    if "#EXTM3U" not in head:
        return data

    try:
        from utils import get_video_id

        video_id = get_video_id(video_url)
        text = data.decode("utf-8", errors="ignore")
        rewritten = _rewrite_urls(text, video_url, full_url, video_id)
        return rewritten.encode("utf-8")
    except Exception:
        # Best effort: fall back to the unmodified body, but leave a trace.
        logger.warning(
            "Failed to rewrite sub-playlist; returning it unmodified",
            exc_info=True,
        )
        return data


def get_hls_segment_with_retry(video_url: str, segment_url: str) -> bytes:
    """Get HLS segment with one rebuild after signed URL expiry.

    A first "410 Gone" failure is simply retried. A second one drops the
    cached extraction result and the segment map, rebuilds the playlist and
    tries once more. Any other ValueError propagates immediately.
    """
    for attempt in range(2):
        try:
            if video_url not in _segment_maps:
                _ = get_hls_playlist(video_url)
            return get_hls_segment(video_url, segment_url)
        except ValueError as e:
            if "410 Gone" in str(e):
                if attempt == 0:
                    logger.info("Segment 410, retrying")
                    continue
                logger.info("Segment still 410, rebuilding playlist and map")
                _invalidate_cached_info(video_url)
                _segment_maps.pop(video_url, None)
                _ = get_hls_playlist(video_url)
                return get_hls_segment(video_url, segment_url)
            raise