2026-04-01 11:10:05 +00:00
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
import re
|
|
|
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)

# Comma-separated allow-list of hostnames, overridable through the environment.
# Entries are normalised (stripped, lower-cased) at match time in is_valid_url().
# NOTE(review): the default list itself contains localhost and 127.0.0.1, so
# setting ALLOW_LOCAL=false alone does not block local URLs — confirm intent.
ALLOWED_DOMAINS = os.getenv("ALLOWED_DOMAINS", "youtube.com,youtu.be,pornhub.com,xvideos.com,localhost,127.0.0.1").split(",")

# Master switch: when false, is_valid_url() accepts every input.
VALIDATION_ENABLED = os.getenv("VALIDATION_ENABLED", "true").lower() == "true"

# When true, localhost / 127.0.0.1 are accepted regardless of ALLOWED_DOMAINS.
ALLOW_LOCAL = os.getenv("ALLOW_LOCAL", "true").lower() == "true"

# Hostnames treated as "local" by the ALLOW_LOCAL shortcut.
_LOCAL_HOSTS = frozenset({"localhost", "127.0.0.1"})

# Only web URLs are accepted; this keeps e.g. file://localhost/... out.
_ALLOWED_SCHEMES = frozenset({"http", "https"})


def is_valid_url(url: str) -> bool:
    """Return True when *url* is an http(s) URL whose host is allow-listed.

    The decision is made on the parsed hostname (userinfo and port removed),
    not on the raw network location, so ``http://localhost:x@evil.com/`` is
    judged by its real host ``evil.com`` and ``https://youtube.com:443/`` is
    judged by ``youtube.com``.

    Args:
        url: The URL to check.

    Returns:
        True if validation is disabled via VALIDATION_ENABLED, or if the host
        is local (and ALLOW_LOCAL is set), or if the host equals or is a
        subdomain of an ALLOWED_DOMAINS entry. False for empty or non-string
        input, unparsable URLs, non-http(s) schemes and every other host.
        Never raises for malformed URLs; parse errors are logged.
    """
    if not VALIDATION_ENABLED:
        return True

    if not url or not isinstance(url, str):
        return False

    try:
        parsed = urlparse(url)
        host = parsed.hostname  # already lower-cased, without userinfo/port
        port = parsed.port  # raises ValueError on a malformed port
    except ValueError as exc:
        logger.error("URL validation error: %s", exc)
        return False

    if parsed.scheme not in _ALLOWED_SCHEMES or not host:
        return False

    host = host.removeprefix("www.")

    if ALLOW_LOCAL and host in _LOCAL_HOSTS:
        return True

    # Allow-list entries may be written as "host" or "host:port"; offer both
    # spellings so configurations that pinned a port keep working.
    candidates = [host]
    if port is not None:
        candidates.append(f"{host}:{port}")

    for entry in ALLOWED_DOMAINS:
        allowed = entry.strip().lower()
        if not allowed:
            # A blank entry (e.g. from a trailing comma) must not match:
            # otherwise every host ending in "." would be accepted.
            continue
        suffix = f".{allowed}"
        if any(c == allowed or c.endswith(suffix) for c in candidates):
            return True

    return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Compiled once at import time; every pattern captures the video ID in group 1.
# "(?:[^#\s]*?&)??" lazily skips any query parameters that precede the wanted
# one, so the ID is found even when it is not the first parameter.
_VIDEO_ID_PATTERNS = (
    # YouTube: watch?v=<id>, youtu.be/<id>, /embed/<id> (IDs are 11 chars).
    re.compile(
        r"(?:youtube\.com/watch\?(?:[^#\s]*?&)??v=|youtu\.be/|youtube\.com/embed/)"
        r"([a-zA-Z0-9_-]{11})"
    ),
    # Pornhub: view_video.php?viewkey=<id>.
    re.compile(r"pornhub\.com/view_video\.php\?(?:[^#\s]*?&)??viewkey=([a-zA-Z0-9]+)"),
)


def extract_video_id(url: str) -> str:
    """Extract the platform video ID embedded in *url*.

    Args:
        url: A video page URL.

    Returns:
        The captured ID from the first pattern that matches, or an empty
        string when *url* is empty or matches no known pattern.
    """
    if not url:
        return ""

    for pattern in _VIDEO_ID_PATTERNS:
        found = pattern.search(url)
        if found:
            return found.group(1)

    return ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sanitize_path(path: str) -> str:
    """Return *path* with ``..`` removed, slash runs collapsed and edges trimmed.

    Args:
        path: A slash-separated path.

    Returns:
        The path with every ``..`` occurrence deleted, every run of two or
        more ``/`` reduced to a single ``/``, and leading/trailing ``/``
        stripped.
    """
    # One left-to-right pass is enough for "..": a run of k dots is reduced
    # to k % 2 dots, so no ".." can survive or be re-formed.
    without_dots = path.replace("..", "")
    # Collapse whole runs of slashes; a single replace("//", "/") pass would
    # leave "//" behind for runs of three or more (e.g. after "a/..//b").
    collapsed = re.sub(r"/{2,}", "/", without_dots)
    return collapsed.strip("/")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_error_message(status_code: int) -> str:
    """Map an HTTP status code to its human-readable description.

    Args:
        status_code: The HTTP status code to describe.

    Returns:
        The description for 400, 403, 404, 500, 502 or 503; the string
        "Unknown error" for any other code.
    """
    client_errors = {
        400: "Bad Request - Invalid URL or parameters",
        403: "Forbidden - Access denied",
        404: "Not Found - Resource not found",
    }
    server_errors = {
        500: "Internal Server Error",
        502: "Bad Gateway - Upstream error",
        503: "Service Unavailable",
    }
    descriptions = {**client_errors, **server_errors}

    try:
        return descriptions[status_code]
    except KeyError:
        return "Unknown error"
|
2026-04-01 20:41:52 +00:00
|
|
|
import hashlib
|
|
|
|
|
|
|
|
|
|
# Process-local registry: generated video ID -> the URL it was derived from.
_video_map = {}


def get_video_id(url: str) -> str:
    """Derive an ID for *url* and record the URL under that ID.

    The ID is the hexadecimal MD5 digest of the encoded URL, so the same URL
    always yields the same 32-character ID. Each call (re)writes the entry in
    the module-level ``_video_map``; nothing in this function removes
    entries, and a different URL with the same digest would overwrite the
    stored one.

    Args:
        url: The original video URL.

    Returns:
        The hexadecimal digest used as the video ID.
    """
    hasher = hashlib.md5()
    hasher.update(url.encode())
    video_id = hasher.hexdigest()

    _video_map[video_id] = url
    return video_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def resolve_video_id(vid: str) -> str | None:
    """Look up the URL stored for *vid* in the module-level ``_video_map``.

    Args:
        vid: A video ID to look up.

    Returns:
        The URL stored under *vid*, or None when the ID is not present.
    """
    try:
        return _video_map[vid]
    except KeyError:
        return None
|