"""Resolve video streams with yt-dlp and proxy their HLS playlists/segments.

Extraction results are cached in-process, keyed by the video URL. Playlist
URIs are rewritten to ``/hls/<quoted video url>--<quoted filename>`` so that a
player fetches sub-playlists and segments through this module.
"""

import logging
import os
import shutil
import time
import urllib.error
import urllib.request
from typing import Optional, Tuple
from urllib.parse import parse_qs, quote, unquote, urlencode, urlparse

import yt_dlp

logger = logging.getLogger(__name__)

# Seconds a cached extraction result stays valid (env: CACHE_TTL).
CACHE_TTL = int(os.getenv("CACHE_TTL", 31536000))
# Timeout in seconds for yt-dlp sockets and for urlopen calls (env: SOCKET_TIMEOUT).
SOCKET_TIMEOUT = int(os.getenv("SOCKET_TIMEOUT", 30))

# cache key -> result dict built by _get_video_info
_session_cache = {}
# cache key -> time.time() at which the entry was stored
_cache_timestamps = {}
_ydl_instance = None

# Keys copied verbatim from the raw yt-dlp info dict by get_stream_info.
_RAW_METADATA_FIELDS = (
    "description",
    "uploader",
    "uploader_url",
    "duration",
    "upload_date",
    "view_count",
    "like_count",
    "dislike_count",
    "comment_count",
    "age_limit",
    "categories",
    "tags",
    "language",
    "license",
    "channel",
    "channel_url",
    "channel_id",
    "extractor",
    "extractor_key",
    "display_id",
    "url",
    "fulltitle",
    "duration_string",
    "resolution",
    "format",
    "format_note",
    "filesize",
    "filesize_approx",
)


def _get_ydl():
    """Get or create a singleton yt-dlp instance."""
    global _ydl_instance
    if _ydl_instance is None:
        _ydl_instance = yt_dlp.YoutubeDL({
            "quiet": True,
            "no_warnings": True,
            "socket_timeout": SOCKET_TIMEOUT,
        })
    return _ydl_instance


def _get_cache_key(video_url: str) -> str:
    """Return the cache key for *video_url* (currently the URL itself)."""
    return video_url


def _is_cache_expired(video_url: str) -> bool:
    """Return True when *video_url* has no timestamp or is older than CACHE_TTL."""
    stored_at = _cache_timestamps.get(_get_cache_key(video_url))
    if stored_at is None:
        return True
    return time.time() - stored_at > CACHE_TTL


def _invalidate_cache(video_url: str) -> None:
    """Drop the cached result and timestamp for *video_url*, if any."""
    key = _get_cache_key(video_url)
    _session_cache.pop(key, None)
    _cache_timestamps.pop(key, None)


def _get_cached_info(video_url: str) -> Optional[dict]:
    """Return the cached result for *video_url*, or None if missing/expired."""
    key = _get_cache_key(video_url)
    if key not in _session_cache:
        return None
    if _is_cache_expired(video_url):
        # Evict instead of leaving the stale entry in memory indefinitely.
        _invalidate_cache(video_url)
        return None
    return _session_cache[key]


def _set_cached_info(video_url: str, info: dict) -> None:
    """Store *info* for *video_url* and record the current time."""
    key = _get_cache_key(video_url)
    _session_cache[key] = info
    _cache_timestamps[key] = time.time()


def _is_m3u8(url) -> bool:
    """Return True when *url* is a string containing ``.m3u8``."""
    return isinstance(url, str) and ".m3u8" in url


def _first_m3u8(*candidates) -> Optional[str]:
    """Return the first candidate that looks like an HLS URL, else None."""
    for candidate in candidates:
        if _is_m3u8(candidate):
            return candidate
    return None


def _extract_hls_url(info: dict) -> Optional[str]:
    """Extract HLS URL from yt-dlp info dict.

    Lookup order: top-level ``manifest_url``/``url``, then
    ``requested_formats``, then ``formats`` with protocol ``m3u8_native``
    (last entry first), then any format whose ``url`` contains ``.m3u8``.
    Returns None when nothing matches.
    """
    # Each candidate is tested on its own so that a non-HLS value in the
    # first field does not hide an HLS URL in the second one.
    found = _first_m3u8(info.get("manifest_url"), info.get("url"))
    if found:
        return found

    for fmt in info.get("requested_formats") or ():
        found = _first_m3u8(fmt.get("url"), fmt.get("manifest_url"))
        if found:
            return found

    formats = info.get("formats") or ()
    for fmt in reversed(formats):
        if fmt.get("protocol") == "m3u8_native":
            found = _first_m3u8(fmt.get("manifest_url"), fmt.get("url"))
            if found:
                return found

    # Last resort: any format URL that mentions .m3u8. _is_m3u8 tolerates a
    # "url" key that is present but None.
    for fmt in formats:
        if _is_m3u8(fmt.get("url")):
            return fmt["url"]
    return None


def _extract_direct_url(info: dict) -> Optional[str]:
    """Extract direct video URL when HLS is not available.

    Lookup order: top-level ``url``, then the first ``requested_formats``
    entry with a ``url``, then the last ``formats`` entry whose protocol is
    ``https`` or ``http``. Returns None when nothing matches.
    """
    url = info.get("url")
    if url:
        return url

    for fmt in info.get("requested_formats") or ():
        url = fmt.get("url")
        if url:
            return url

    for fmt in reversed(info.get("formats") or ()):
        if fmt.get("protocol") in ("https", "http"):
            url = fmt.get("url")
            if url:
                return url
    return None


def _has_js_runtime() -> bool:
    """Return True when node or deno is on PATH, or deno is in ~/.deno/bin."""
    if shutil.which("node") or shutil.which("deno"):
        return True
    return os.path.exists(os.path.expanduser("~/.deno/bin/deno"))


def _get_video_info(video_url: str) -> dict:
    """Get video info using yt-dlp.

    Returns the cached result when one is still valid. Otherwise runs
    ``extract_info(download=False)`` and caches a dict with the keys
    ``title``, ``thumbnail``, ``hls_url``, ``direct_url`` and ``raw_info``.
    """
    cached = _get_cached_info(video_url)
    if cached:
        return cached

    if not _has_js_runtime():
        logger.warning("No JavaScript runtime (node/deno) found - YouTube may not work properly")

    info = _get_ydl().extract_info(video_url, download=False)
    result = {
        "title": info.get("title"),
        "thumbnail": info.get("thumbnail"),
        "hls_url": _extract_hls_url(info),
        "direct_url": _extract_direct_url(info),
        "raw_info": info,
    }
    _set_cached_info(video_url, result)
    return result


def get_stream_info(video_url: str) -> dict:
    """Get video info with all available metadata.

    The result holds ``title``, ``thumbnail``, ``hls_url`` and
    ``direct_url`` followed by the ``_RAW_METADATA_FIELDS`` keys read from
    the raw yt-dlp info dict (None for keys yt-dlp did not provide).
    """
    info = _get_video_info(video_url)
    raw = info.get("raw_info", {})
    metadata = {
        "title": info["title"],
        "thumbnail": info["thumbnail"],
        "hls_url": info.get("hls_url"),
        "direct_url": info.get("direct_url"),
    }
    metadata.update({field: raw.get(field) for field in _RAW_METADATA_FIELDS})
    return metadata


def _fetch_bytes(url: str) -> bytes:
    """Download *url* and return the body; the response is always closed."""
    with urllib.request.urlopen(url, timeout=SOCKET_TIMEOUT) as response:
        return response.read()


def _require_hls_url(video_url: str) -> str:
    """Return the HLS URL for *video_url* or raise ValueError if there is none."""
    hls_url = _get_video_info(video_url).get("hls_url")
    if not hls_url:
        raise ValueError("No HLS stream available for this video")
    return hls_url


def _fetch_master_playlist(video_url: str) -> Tuple[str, str]:
    """Download the playlist behind the video's HLS URL.

    On HTTP 410 the cache entry is dropped, the video is re-extracted and the
    download is attempted once more.

    Returns:
        ``(playlist_text, hls_url)`` where *hls_url* is the URL that worked.

    Raises:
        ValueError: no HLS URL is available for the video.
        urllib.error.HTTPError: any non-410 error, or a 410 on the retry.
    """
    hls_url = _require_hls_url(video_url)
    try:
        return _fetch_bytes(hls_url).decode("utf-8"), hls_url
    except urllib.error.HTTPError as e:
        if e.code != 410:
            raise

    _invalidate_cache(video_url)
    logger.info("HLS URL expired, fetching fresh HLS URL")
    hls_url = _require_hls_url(video_url)
    return _fetch_bytes(hls_url).decode("utf-8"), hls_url


def get_hls_playlist(video_url: str) -> str:
    """Get HLS playlist content with rewritten URLs.

    Raises:
        ValueError: no HLS stream is available for the video.
        urllib.error.HTTPError: the playlist download failed.
    """
    content, hls_url = _fetch_master_playlist(video_url)
    return _rewrite_urls(content, video_url, hls_url)


def get_direct_video_url(video_url: str) -> str:
    """Get direct video URL when HLS is not available.

    Raises:
        ValueError: no direct URL was found for the video.
    """
    info = _get_video_info(video_url)
    if not info.get("direct_url"):
        raise ValueError("No video URL available for this video")
    return info["direct_url"]


def _rewrite_urls(content: str, video_url: str, base_url: str) -> str:
    """Rewrite URI lines in an HLS playlist to point through the proxy.

    Every non-empty line that does not start with ``#`` becomes
    ``/hls/<quoted video_url>--<quoted filename>``. Tag and blank lines are
    passed through untouched. *base_url* is accepted for interface
    compatibility; the rewrite itself does not need it.
    """
    encoded_video_url = quote(video_url, safe="")

    rewritten = []
    for raw_line in content.split("\n"):
        # Strip so the "\r" of a CRLF playlist is not percent-encoded into
        # the proxied URI.
        line = raw_line.strip()
        if not line or line.startswith("#"):
            rewritten.append(raw_line)
            continue

        parsed = urlparse(line)
        if parsed.scheme:
            # Absolute URL: only the last path component (plus query) is kept.
            # NOTE(review): get_hls_segment rebuilds the upstream URL from the
            # HLS URL's host and directory, so an absolute URI pointing at a
            # different host/directory will not round-trip. Left as-is because
            # the proxy route that parses this format is outside this file.
            filename = quote(parsed.path.split("/")[-1], safe="")
            if parsed.query:
                filename += "?" + quote(parsed.query, safe="")
        else:
            # Relative URL: quoted whole, including any query string.
            filename = quote(line, safe="")

        # "--" separates the video URL from the filename.
        rewritten.append(f"/hls/{encoded_video_url}--{filename}")
    return "\n".join(rewritten)


def _is_playlist_path(filename: str) -> bool:
    """Return True when *filename*, ignoring its query, ends in ``.m3u8``."""
    return filename.split("?")[0].endswith(".m3u8")


def _build_upstream_url(base_url: str, relative: str) -> str:
    """Resolve *relative* against the directory of *base_url*.

    When *relative* carries a query string it is merged with the query of
    *base_url* (values from *relative* win). Without one, the base query is
    not appended.
    """
    base = urlparse(base_url)
    base_dir = base.path.rsplit("/", 1)[0]
    prefix = f"{base.scheme}://{base.netloc}{base_dir}/"
    if "?" not in relative:
        return prefix + relative

    rel_path, rel_query = relative.split("?", 1)
    merged_qs = {**parse_qs(base.query), **parse_qs(rel_query)}
    url = prefix + rel_path
    if merged_qs:
        url += "?" + urlencode(merged_qs, doseq=True)
    return url


def get_hls_segment(video_url: str, segment_url: str) -> bytes:
    """Get HLS segment or sub-playlist content.

    *segment_url* is the percent-encoded filename produced by
    ``_rewrite_urls``. It is resolved against the video's HLS URL. Payloads
    whose name ends in ``.m3u8`` are rewritten before being returned.

    Raises:
        ValueError: no HLS URL is available, or upstream answered 410
            (message contains ``410 Gone``).
        urllib.error.HTTPError: any other upstream HTTP error.
    """
    info = _get_video_info(video_url)
    hls_url = info.get("hls_url")
    if not hls_url:
        raise ValueError("No HLS URL available")

    filename = unquote(segment_url)
    full_url = _build_upstream_url(hls_url, filename)

    try:
        data = _fetch_bytes(full_url)
    except urllib.error.HTTPError as e:
        if e.code == 410:
            # get_hls_segment_with_retry matches on the "410 Gone" text.
            raise ValueError("HLS URL expired (410 Gone)") from e
        raise

    if _is_playlist_path(filename):
        return _rewrite_urls(data.decode("utf-8"), video_url, full_url).encode("utf-8")
    return data


def _first_variant_uri(master: str) -> Optional[str]:
    """Return the first URI line following an ``#EXT-X-STREAM-INF`` tag."""
    awaiting_uri = False
    for raw_line in master.split("\n"):
        line = raw_line.strip()
        if line.startswith("#EXT-X-STREAM-INF:"):
            awaiting_uri = True
        elif awaiting_uri and line and not line.startswith("#"):
            return line
    return None


def _refetch_sub_playlist(video_url: str) -> bytes:
    """Download and rewrite the first variant playlist of the video.

    When the playlist behind the HLS URL already lists segments (it has
    ``#EXTINF`` entries and no variant), that playlist is rewritten and
    returned instead.

    Raises:
        ValueError: no HLS URL is available or no playlist URI was found.
        urllib.error.HTTPError: an upstream download failed.
    """
    master, hls_url = _fetch_master_playlist(video_url)

    variant = _first_variant_uri(master)
    if variant is None:
        if "#EXTINF:" not in master:
            raise ValueError("Could not find sub-playlist URL")
        # Already a media playlist: there is no sub-playlist to follow.
        return _rewrite_urls(master, video_url, hls_url).encode("utf-8")

    if urlparse(variant).scheme in ("http", "https"):
        # Absolute variant URI: use it directly rather than gluing it onto
        # the base path.
        full_url = variant
    else:
        full_url = _build_upstream_url(hls_url, variant)

    logger.info("Fetching fresh sub-playlist: %s...", full_url[:100])
    sub_content = _fetch_bytes(full_url).decode("utf-8")

    rewritten = _rewrite_urls(sub_content, video_url, full_url)
    logger.info("Rewritten sub-playlist (first 200 chars): %s...", rewritten[:200])
    return rewritten.encode("utf-8")


def get_hls_segment_with_retry(video_url: str, segment_url: str) -> bytes:
    """Get HLS segment with retry on 410 error (refetches sub-playlist if needed).

    A first attempt is made with ``get_hls_segment``. If it reports
    ``410 Gone``:

    * for a sub-playlist request the cache entry is dropped and the request
      is retried once;
    * for a segment request the sub-playlist is re-fetched and its rewritten
      content is returned.

    NOTE(review): in the segment case the returned bytes are playlist text,
    not segment data. This mirrors the previous behaviour; confirm that the
    caller expects it.

    Raises:
        ValueError: no HLS URL is available, the retry hit 410 again, or no
            sub-playlist could be located.
        urllib.error.HTTPError: any other upstream HTTP error.
    """
    try:
        return get_hls_segment(video_url, segment_url)
    except ValueError as e:
        if "410 Gone" not in str(e):
            raise

    if _is_playlist_path(unquote(segment_url)):
        _invalidate_cache(video_url)
        logger.info("Sub-playlist expired, refetching")
        return get_hls_segment(video_url, segment_url)

    logger.info("Segment URL expired, re-fetching sub-playlist")
    return _refetch_sub_playlist(video_url)