"""Management command to download and cache campaign, benefit, and reward images locally.""" from pathlib import Path from typing import TYPE_CHECKING from urllib.parse import urlparse import httpx from django.conf import settings from django.core.files.base import ContentFile from django.core.management import call_command from django.core.management.base import BaseCommand from PIL import Image from twitch.models import DropBenefit from twitch.models import DropCampaign from twitch.models import RewardCampaign if TYPE_CHECKING: from urllib.parse import ParseResult from django.core.management.base import CommandParser from django.db.models import QuerySet from django.db.models.fields.files import FieldFile from PIL.ImageFile import ImageFile class Command(BaseCommand): """Download and cache campaign, benefit, and reward images locally.""" help = "Download and cache campaign, benefit, and reward images locally." def add_arguments(self, parser: CommandParser) -> None: """Register command arguments.""" parser.add_argument( "--model", type=str, choices=["campaigns", "benefits", "rewards", "all"], default="all", help="Which model to download images for (campaigns, benefits, rewards, or all).", ) parser.add_argument( "--limit", type=int, default=None, help="Limit the number of items to process per model.", ) parser.add_argument( "--force", action="store_true", help="Re-download even if a local image file already exists.", ) def handle( self, *_args: str, **options: str | bool | int | None, ) -> None: """Download images for campaigns, benefits, and/or rewards.""" model_choice: str = str(options.get("model", "all")) limit_value: str | bool | int | None = options.get("limit") limit: int | None = limit_value if isinstance(limit_value, int) else None force: bool = bool(options.get("force")) total_stats: dict[str, int] = { "total": 0, "downloaded": 0, "skipped": 0, "failed": 0, "placeholders_404": 0, } with httpx.Client(timeout=20, follow_redirects=True) as client: if model_choice in {"campaigns", "all"}: self.stdout.write( self.style.MIGRATE_HEADING("\nProcessing Drop Campaigns..."), ) stats: dict[str, int] = self._download_campaign_images( client=client, limit=limit, force=force, ) self._merge_stats(total_stats, stats) self._print_stats("Drop Campaigns", stats) if model_choice in {"benefits", "all"}: self.stdout.write( self.style.MIGRATE_HEADING("\nProcessing Drop Benefits..."), ) stats = self._download_benefit_images( client=client, limit=limit, force=force, ) self._merge_stats(total_stats, stats) self._print_stats("Drop Benefits", stats) if model_choice in {"rewards", "all"}: self.stdout.write( self.style.MIGRATE_HEADING("\nProcessing Reward Campaigns..."), ) stats = self._download_reward_campaign_images( client=client, limit=limit, force=force, ) self._merge_stats(total_stats, stats) self._print_stats("Reward Campaigns", stats) if model_choice == "all": self.stdout.write(self.style.MIGRATE_HEADING("\nTotal Summary:")) self.stdout.write( self.style.SUCCESS( f"Processed {total_stats['total']} items. " f"Downloaded: {total_stats['downloaded']}, " f"Skipped: {total_stats['skipped']}, " f"404 placeholders: {total_stats['placeholders_404']}, " f"Failed: {total_stats['failed']}.", ), ) # Convert downloaded images to modern formats (WebP, AVIF) if total_stats["downloaded"] > 0: self.stdout.write( self.style.MIGRATE_HEADING( "\nConverting downloaded images to modern formats...", ), ) call_command("convert_images_to_modern_formats") def _download_campaign_images( self, client: httpx.Client, limit: int | None, *, force: bool, ) -> dict[str, int]: """Download DropCampaign images. Returns: Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404). """ queryset: QuerySet[DropCampaign] = DropCampaign.objects.all().order_by( "twitch_id", ) if limit: queryset = queryset[:limit] stats: dict[str, int] = { "total": 0, "downloaded": 0, "skipped": 0, "failed": 0, "placeholders_404": 0, } stats["total"] = queryset.count() for campaign in queryset: if not campaign.image_url: stats["skipped"] += 1 continue if ( campaign.image_file and getattr(campaign.image_file, "name", "") and not force ): stats["skipped"] += 1 continue result: str = self._download_image( client, campaign.image_url, campaign.twitch_id, campaign.image_file, ) stats[result] += 1 return stats def _download_benefit_images( self, client: httpx.Client, limit: int | None, *, force: bool, ) -> dict[str, int]: """Download DropBenefit images. Returns: Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404). """ queryset: QuerySet[DropBenefit] = DropBenefit.objects.all().order_by( "twitch_id", ) if limit: queryset = queryset[:limit] stats: dict[str, int] = { "total": 0, "downloaded": 0, "skipped": 0, "failed": 0, "placeholders_404": 0, } stats["total"] = queryset.count() for benefit in queryset: if not benefit.image_asset_url: stats["skipped"] += 1 continue if ( benefit.image_file and getattr(benefit.image_file, "name", "") and not force ): stats["skipped"] += 1 continue result: str = self._download_image( client, benefit.image_asset_url, benefit.twitch_id, benefit.image_file, ) stats[result] += 1 return stats def _download_reward_campaign_images( self, client: httpx.Client, limit: int | None, *, force: bool, ) -> dict[str, int]: """Download RewardCampaign images. Returns: Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404). """ queryset: QuerySet[RewardCampaign] = RewardCampaign.objects.all().order_by( "twitch_id", ) if limit: queryset = queryset[:limit] stats: dict[str, int] = { "total": 0, "downloaded": 0, "skipped": 0, "failed": 0, "placeholders_404": 0, } stats["total"] = queryset.count() for reward_campaign in queryset: if not reward_campaign.image_url: stats["skipped"] += 1 continue if ( reward_campaign.image_file and getattr(reward_campaign.image_file, "name", "") and not force ): stats["skipped"] += 1 continue result: str = self._download_image( client, reward_campaign.image_url, reward_campaign.twitch_id, reward_campaign.image_file, ) stats[result] += 1 return stats def _download_image( self, client: httpx.Client, image_url: str, twitch_id: str, file_field: FieldFile | None, ) -> str: """Download a single image and save it to the file field. Args: client: httpx.Client instance for making requests. image_url: URL of the image to download. twitch_id: Twitch ID to use in filename. file_field: Django FileField to save the image to. Returns: Status string: 'downloaded', 'skipped', 'failed', or 'placeholders_404'. """ parsed_url: ParseResult = urlparse(image_url) suffix: str = Path(parsed_url.path).suffix or ".jpg" file_name: str = f"{twitch_id}{suffix}" if file_field is None: return "failed" try: response: httpx.Response = client.get(image_url) response.raise_for_status() except httpx.HTTPError as exc: self.stdout.write( self.style.WARNING(f"Failed to download image for {twitch_id}: {exc}"), ) return "failed" # Check for 404 placeholder images (common pattern on Twitch) if "/ttv-static/404_" in str(response.url.path): return "placeholders_404" # Save the image to the FileField if hasattr(file_field, "save"): file_field.save(file_name, ContentFile(response.content), save=True) # Auto-convert to WebP and AVIF image_path: str | None = getattr(file_field, "path", None) if image_path: self._convert_to_modern_formats(image_path) return "downloaded" return "failed" def _convert_to_modern_formats(self, image_path: str) -> None: """Convert downloaded image to WebP and AVIF formats. Args: image_path: Absolute path to the downloaded image file """ try: source_path = Path(image_path) if not source_path.exists() or source_path.suffix.lower() not in { ".jpg", ".jpeg", ".png", }: return base_path: Path = source_path.with_suffix("") webp_path: Path = base_path.with_suffix(".webp") avif_path: Path = base_path.with_suffix(".avif") with Image.open(source_path) as img: # Convert to RGB if needed if img.mode in {"RGBA", "LA"} or ( img.mode == "P" and "transparency" in img.info ): background: Image.Image = Image.new( "RGB", img.size, (255, 255, 255), ) rgba_img: Image.Image | ImageFile = ( img.convert("RGBA") if img.mode == "P" else img ) background.paste( rgba_img, mask=rgba_img.split()[-1] if rgba_img.mode in {"RGBA", "LA"} else None, ) rgb_img = background elif img.mode != "RGB": rgb_img: Image.Image = img.convert("RGB") else: rgb_img: Image.Image = img # Save WebP rgb_img.save(webp_path, "WEBP", quality=85, method=6) # Save AVIF rgb_img.save(avif_path, "AVIF", quality=85, speed=4) except (OSError, ValueError) as e: # Don't fail the download if conversion fails self.stdout.write( self.style.WARNING(f"Failed to convert {image_path}: {e}"), ) def _merge_stats(self, total: dict[str, int], new: dict[str, int]) -> None: """Merge statistics from a single model into the total stats.""" for key in ["total", "downloaded", "skipped", "failed", "placeholders_404"]: total[key] += new[key] def _print_stats(self, model_name: str, stats: dict[str, int]) -> None: """Print statistics for a specific model.""" self.stdout.write( self.style.SUCCESS( f"{model_name}: Processed {stats['total']} items. " f"Downloaded: {stats['downloaded']}, " f"Skipped: {stats['skipped']}, " f"404 placeholders: {stats['placeholders_404']}, " f"Failed: {stats['failed']}.", ), ) if stats["downloaded"] > 0: media_path: Path = Path(settings.MEDIA_ROOT) if "Campaigns" in model_name and "Reward" not in model_name: image_dir: Path = media_path / "campaigns" / "images" elif "Benefits" in model_name: image_dir: Path = media_path / "benefits" / "images" else: image_dir: Path = media_path / "reward_campaigns" / "images" self.stdout.write(self.style.SUCCESS(f"Saved images to: {image_dir}"))