Fix Kick importer
All checks were successful
Deploy to Server / deploy (push) Successful in 25s

This commit is contained in:
Joakim Hellsén 2026-05-09 22:46:17 +02:00
commit 2993dc75b6
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
3 changed files with 156 additions and 30 deletions

View file

@ -6,6 +6,7 @@ from typing import TYPE_CHECKING
import httpx
from django.core.management.base import BaseCommand
from django.db import transaction
from pydantic import ValidationError
from kick.models import KickCategory
@ -72,6 +73,11 @@ class Command(BaseCommand):
default=KICK_DROPS_API_URL,
help="API endpoint to fetch (default: %(default)s).",
)
parser.add_argument(
"--reimport",
action="store_true",
help="Clear existing Kick import data before importing the fetched response.",
)
@staticmethod
def _save_if_changed(
@ -88,6 +94,17 @@ class Command(BaseCommand):
if changed_fields:
obj.save(update_fields=changed_fields)
@staticmethod
def _clear_existing_import_data() -> None:
"""Delete existing Kick import data before rebuilding from the API response."""
with transaction.atomic():
KickDropCampaign.objects.all().delete()
KickReward.objects.all().delete()
KickChannel.objects.all().delete()
KickUser.objects.all().delete()
KickOrganization.objects.all().delete()
KickCategory.objects.all().delete()
def handle(
self,
*_args: str,
@ -126,6 +143,12 @@ class Command(BaseCommand):
return
campaigns: list[KickDropCampaignSchema] = drops_response.data
if options.get("reimport"):
self.stdout.write(
"Reimport requested. Clearing existing Kick import data ...",
)
self._clear_existing_import_data()
self.stdout.write(f"Found {len(campaigns)} campaign(s). Importing ...")
imported = 0
@ -165,25 +188,27 @@ class Command(BaseCommand):
logger.info("Created new organization: %s", org.kick_id)
# Category
cat_data: KickCategorySchema = data.category
category_defaults: dict[str, KickFieldValue] = {
"name": cat_data.name,
"slug": cat_data.slug,
"image_url": cat_data.image_url,
}
category: KickCategory | None = KickCategory.objects.filter(
kick_id=cat_data.id,
).first()
created = category is None
if category is None:
category = KickCategory.objects.create(
cat_data: KickCategorySchema | None = data.category
category: KickCategory | None = None
if cat_data is not None:
category_defaults: dict[str, KickFieldValue] = {
"name": cat_data.name,
"slug": cat_data.slug,
"image_url": cat_data.image_url,
}
category = KickCategory.objects.filter(
kick_id=cat_data.id,
**category_defaults,
)
else:
self._save_if_changed(category, category_defaults)
if created:
logger.info("Created new category: %s", category.kick_id)
).first()
created = category is None
if category is None:
category = KickCategory.objects.create(
kick_id=cat_data.id,
**category_defaults,
)
else:
self._save_if_changed(category, category_defaults)
if created:
logger.info("Created new category: %s", category.kick_id)
# Campaign
campaign_defaults: dict[str, KickFieldValue] = {
@ -260,17 +285,18 @@ class Command(BaseCommand):
for reward_data in data.rewards:
# Resolve reward's category (may differ from campaign category)
reward_category: KickCategory = category
if reward_data.category_id != cat_data.id:
reward_category = KickCategory.objects.filter(
reward_category: KickCategory | None = category
if reward_data.category_id > 0 and (
cat_data is None or reward_data.category_id != cat_data.id
):
reward_category, created = KickCategory.objects.get_or_create(
kick_id=reward_data.category_id,
).first() or KickCategory.objects.create(
kick_id=reward_data.category_id,
name="",
slug="",
image_url="",
defaults={
"name": "",
"slug": "",
"image_url": "",
},
)
created = not reward_category.name and not reward_category.slug
if created:
logger.info("Created new category: %s", reward_category.kick_id)