Lower line-length to default and don't add from __future__ import annotations to everything

This commit is contained in:
Joakim Hellsén 2026-03-09 04:37:54 +01:00
commit 1118c03c1b
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
46 changed files with 2338 additions and 1085 deletions

View file

@ -1,17 +1,13 @@
"""Management command to download and cache campaign, benefit, and reward images locally."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import ParseResult
from urllib.parse import urlparse
import httpx
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand
from django.core.management.base import CommandParser
from PIL import Image
from twitch.models import DropBenefit
@ -19,6 +15,9 @@ from twitch.models import DropCampaign
from twitch.models import RewardCampaign
if TYPE_CHECKING:
from urllib.parse import ParseResult
from django.core.management.base import CommandParser
from django.db.models import QuerySet
from django.db.models.fields.files import FieldFile
@ -66,20 +65,38 @@ class Command(BaseCommand):
with httpx.Client(timeout=20, follow_redirects=True) as client:
if model_choice in {"campaigns", "all"}:
self.stdout.write(self.style.MIGRATE_HEADING("\nProcessing Drop Campaigns..."))
stats = self._download_campaign_images(client=client, limit=limit, force=force)
self.stdout.write(
self.style.MIGRATE_HEADING("\nProcessing Drop Campaigns..."),
)
stats = self._download_campaign_images(
client=client,
limit=limit,
force=force,
)
self._merge_stats(total_stats, stats)
self._print_stats("Drop Campaigns", stats)
if model_choice in {"benefits", "all"}:
self.stdout.write(self.style.MIGRATE_HEADING("\nProcessing Drop Benefits..."))
stats = self._download_benefit_images(client=client, limit=limit, force=force)
self.stdout.write(
self.style.MIGRATE_HEADING("\nProcessing Drop Benefits..."),
)
stats = self._download_benefit_images(
client=client,
limit=limit,
force=force,
)
self._merge_stats(total_stats, stats)
self._print_stats("Drop Benefits", stats)
if model_choice in {"rewards", "all"}:
self.stdout.write(self.style.MIGRATE_HEADING("\nProcessing Reward Campaigns..."))
stats = self._download_reward_campaign_images(client=client, limit=limit, force=force)
self.stdout.write(
self.style.MIGRATE_HEADING("\nProcessing Reward Campaigns..."),
)
stats = self._download_reward_campaign_images(
client=client,
limit=limit,
force=force,
)
self._merge_stats(total_stats, stats)
self._print_stats("Reward Campaigns", stats)
@ -107,18 +124,30 @@ class Command(BaseCommand):
Returns:
Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404).
"""
queryset: QuerySet[DropCampaign] = DropCampaign.objects.all().order_by("twitch_id")
queryset: QuerySet[DropCampaign] = DropCampaign.objects.all().order_by(
"twitch_id",
)
if limit:
queryset = queryset[:limit]
stats: dict[str, int] = {"total": 0, "downloaded": 0, "skipped": 0, "failed": 0, "placeholders_404": 0}
stats: dict[str, int] = {
"total": 0,
"downloaded": 0,
"skipped": 0,
"failed": 0,
"placeholders_404": 0,
}
stats["total"] = queryset.count()
for campaign in queryset:
if not campaign.image_url:
stats["skipped"] += 1
continue
if campaign.image_file and getattr(campaign.image_file, "name", "") and not force:
if (
campaign.image_file
and getattr(campaign.image_file, "name", "")
and not force
):
stats["skipped"] += 1
continue
@ -144,18 +173,30 @@ class Command(BaseCommand):
Returns:
Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404).
"""
queryset: QuerySet[DropBenefit] = DropBenefit.objects.all().order_by("twitch_id")
queryset: QuerySet[DropBenefit] = DropBenefit.objects.all().order_by(
"twitch_id",
)
if limit:
queryset = queryset[:limit]
stats: dict[str, int] = {"total": 0, "downloaded": 0, "skipped": 0, "failed": 0, "placeholders_404": 0}
stats: dict[str, int] = {
"total": 0,
"downloaded": 0,
"skipped": 0,
"failed": 0,
"placeholders_404": 0,
}
stats["total"] = queryset.count()
for benefit in queryset:
if not benefit.image_asset_url:
stats["skipped"] += 1
continue
if benefit.image_file and getattr(benefit.image_file, "name", "") and not force:
if (
benefit.image_file
and getattr(benefit.image_file, "name", "")
and not force
):
stats["skipped"] += 1
continue
@ -181,18 +222,30 @@ class Command(BaseCommand):
Returns:
Dictionary with download statistics (total, downloaded, skipped, failed, placeholders_404).
"""
queryset: QuerySet[RewardCampaign] = RewardCampaign.objects.all().order_by("twitch_id")
queryset: QuerySet[RewardCampaign] = RewardCampaign.objects.all().order_by(
"twitch_id",
)
if limit:
queryset = queryset[:limit]
stats: dict[str, int] = {"total": 0, "downloaded": 0, "skipped": 0, "failed": 0, "placeholders_404": 0}
stats: dict[str, int] = {
"total": 0,
"downloaded": 0,
"skipped": 0,
"failed": 0,
"placeholders_404": 0,
}
stats["total"] = queryset.count()
for reward_campaign in queryset:
if not reward_campaign.image_url:
stats["skipped"] += 1
continue
if reward_campaign.image_file and getattr(reward_campaign.image_file, "name", "") and not force:
if (
reward_campaign.image_file
and getattr(reward_campaign.image_file, "name", "")
and not force
):
stats["skipped"] += 1
continue
@ -233,9 +286,7 @@ class Command(BaseCommand):
response.raise_for_status()
except httpx.HTTPError as exc:
self.stdout.write(
self.style.WARNING(
f"Failed to download image for {twitch_id}: {exc}",
),
self.style.WARNING(f"Failed to download image for {twitch_id}: {exc}"),
)
return "failed"
@ -262,7 +313,11 @@ class Command(BaseCommand):
"""
try:
source_path = Path(image_path)
if not source_path.exists() or source_path.suffix.lower() not in {".jpg", ".jpeg", ".png"}:
if not source_path.exists() or source_path.suffix.lower() not in {
".jpg",
".jpeg",
".png",
}:
return
base_path = source_path.with_suffix("")
@ -271,10 +326,17 @@ class Command(BaseCommand):
with Image.open(source_path) as img:
# Convert to RGB if needed
if img.mode in {"RGBA", "LA"} or (img.mode == "P" and "transparency" in img.info):
if img.mode in {"RGBA", "LA"} or (
img.mode == "P" and "transparency" in img.info
):
background = Image.new("RGB", img.size, (255, 255, 255))
rgba_img = img.convert("RGBA") if img.mode == "P" else img
background.paste(rgba_img, mask=rgba_img.split()[-1] if rgba_img.mode in {"RGBA", "LA"} else None)
background.paste(
rgba_img,
mask=rgba_img.split()[-1]
if rgba_img.mode in {"RGBA", "LA"}
else None,
)
rgb_img = background
elif img.mode != "RGB":
rgb_img = img.convert("RGB")
@ -289,7 +351,9 @@ class Command(BaseCommand):
except (OSError, ValueError) as e:
# Don't fail the download if conversion fails
self.stdout.write(self.style.WARNING(f"Failed to convert {image_path}: {e}"))
self.stdout.write(
self.style.WARNING(f"Failed to convert {image_path}: {e}"),
)
def _merge_stats(self, total: dict[str, int], new: dict[str, int]) -> None:
"""Merge statistics from a single model into the total stats."""