ttvdrops/twitch/management/commands/download_campaign_images.py
2026-03-16 18:40:04 +01:00

387 lines
13 KiB
Python

"""Management command to download and cache campaign, benefit, and reward images locally."""
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlparse
import httpx
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand
from PIL import Image
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import RewardCampaign
if TYPE_CHECKING:
from urllib.parse import ParseResult
from django.core.management.base import CommandParser
from django.db.models import QuerySet
from django.db.models.fields.files import FieldFile
class Command(BaseCommand):
"""Download and cache campaign, benefit, and reward images locally."""
help = "Download and cache campaign, benefit, and reward images locally."
def add_arguments(self, parser: CommandParser) -> None:
    """Register the command-line options understood by this command.

    Options:
        --model: Which model to process; one of ``campaigns``, ``benefits``,
            ``rewards`` or ``all`` (default ``all``).
        --limit: Maximum number of items to process per model. Must be a
            non-negative integer; when omitted the value is ``None``.
        --force: Flag; re-download even if a local image file already exists.

    Args:
        parser: The argument parser to register the options on.
    """
    # Local import: only the --limit validator below needs it.
    from argparse import ArgumentTypeError

    def non_negative_int(raw: str) -> int:
        """Parse *raw* as an integer and reject negative values."""
        try:
            value = int(raw)
        except ValueError:
            msg = f"invalid int value: {raw!r}"
            raise ArgumentTypeError(msg) from None
        if value < 0:
            # A negative limit would otherwise only fail much later, when the
            # value is used to slice a queryset.
            msg = f"must be a non-negative integer, got {value}"
            raise ArgumentTypeError(msg)
        return value

    parser.add_argument(
        "--model",
        type=str,
        choices=["campaigns", "benefits", "rewards", "all"],
        default="all",
        help="Which model to download images for (campaigns, benefits, rewards, or all).",
    )
    parser.add_argument(
        "--limit",
        type=non_negative_int,
        default=None,
        help="Limit the number of items to process per model.",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-download even if a local image file already exists.",
    )
def handle(self, *_args: object, **options: object) -> None:
    """Run the command: fetch images for the selected model(s) and report.

    Reads the ``model``, ``limit`` and ``force`` options, runs the matching
    downloader(s) through one shared HTTP client, prints per-model
    statistics and, when all models were requested, a combined summary.
    """
    selected: str = str(options.get("model", "all"))
    raw_limit: object | None = options.get("limit")
    # Anything that is not an int (including a missing option) means "no limit".
    max_items: int | None = raw_limit if isinstance(raw_limit, int) else None
    overwrite: bool = bool(options.get("force"))

    grand_total: dict[str, int] = dict.fromkeys(
        ("total", "downloaded", "skipped", "failed", "placeholders_404"),
        0,
    )

    # (option value, heading, label used in the stats line, downloader), in run order.
    jobs = (
        (
            "campaigns",
            "\nProcessing Drop Campaigns...",
            "Drop Campaigns",
            self._download_campaign_images,
        ),
        (
            "benefits",
            "\nProcessing Drop Benefits...",
            "Drop Benefits",
            self._download_benefit_images,
        ),
        (
            "rewards",
            "\nProcessing Reward Campaigns...",
            "Reward Campaigns",
            self._download_reward_campaign_images,
        ),
    )

    with httpx.Client(timeout=20, follow_redirects=True) as client:
        for choice, heading, label, downloader in jobs:
            if selected not in {choice, "all"}:
                continue
            self.stdout.write(self.style.MIGRATE_HEADING(heading))
            model_stats = downloader(
                client=client,
                limit=max_items,
                force=overwrite,
            )
            self._merge_stats(grand_total, model_stats)
            self._print_stats(label, model_stats)

    # The combined summary is only meaningful when every model was processed.
    if selected != "all":
        return

    self.stdout.write(self.style.MIGRATE_HEADING("\nTotal Summary:"))
    self.stdout.write(
        self.style.SUCCESS(
            f"Processed {grand_total['total']} items. "
            f"Downloaded: {grand_total['downloaded']}, "
            f"Skipped: {grand_total['skipped']}, "
            f"404 placeholders: {grand_total['placeholders_404']}, "
            f"Failed: {grand_total['failed']}.",
        ),
    )
def _download_campaign_images(
    self,
    client: httpx.Client,
    limit: int | None,
    *,
    force: bool,
) -> dict[str, int]:
    """Download DropCampaign images, processing campaigns ordered by ``twitch_id``.

    Campaigns without an ``image_url`` are counted as skipped, as are
    campaigns that already have a local ``image_file`` unless ``force`` is set.

    Args:
        client: HTTP client used to fetch the images.
        limit: Maximum number of campaigns to process. ``None`` means no
            limit; ``0`` processes nothing.
        force: Re-download even if the campaign already has a local file.

    Returns:
        Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404).
    """
    queryset: QuerySet[DropCampaign] = DropCampaign.objects.all().order_by(
        "twitch_id",
    )
    # Compare against None: a plain truthiness test would treat an explicit
    # limit of 0 as "no limit" and process every row.
    if limit is not None:
        queryset = queryset[:limit]

    stats: dict[str, int] = {
        "total": 0,
        "downloaded": 0,
        "skipped": 0,
        "failed": 0,
        "placeholders_404": 0,
    }
    stats["total"] = queryset.count()

    for campaign in queryset:
        if not campaign.image_url:
            stats["skipped"] += 1
            continue

        has_local_file = bool(campaign.image_file) and bool(
            getattr(campaign.image_file, "name", ""),
        )
        if has_local_file and not force:
            stats["skipped"] += 1
            continue

        # _download_image returns the name of the counter to increment.
        result = self._download_image(
            client,
            campaign.image_url,
            campaign.twitch_id,
            campaign.image_file,
        )
        stats[result] += 1

    return stats
def _download_benefit_images(
    self,
    client: httpx.Client,
    limit: int | None,
    *,
    force: bool,
) -> dict[str, int]:
    """Download DropBenefit images, processing benefits ordered by ``twitch_id``.

    Benefits without an ``image_asset_url`` are counted as skipped, as are
    benefits that already have a local ``image_file`` unless ``force`` is set.

    Args:
        client: HTTP client used to fetch the images.
        limit: Maximum number of benefits to process. ``None`` means no
            limit; ``0`` processes nothing.
        force: Re-download even if the benefit already has a local file.

    Returns:
        Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404).
    """
    queryset: QuerySet[DropBenefit] = DropBenefit.objects.all().order_by(
        "twitch_id",
    )
    # Compare against None: a plain truthiness test would treat an explicit
    # limit of 0 as "no limit" and process every row.
    if limit is not None:
        queryset = queryset[:limit]

    stats: dict[str, int] = {
        "total": 0,
        "downloaded": 0,
        "skipped": 0,
        "failed": 0,
        "placeholders_404": 0,
    }
    stats["total"] = queryset.count()

    for benefit in queryset:
        if not benefit.image_asset_url:
            stats["skipped"] += 1
            continue

        has_local_file = bool(benefit.image_file) and bool(
            getattr(benefit.image_file, "name", ""),
        )
        if has_local_file and not force:
            stats["skipped"] += 1
            continue

        # _download_image returns the name of the counter to increment.
        result = self._download_image(
            client,
            benefit.image_asset_url,
            benefit.twitch_id,
            benefit.image_file,
        )
        stats[result] += 1

    return stats
def _download_reward_campaign_images(
    self,
    client: httpx.Client,
    limit: int | None,
    *,
    force: bool,
) -> dict[str, int]:
    """Download RewardCampaign images, processing rows ordered by ``twitch_id``.

    Reward campaigns without an ``image_url`` are counted as skipped, as are
    those that already have a local ``image_file`` unless ``force`` is set.

    Args:
        client: HTTP client used to fetch the images.
        limit: Maximum number of reward campaigns to process. ``None`` means
            no limit; ``0`` processes nothing.
        force: Re-download even if the reward campaign already has a local file.

    Returns:
        Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404).
    """
    queryset: QuerySet[RewardCampaign] = RewardCampaign.objects.all().order_by(
        "twitch_id",
    )
    # Compare against None: a plain truthiness test would treat an explicit
    # limit of 0 as "no limit" and process every row.
    if limit is not None:
        queryset = queryset[:limit]

    stats: dict[str, int] = {
        "total": 0,
        "downloaded": 0,
        "skipped": 0,
        "failed": 0,
        "placeholders_404": 0,
    }
    stats["total"] = queryset.count()

    for reward_campaign in queryset:
        if not reward_campaign.image_url:
            stats["skipped"] += 1
            continue

        has_local_file = bool(reward_campaign.image_file) and bool(
            getattr(reward_campaign.image_file, "name", ""),
        )
        if has_local_file and not force:
            stats["skipped"] += 1
            continue

        # _download_image returns the name of the counter to increment.
        result = self._download_image(
            client,
            reward_campaign.image_url,
            reward_campaign.twitch_id,
            reward_campaign.image_file,
        )
        stats[result] += 1

    return stats
def _download_image(
    self,
    client: httpx.Client,
    image_url: str,
    twitch_id: str,
    file_field: FieldFile | None,
) -> str:
    """Download a single image and save it to the file field.

    The stored file is named ``<twitch_id><suffix>``, where the suffix is
    taken from the URL path and defaults to ``.jpg``. After a successful save
    the image is also converted to WebP and AVIF when the storage exposes a
    local filesystem path.

    Args:
        client: httpx.Client instance for making requests.
        image_url: URL of the image to download.
        twitch_id: Twitch ID to use in filename.
        file_field: Django FileField to save the image to.

    Returns:
        Status string: 'downloaded', 'failed', or 'placeholders_404'. The
        value doubles as the statistics counter the caller increments.
    """
    if file_field is None:
        return "failed"

    try:
        parsed_url: ParseResult = urlparse(image_url)
    except ValueError as exc:
        # urlparse rejects some malformed URLs (e.g. a broken IPv6 host).
        self.stdout.write(
            self.style.WARNING(f"Failed to download image for {twitch_id}: {exc}"),
        )
        return "failed"
    suffix: str = Path(parsed_url.path).suffix or ".jpg"
    file_name: str = f"{twitch_id}{suffix}"

    try:
        response: httpx.Response = client.get(image_url)
        response.raise_for_status()
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        # InvalidURL is not an HTTPError subclass; without catching it a
        # single malformed URL would abort the whole run.
        self.stdout.write(
            self.style.WARNING(f"Failed to download image for {twitch_id}: {exc}"),
        )
        return "failed"

    # Check for 404 placeholder images (common pattern on Twitch).
    # response.url is the final URL after any redirects were followed.
    if "/ttv-static/404_" in str(response.url.path):
        return "placeholders_404"

    if not hasattr(file_field, "save"):
        return "failed"

    # NOTE(review): when a file already exists (re-download with --force) the
    # storage backend may pick a new, suffixed name and leave the old file on
    # disk - confirm whether the previous file should be removed first.
    try:
        file_field.save(file_name, ContentFile(response.content), save=True)
    except OSError as exc:
        self.stdout.write(
            self.style.WARNING(f"Failed to save image for {twitch_id}: {exc}"),
        )
        return "failed"

    # Auto-convert to WebP and AVIF. Storage backends without local files
    # raise NotImplementedError from ``path``; skip the conversion for those
    # instead of failing a download that already succeeded.
    try:
        image_path: str | None = file_field.path
    except (AttributeError, NotImplementedError, ValueError):
        image_path = None
    if image_path:
        self._convert_to_modern_formats(image_path)

    return "downloaded"
def _convert_to_modern_formats(self, image_path: str) -> None:
    """Convert downloaded image to WebP and AVIF formats.

    The converted files are written next to the source file, replacing only
    its final extension with ``.webp`` / ``.avif``. Paths that do not exist
    or do not end in ``.jpg``, ``.jpeg`` or ``.png`` are ignored. Images with
    transparency are flattened onto a white background first. Conversion
    problems are reported as a warning and never raised.

    Args:
        image_path: Absolute path to the downloaded image file
    """
    try:
        source_path = Path(image_path)
        if not source_path.exists() or source_path.suffix.lower() not in {
            ".jpg",
            ".jpeg",
            ".png",
        }:
            return

        # Replace only the last suffix. Stripping the suffix first and then
        # calling with_suffix() again would also drop an earlier dotted part
        # of the name ("a.b.jpg" -> "a.webp" instead of "a.b.webp").
        webp_path = source_path.with_suffix(".webp")
        avif_path = source_path.with_suffix(".avif")

        with Image.open(source_path) as img:
            # Convert to RGB if needed
            has_alpha = img.mode in {"RGBA", "LA"} or (
                img.mode == "P" and "transparency" in img.info
            )
            if has_alpha:
                # Flatten onto white, using the alpha band as the paste mask.
                background = Image.new("RGB", img.size, (255, 255, 255))
                rgba_img = img.convert("RGBA") if img.mode == "P" else img
                background.paste(rgba_img, mask=rgba_img.split()[-1])
                rgb_img = background
            elif img.mode != "RGB":
                rgb_img = img.convert("RGB")
            else:
                rgb_img = img

            # Save WebP
            rgb_img.save(webp_path, "WEBP", quality=85, method=6)
            # Save AVIF
            rgb_img.save(avif_path, "AVIF", quality=85, speed=4)
    except (OSError, ValueError, KeyError) as e:
        # Don't fail the download if conversion fails. KeyError is what
        # Image.save raises when the requested encoder (e.g. AVIF) is not
        # available in the installed Pillow build.
        self.stdout.write(
            self.style.WARNING(f"Failed to convert {image_path}: {e}"),
        )
def _merge_stats(self, total: dict[str, int], new: dict[str, int]) -> None:
    """Add every counter in *new* onto the matching counter in *total*, in place."""
    counters = ("total", "downloaded", "skipped", "failed", "placeholders_404")
    for counter in counters:
        total[counter] = total[counter] + new[counter]
def _print_stats(self, model_name: str, stats: dict[str, int]) -> None:
    """Write the statistics line for one model to stdout.

    When at least one image was downloaded, a second line names the media
    sub-directory for that model, chosen from the wording of *model_name*.
    """
    summary = (
        f"{model_name}: Processed {stats['total']} items. "
        f"Downloaded: {stats['downloaded']}, "
        f"Skipped: {stats['skipped']}, "
        f"404 placeholders: {stats['placeholders_404']}, "
        f"Failed: {stats['failed']}."
    )
    self.stdout.write(self.style.SUCCESS(summary))

    # Nothing was written to disk, so there is no directory to point at.
    if not stats["downloaded"] > 0:
        return

    media_root = Path(settings.MEDIA_ROOT)
    is_drop_campaign = "Campaigns" in model_name and "Reward" not in model_name
    if is_drop_campaign:
        folder = "campaigns"
    elif "Benefits" in model_name:
        folder = "benefits"
    else:
        folder = "reward_campaigns"
    image_dir = media_root / folder / "images"
    self.stdout.write(self.style.SUCCESS(f"Saved images to: {image_dir}"))