Files
yt-dlp-proxy/dlp.py
T

138 lines
3.5 KiB
Python
Raw Normal View History

import logging
import os
import re
import time
import urllib.parse
import urllib.request
from typing import Optional

import yt_dlp
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Lifetime of a cache entry in seconds. Read once at import time from the
# CACHE_TTL environment variable; defaults to 31536000 (365 days).
CACHE_TTL = int(os.environ.get("CACHE_TTL", 31536000))

# Cached session dicts and the time.time() stamp recorded when each one was
# stored. Both mappings use the key produced by _get_cache_key().
_session_cache = {}
_cache_timestamps = {}
def _is_hls_url(url: str) -> bool:
    """Return True when *url* points directly at an HLS (``.m3u8``) playlist.

    Only the path component is inspected, case-insensitively. A query string
    or fragment after the file name therefore does not hide the extension,
    and the text "m3u8" appearing elsewhere in the URL (host name, video id,
    query value) does not make an ordinary page URL look like a playlist.

    Args:
        url: The URL to classify.

    Returns:
        True if the URL path ends with ``.m3u8``, otherwise False. URLs that
        cannot be parsed are reported as not being playlists.
    """
    try:
        path = urllib.parse.urlsplit(url).path
    except ValueError:
        # urlsplit rejects some malformed inputs (e.g. a broken IPv6 host).
        return False
    return path.lower().endswith(".m3u8")
def _get_cache_key(video_url: str) -> str:
    """Map *video_url* to the key used by the cache dicts.

    The key is currently the URL itself, unchanged.
    """
    cache_key = video_url
    return cache_key
def _is_cache_expired(video_url: str) -> bool:
    """Report whether the cache entry for *video_url* is missing or stale.

    An entry with no recorded timestamp counts as expired. Otherwise it is
    expired once its age in seconds is strictly greater than ``CACHE_TTL``.
    """
    stored_at = _cache_timestamps.get(_get_cache_key(video_url))
    if stored_at is None:
        return True
    age = time.time() - stored_at
    return age > CACHE_TTL
def _get_cached_session(video_url: str) -> Optional[dict]:
    """Return the cached session for *video_url*, or None.

    None is returned both when nothing is stored for the URL and when the
    stored entry has expired. The returned dict is the stored object itself,
    not a copy.
    """
    cache_key = _get_cache_key(video_url)
    if cache_key not in _session_cache:
        return None
    if _is_cache_expired(video_url):
        return None
    return _session_cache[cache_key]
def _set_cached_session(video_url: str, session_data: dict) -> None:
    """Store *session_data* for *video_url* and stamp it with the current time.

    Any existing entry for the same URL is replaced and its timestamp reset.
    """
    cache_key = _get_cache_key(video_url)
    _session_cache[cache_key] = session_data
    stored_at = time.time()
    _cache_timestamps[cache_key] = stored_at
def clear_expired_cache() -> None:
    """Drop every cached session whose entry has expired.

    Both the session and its timestamp are removed for each expired key.
    """
    # Iterate over a snapshot of the keys so entries can be deleted safely.
    for cache_key in list(_session_cache):
        if not _is_cache_expired(cache_key):
            continue
        del _session_cache[cache_key]
        del _cache_timestamps[cache_key]
def _pick_hls_url(info: dict) -> Optional[str]:
    """Return an HLS playlist URL found in a yt-dlp info dict, or None."""
    # Backward compatibility: honour an explicit "hls" entry if one is
    # present (this was the only key the code used to look at).
    legacy = info.get("hls")
    if legacy:
        return legacy

    # yt-dlp describes streams per format and orders "formats" from worst to
    # best, so walk it in reverse to prefer the best HLS variant. The info
    # dict itself is checked last in case it carries the fields directly.
    candidates = list(reversed(info.get("formats") or []))
    candidates.append(info)
    for fmt in candidates:
        if not str(fmt.get("protocol") or "").startswith("m3u8"):
            continue
        # Prefer the master playlist; fall back to the variant's own URL.
        url = fmt.get("manifest_url") or fmt.get("url")
        if url:
            return url
    return None


def get_hls_playlist(video_url: str) -> str:
    """Return the text of the HLS playlist for *video_url*.

    A cached playlist is returned when one is available. Otherwise the
    playlist URL is either *video_url* itself (when it already points at an
    ``.m3u8`` file) or is resolved through yt-dlp, the playlist is downloaded,
    and the result is cached together with the resolved URL.

    Args:
        video_url: A direct ``.m3u8`` URL or a page URL yt-dlp can extract.

    Returns:
        The playlist body decoded as UTF-8.

    Raises:
        ValueError: If yt-dlp reports no HLS stream for the video.
    """
    cached = _get_cached_session(video_url)
    if cached and "hls_playlist" in cached:
        return cached["hls_playlist"]

    # One timeout for both the yt-dlp extraction and the playlist download.
    timeout = int(os.getenv("SOCKET_TIMEOUT", 30))

    if _is_hls_url(video_url):
        hls_url = video_url
    else:
        ydl_opts = {
            "quiet": True,
            "no_warnings": True,
            "socket_timeout": timeout,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=False)
        hls_url = _pick_hls_url(info) if info else None
        if not hls_url:
            raise ValueError("No HLS stream available for this video")

    # NOTE(review): urlopen also accepts non-HTTP schemes such as file: and
    # ftp:. If video_url can come from untrusted clients, consider
    # restricting it to http/https before fetching.
    with urllib.request.urlopen(hls_url, timeout=timeout) as response:
        playlist_content = response.read().decode("utf-8")

    session_data = {
        "hls_playlist": playlist_content,
        "hls_url": hls_url,
        "video_url": video_url,
    }
    _set_cached_session(video_url, session_data)
    return playlist_content
def get_hls_segment(video_url: str, segment_name: str) -> bytes:
    """Download one segment of the HLS stream belonging to *video_url*.

    The segment URL is built from the directory of the cached playlist URL
    plus *segment_name*. The playlist is fetched first when no usable session
    is cached. The playlist URL's query string and fragment are not carried
    over to the segment URL.

    Args:
        video_url: The URL the playlist was (or will be) requested with.
        segment_name: Segment path relative to the playlist's directory; a
            single leading "/" is ignored.

    Returns:
        The raw bytes of the segment.

    Raises:
        ValueError: If no playlist URL is available for the video.
    """
    cached = _get_cached_session(video_url)
    if not cached or "hls_url" not in cached:
        get_hls_playlist(video_url)
        # Read the entry that was just written directly. It is fresh by
        # construction, and going through the TTL check would spuriously
        # miss it when CACHE_TTL is zero or negative.
        cached = _session_cache.get(_get_cache_key(video_url))
    if not cached or "hls_url" not in cached:
        raise ValueError("No HLS playlist URL available for this video")

    # Take the directory from the path component only, so a "/" inside the
    # query string (e.g. a signed token) cannot corrupt the base URL.
    parts = urllib.parse.urlsplit(cached["hls_url"])
    base_path = parts.path.rsplit("/", 1)[0]
    base_url = urllib.parse.urlunsplit(
        (parts.scheme, parts.netloc, base_path, "", "")
    )

    if segment_name.startswith("/"):
        segment_name = segment_name[1:]
    segment_url = f"{base_url}/{segment_name}"

    timeout = int(os.getenv("SOCKET_TIMEOUT", 30))
    with urllib.request.urlopen(segment_url, timeout=timeout) as response:
        return response.read()
def get_stream_info(video_url: str) -> dict:
    """Return basic metadata for *video_url*.

    The result always has exactly the keys ``"title"``, ``"hls_url"`` and
    ``"thumbnail"``, whether or not a session is cached for the URL. When a
    session is cached, the computed metadata is attached to it under
    ``"stream_info"`` and reused on later calls.

    Args:
        video_url: A direct ``.m3u8`` URL or a page URL yt-dlp can extract.

    Returns:
        A new dict with the title, the HLS playlist URL (None when no HLS
        stream was found) and the thumbnail URL (None when unavailable).

    Raises:
        ValueError: If yt-dlp returns no information for the URL.
    """
    if _is_hls_url(video_url):
        # Direct playlist links have no metadata to extract.
        return {
            "title": "Test Video",
            "hls_url": video_url,
            "thumbnail": None,
        }

    # The cached session holds playlist data, not metadata, so it must not
    # be returned as-is. Only a previously attached "stream_info" is reused.
    cached = _get_cached_session(video_url)
    if cached and "stream_info" in cached:
        return dict(cached["stream_info"])

    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "socket_timeout": int(os.getenv("SOCKET_TIMEOUT", 30)),
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=False)
    if not info:
        raise ValueError("Could not extract video info")

    # Prefer a playlist URL already resolved for this video, then an
    # explicit "hls" entry, then the best yt-dlp format using an m3u8
    # protocol ("formats" is ordered worst to best, hence reversed).
    hls_url = (cached or {}).get("hls_url") or info.get("hls")
    if not hls_url:
        for fmt in reversed(info.get("formats") or []):
            if not str(fmt.get("protocol") or "").startswith("m3u8"):
                continue
            hls_url = fmt.get("manifest_url") or fmt.get("url")
            if hls_url:
                break

    stream_info = {
        "title": info.get("title", "Unknown"),
        "hls_url": hls_url,
        "thumbnail": info.get("thumbnail"),
    }
    if cached is not None:
        # `cached` is the live session object, so this is visible to later
        # calls without resetting the session's timestamp.
        cached["stream_info"] = dict(stream_info)
    return stream_info