ttvdrops/twitch/management/commands/download_campaign_images.py
2026-03-16 19:35:51 +01:00

400 lines
14 KiB
Python

"""Management command to download and cache campaign, benefit, and reward images locally."""
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlparse
import httpx
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.management import call_command
from django.core.management.base import BaseCommand
from PIL import Image
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import RewardCampaign
if TYPE_CHECKING:
from urllib.parse import ParseResult
from django.core.management.base import CommandParser
from django.db.models import QuerySet
from django.db.models.fields.files import FieldFile
from PIL.ImageFile import ImageFile
class Command(BaseCommand):
"""Download and cache campaign, benefit, and reward images locally."""
help = "Download and cache campaign, benefit, and reward images locally."
def add_arguments(self, parser: CommandParser) -> None:
"""Register command arguments."""
parser.add_argument(
"--model",
type=str,
choices=["campaigns", "benefits", "rewards", "all"],
default="all",
help="Which model to download images for (campaigns, benefits, rewards, or all).",
)
parser.add_argument(
"--limit",
type=int,
default=None,
help="Limit the number of items to process per model.",
)
parser.add_argument(
"--force",
action="store_true",
help="Re-download even if a local image file already exists.",
)
def handle(self, *_args: object, **options: object) -> None:
"""Download images for campaigns, benefits, and/or rewards."""
model_choice: str = str(options.get("model", "all"))
limit_value: object | None = options.get("limit")
limit: int | None = limit_value if isinstance(limit_value, int) else None
force: bool = bool(options.get("force"))
total_stats: dict[str, int] = {
"total": 0,
"downloaded": 0,
"skipped": 0,
"failed": 0,
"placeholders_404": 0,
}
with httpx.Client(timeout=20, follow_redirects=True) as client:
if model_choice in {"campaigns", "all"}:
self.stdout.write(
self.style.MIGRATE_HEADING("\nProcessing Drop Campaigns..."),
)
stats: dict[str, int] = self._download_campaign_images(
client=client,
limit=limit,
force=force,
)
self._merge_stats(total_stats, stats)
self._print_stats("Drop Campaigns", stats)
if model_choice in {"benefits", "all"}:
self.stdout.write(
self.style.MIGRATE_HEADING("\nProcessing Drop Benefits..."),
)
stats = self._download_benefit_images(
client=client,
limit=limit,
force=force,
)
self._merge_stats(total_stats, stats)
self._print_stats("Drop Benefits", stats)
if model_choice in {"rewards", "all"}:
self.stdout.write(
self.style.MIGRATE_HEADING("\nProcessing Reward Campaigns..."),
)
stats = self._download_reward_campaign_images(
client=client,
limit=limit,
force=force,
)
self._merge_stats(total_stats, stats)
self._print_stats("Reward Campaigns", stats)
if model_choice == "all":
self.stdout.write(self.style.MIGRATE_HEADING("\nTotal Summary:"))
self.stdout.write(
self.style.SUCCESS(
f"Processed {total_stats['total']} items. "
f"Downloaded: {total_stats['downloaded']}, "
f"Skipped: {total_stats['skipped']}, "
f"404 placeholders: {total_stats['placeholders_404']}, "
f"Failed: {total_stats['failed']}.",
),
)
# Convert downloaded images to modern formats (WebP, AVIF)
if total_stats["downloaded"] > 0:
self.stdout.write(
self.style.MIGRATE_HEADING(
"\nConverting downloaded images to modern formats...",
),
)
call_command("convert_images_to_modern_formats")
def _download_campaign_images(
self,
client: httpx.Client,
limit: int | None,
*,
force: bool,
) -> dict[str, int]:
"""Download DropCampaign images.
Returns:
Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404).
"""
queryset: QuerySet[DropCampaign] = DropCampaign.objects.all().order_by(
"twitch_id",
)
if limit:
queryset = queryset[:limit]
stats: dict[str, int] = {
"total": 0,
"downloaded": 0,
"skipped": 0,
"failed": 0,
"placeholders_404": 0,
}
stats["total"] = queryset.count()
for campaign in queryset:
if not campaign.image_url:
stats["skipped"] += 1
continue
if (
campaign.image_file
and getattr(campaign.image_file, "name", "")
and not force
):
stats["skipped"] += 1
continue
result: str = self._download_image(
client,
campaign.image_url,
campaign.twitch_id,
campaign.image_file,
)
stats[result] += 1
return stats
def _download_benefit_images(
self,
client: httpx.Client,
limit: int | None,
*,
force: bool,
) -> dict[str, int]:
"""Download DropBenefit images.
Returns:
Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404).
"""
queryset: QuerySet[DropBenefit] = DropBenefit.objects.all().order_by(
"twitch_id",
)
if limit:
queryset = queryset[:limit]
stats: dict[str, int] = {
"total": 0,
"downloaded": 0,
"skipped": 0,
"failed": 0,
"placeholders_404": 0,
}
stats["total"] = queryset.count()
for benefit in queryset:
if not benefit.image_asset_url:
stats["skipped"] += 1
continue
if (
benefit.image_file
and getattr(benefit.image_file, "name", "")
and not force
):
stats["skipped"] += 1
continue
result: str = self._download_image(
client,
benefit.image_asset_url,
benefit.twitch_id,
benefit.image_file,
)
stats[result] += 1
return stats
def _download_reward_campaign_images(
self,
client: httpx.Client,
limit: int | None,
*,
force: bool,
) -> dict[str, int]:
"""Download RewardCampaign images.
Returns:
Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404).
"""
queryset: QuerySet[RewardCampaign] = RewardCampaign.objects.all().order_by(
"twitch_id",
)
if limit:
queryset = queryset[:limit]
stats: dict[str, int] = {
"total": 0,
"downloaded": 0,
"skipped": 0,
"failed": 0,
"placeholders_404": 0,
}
stats["total"] = queryset.count()
for reward_campaign in queryset:
if not reward_campaign.image_url:
stats["skipped"] += 1
continue
if (
reward_campaign.image_file
and getattr(reward_campaign.image_file, "name", "")
and not force
):
stats["skipped"] += 1
continue
result: str = self._download_image(
client,
reward_campaign.image_url,
reward_campaign.twitch_id,
reward_campaign.image_file,
)
stats[result] += 1
return stats
def _download_image(
self,
client: httpx.Client,
image_url: str,
twitch_id: str,
file_field: FieldFile | None,
) -> str:
"""Download a single image and save it to the file field.
Args:
client: httpx.Client instance for making requests.
image_url: URL of the image to download.
twitch_id: Twitch ID to use in filename.
file_field: Django FileField to save the image to.
Returns:
Status string: 'downloaded', 'skipped', 'failed', or 'placeholders_404'.
"""
parsed_url: ParseResult = urlparse(image_url)
suffix: str = Path(parsed_url.path).suffix or ".jpg"
file_name: str = f"{twitch_id}{suffix}"
if file_field is None:
return "failed"
try:
response: httpx.Response = client.get(image_url)
response.raise_for_status()
except httpx.HTTPError as exc:
self.stdout.write(
self.style.WARNING(f"Failed to download image for {twitch_id}: {exc}"),
)
return "failed"
# Check for 404 placeholder images (common pattern on Twitch)
if "/ttv-static/404_" in str(response.url.path):
return "placeholders_404"
# Save the image to the FileField
if hasattr(file_field, "save"):
file_field.save(file_name, ContentFile(response.content), save=True)
# Auto-convert to WebP and AVIF
image_path: str | None = getattr(file_field, "path", None)
if image_path:
self._convert_to_modern_formats(image_path)
return "downloaded"
return "failed"
def _convert_to_modern_formats(self, image_path: str) -> None:
"""Convert downloaded image to WebP and AVIF formats.
Args:
image_path: Absolute path to the downloaded image file
"""
try:
source_path = Path(image_path)
if not source_path.exists() or source_path.suffix.lower() not in {
".jpg",
".jpeg",
".png",
}:
return
base_path: Path = source_path.with_suffix("")
webp_path: Path = base_path.with_suffix(".webp")
avif_path: Path = base_path.with_suffix(".avif")
with Image.open(source_path) as img:
# Convert to RGB if needed
if img.mode in {"RGBA", "LA"} or (
img.mode == "P" and "transparency" in img.info
):
background: Image = Image.new("RGB", img.size, (255, 255, 255))
rgba_img: Image | ImageFile = (
img.convert("RGBA") if img.mode == "P" else img
)
background.paste(
rgba_img,
mask=rgba_img.split()[-1]
if rgba_img.mode in {"RGBA", "LA"}
else None,
)
rgb_img = background
elif img.mode != "RGB":
rgb_img: Image = img.convert("RGB")
else:
rgb_img: ImageFile = img
# Save WebP
rgb_img.save(webp_path, "WEBP", quality=85, method=6)
# Save AVIF
rgb_img.save(avif_path, "AVIF", quality=85, speed=4)
except (OSError, ValueError) as e:
# Don't fail the download if conversion fails
self.stdout.write(
self.style.WARNING(f"Failed to convert {image_path}: {e}"),
)
def _merge_stats(self, total: dict[str, int], new: dict[str, int]) -> None:
"""Merge statistics from a single model into the total stats."""
for key in ["total", "downloaded", "skipped", "failed", "placeholders_404"]:
total[key] += new[key]
def _print_stats(self, model_name: str, stats: dict[str, int]) -> None:
"""Print statistics for a specific model."""
self.stdout.write(
self.style.SUCCESS(
f"{model_name}: Processed {stats['total']} items. "
f"Downloaded: {stats['downloaded']}, "
f"Skipped: {stats['skipped']}, "
f"404 placeholders: {stats['placeholders_404']}, "
f"Failed: {stats['failed']}.",
),
)
if stats["downloaded"] > 0:
media_path: Path = Path(settings.MEDIA_ROOT)
if "Campaigns" in model_name and "Reward" not in model_name:
image_dir: Path = media_path / "campaigns" / "images"
elif "Benefits" in model_name:
image_dir: Path = media_path / "benefits" / "images"
else:
image_dir: Path = media_path / "reward_campaigns" / "images"
self.stdout.write(self.style.SUCCESS(f"Saved images to: {image_dir}"))