"""Helpers that resolve a video URL to an HLS stream via yt-dlp and proxy it.

The module extracts video metadata with yt-dlp, caches it in process memory,
and serves HLS playlists whose relative URIs are rewritten to ``/hls?...``
proxy paths.
"""

import logging
import os
import time
import urllib.error
import urllib.request
from typing import Optional
from urllib.parse import quote, unquote, urljoin, urlparse

import yt_dlp

logger = logging.getLogger(__name__)

# Lifetime of a cached extraction result, in seconds.
# NOTE(review): get_hls_segment handles "410 Gone" for expired stream URLs,
# so the default of one year probably outlives the cached hls_url - confirm.
CACHE_TTL = int(os.getenv("CACHE_TTL", 31536000))
# Timeout in seconds handed to yt-dlp and to every urlopen call below.
SOCKET_TIMEOUT = int(os.getenv("SOCKET_TIMEOUT", 30))

# Schemes get_hls_segment is willing to fetch on behalf of a client.
_ALLOWED_SCHEMES = ("http", "https")

# Extraction results and their insertion times, both keyed by cache key.
_session_cache = {}
_cache_timestamps = {}


def _get_cache_key(video_url: str) -> str:
    """Return the cache key for *video_url* (currently the URL itself)."""
    return video_url


def _is_cache_expired(video_url: str) -> bool:
    """Return True when *video_url* has no timestamp or is older than CACHE_TTL."""
    key = _get_cache_key(video_url)
    if key not in _cache_timestamps:
        return True
    return time.time() - _cache_timestamps[key] > CACHE_TTL


def _get_cached_info(video_url: str) -> Optional[dict]:
    """Return the cached result for *video_url*, or None if absent or expired.

    An expired entry is removed from both cache dicts so stale results do not
    accumulate for the lifetime of the process.
    """
    key = _get_cache_key(video_url)
    if key not in _session_cache:
        return None
    if _is_cache_expired(video_url):
        _session_cache.pop(key, None)
        _cache_timestamps.pop(key, None)
        return None
    return _session_cache[key]


def _set_cached_info(video_url: str, info: dict) -> None:
    """Store *info* for *video_url* and stamp it with the current time."""
    key = _get_cache_key(video_url)
    _session_cache[key] = info
    _cache_timestamps[key] = time.time()


def _extract_hls_url(info: dict) -> Optional[str]:
    """Extract an HLS URL from a yt-dlp info dict.

    Formats are scanned from last to first; the first one whose protocol is
    ``m3u8_native`` and whose manifest URL (or, failing that, URL) contains
    ``.m3u8`` wins. Returns None when no format qualifies.
    """
    for fmt in reversed(info.get("formats") or []):
        if fmt.get("protocol") != "m3u8_native":
            continue
        url = fmt.get("manifest_url") or fmt.get("url")
        if url and ".m3u8" in url:
            return url
    return None


def _get_video_info(video_url: str) -> dict:
    """Return title, thumbnail, HLS URL and raw yt-dlp info for *video_url*.

    A cached result is returned when one is available; otherwise yt-dlp is
    asked for the metadata (without downloading) and the result is cached.
    """
    cached = _get_cached_info(video_url)
    if cached is not None:
        return cached

    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "socket_timeout": SOCKET_TIMEOUT,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=False)

    result = {
        "title": info.get("title"),
        "thumbnail": info.get("thumbnail"),
        "hls_url": _extract_hls_url(info),
        "raw_info": info,
    }
    _set_cached_info(video_url, result)
    return result


def get_stream_info(video_url: str) -> dict:
    """Get video info (title, hls_url, thumbnail)."""
    info = _get_video_info(video_url)
    return {
        "title": info["title"],
        "hls_url": info["hls_url"],
        "thumbnail": info["thumbnail"],
    }


def get_hls_playlist(video_url: str) -> str:
    """Get HLS playlist content with rewritten URLs.

    Raises:
        ValueError: if no HLS URL was found for the video.
    """
    info = _get_video_info(video_url)
    hls_url = info["hls_url"]
    if not hls_url:
        raise ValueError("No HLS stream available for this video")

    with urllib.request.urlopen(hls_url, timeout=SOCKET_TIMEOUT) as response:
        playlist_content = response.read().decode("utf-8")
    return _rewrite_urls(playlist_content, video_url, hls_url)


def _rewrite_urls(content: str, video_url: str, base_url: str) -> str:
    """Rewrite relative URLs in HLS playlist to point through proxy.

    Blank lines, ``#`` tag/comment lines and absolute http(s) URLs are kept
    unchanged. Every other line is resolved against *base_url* and replaced
    by ``/hls?url=<video_url>&path=<absolute url>`` with both values
    percent-encoded.
    """
    quoted_video = quote(video_url, safe="")
    rewritten = []
    for line in content.split("\n"):
        # Classify on the stripped text so a CRLF playlist does not leak "\r"
        # into the encoded path or turn a blank "\r" line into a bogus URL.
        uri = line.strip()
        if (
            not uri
            or uri.startswith("#")
            or uri.lower().startswith(("http://", "https://"))
        ):
            rewritten.append(line)
            continue
        abs_url = urljoin(base_url, uri)
        # Keep the original line terminator style for rewritten lines.
        line_end = "\r" if line.endswith("\r") else ""
        rewritten.append(
            f"/hls?url={quoted_video}&path={quote(abs_url, safe='')}{line_end}"
        )
    return "\n".join(rewritten)


def get_hls_segment(video_url: str, segment_url: str) -> bytes:
    """Get HLS segment or sub-playlist content.

    *segment_url* is percent-decoded and fetched. When it names an ``.m3u8``
    resource the body is treated as a playlist and its relative URLs are
    rewritten against the decoded URL; otherwise the raw bytes are returned.

    Raises:
        ValueError: if the decoded URL is not http(s), or the server answers
            410 Gone (the stream URL has expired).
        urllib.error.HTTPError: for any other HTTP error status.
    """
    decoded_url = unquote(segment_url)
    parsed = urlparse(decoded_url)
    # urlopen also handles file: and ftp: URLs; this value comes from the
    # client, so refuse anything but http(s) to avoid local file disclosure.
    if parsed.scheme not in _ALLOWED_SCHEMES:
        raise ValueError(f"Unsupported segment URL scheme: {parsed.scheme!r}")

    try:
        with urllib.request.urlopen(decoded_url, timeout=SOCKET_TIMEOUT) as response:
            data = response.read()
    except urllib.error.HTTPError as e:
        if e.code == 410:
            raise ValueError("HLS URL expired (410 Gone)") from e
        raise

    # Check the path as well so a playlist URL carrying a query string is
    # still recognised; the plain suffix check is kept for compatibility.
    if decoded_url.endswith(".m3u8") or parsed.path.endswith(".m3u8"):
        playlist = _rewrite_urls(data.decode("utf-8"), video_url, decoded_url)
        return playlist.encode("utf-8")
    return data