ttvdrops/twitch/utils/images.py

97 lines
3.1 KiB
Python

from __future__ import annotations

import hashlib
import logging
import mimetypes
import re
from http.client import HTTPException
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from uuid import uuid4

from django.conf import settings

# Module-level logger, named after this module via __name__.
logger: logging.Logger = logging.getLogger(__name__)
def _sanitize_filename(name: str) -> str:
"""Return a filesystem-safe filename."""
name = re.sub(r"[^A-Za-z0-9._-]", "_", name)
return name[:150] or "file"
def _guess_extension(url: str, content_type: str | None) -> str:
"""Guess a file extension from URL or content-type.
Args:
url: Source URL.
content_type: Optional content type from HTTP response.
Returns:
File extension including dot, like ".png".
"""
parsed = urlparse(url)
ext = Path(parsed.path).suffix.lower()
if ext in {".jpg", ".jpeg", ".png", ".gif", ".webp"}:
return ext
if content_type:
guessed = mimetypes.guess_extension(content_type.split(";")[0].strip())
if guessed:
return guessed
return ".bin"
def cache_remote_image(url: str, subdir: str, *, timeout: float = 10.0) -> str | None:
    """Download a remote image and save it under MEDIA_ROOT, returning storage path.

    The file name is the SHA256 of the content to de-duplicate downloads, and
    files are sharded into two directory levels taken from the first four hex
    digits of that hash. Every failure (bad URL, network or protocol error,
    empty body, filesystem error) results in ``None`` rather than an exception.

    Args:
        url: Remote image URL. Must start with "http://" or "https://".
        subdir: Sub-directory under MEDIA_ROOT to store the file.
        timeout: Network timeout in seconds.

    Returns:
        Relative storage path (under MEDIA_ROOT) suitable for assigning to FileField.name,
        or None if the operation failed.
    """
    url = (url or "").strip()
    if not url.startswith(("http://", "https://")):
        return None

    # urlparse raises ValueError on malformed input (e.g. an unbalanced IPv6
    # bracket); treat that like any other invalid URL.
    try:
        parsed = urlparse(url)
    except ValueError:
        return None
    # Enforce allowed schemes at runtime too, and skip URLs without a host
    # since they can never be fetched.
    if parsed.scheme not in {"http", "https"} or not parsed.hostname:
        return None

    try:
        req = Request(url, headers={"User-Agent": "TTVDrops/1.0"})  # noqa: S310
        # nosec: B310 - urlopen allowed because scheme is validated (http/https only)
        with urlopen(req, timeout=timeout) as resp:  # noqa: S310
            content: bytes = resp.read()
            content_type = resp.headers.get("Content-Type")
    except (OSError, HTTPException, ValueError) as exc:
        # HTTPException (InvalidURL, IncompleteRead, ...) is not an OSError
        # subclass, so it has to be listed explicitly.
        logger.debug("Failed to download image %s: %s", url, exc)
        return None

    if not content:
        return None

    sha = hashlib.sha256(content).hexdigest()
    ext = _guess_extension(url, content_type)

    # Shard into two-level directories by hash for scalability
    shard1, shard2 = sha[:2], sha[2:4]
    media_subdir = Path(subdir) / shard1 / shard2
    target_dir: Path = Path(settings.MEDIA_ROOT) / media_subdir

    filename = _sanitize_filename(f"{sha}{ext}")
    storage_rel_path = str(media_subdir / filename).replace("\\", "/")
    storage_abs_path = Path(settings.MEDIA_ROOT) / storage_rel_path

    try:
        target_dir.mkdir(parents=True, exist_ok=True)
        if not storage_abs_path.exists():
            # Write to a unique sibling and rename into place, so a failed or
            # interrupted write never leaves a truncated file at the final
            # path, where the exists() check would treat it as a valid hit.
            tmp_path = target_dir / f".{filename}.{uuid4().hex}.tmp"
            try:
                tmp_path.write_bytes(content)
                tmp_path.replace(storage_abs_path)
            except OSError:
                tmp_path.unlink(missing_ok=True)
                raise
    except OSError as exc:
        logger.debug("Failed to write image %s: %s", storage_abs_path, exc)
        return None

    return storage_rel_path