Implement Chzzk campaign management features, including models, views, and templates
Some checks failed
Deploy to Server / deploy (push) Failing after 9s

This commit is contained in:
Joakim Hellsén 2026-04-01 04:04:58 +02:00
commit 9ce324fd2d
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
12 changed files with 594 additions and 164 deletions

View file

@ -4,7 +4,6 @@ from typing import Any
if TYPE_CHECKING:
import argparse
from chzzk.schemas import ChzzkCampaignV1
from chzzk.schemas import ChzzkCampaignV2
@ -16,7 +15,6 @@ from django.utils import timezone
from chzzk.models import ChzzkCampaign
from chzzk.models import ChzzkReward
from chzzk.schemas import ChzzkApiResponseV1
from chzzk.schemas import ChzzkApiResponseV2
if TYPE_CHECKING:
@ -44,94 +42,87 @@ class Command(BaseCommand):
def handle(self, **options) -> None:
"""Main handler for the management command. Fetches campaign data from both API versions, validates, and stores them."""
campaign_no: int = int(options["campaign_no"])
for api_version, url_template in CHZZK_API_URLS:
url: str = url_template.format(campaign_no=campaign_no)
resp: requests.Response = requests.get(
url,
timeout=2,
headers={
"Accept": "application/json",
"User-Agent": USER_AGENT,
},
api_version: str = "v2" # TODO(TheLovinator): Add support for v1 API # noqa: TD003
url: str = f"https://api.chzzk.naver.com/service/{api_version}/drops/campaigns/{campaign_no}"
resp: requests.Response = requests.get(
url,
timeout=2,
headers={
"Accept": "application/json",
"User-Agent": USER_AGENT,
},
)
resp.raise_for_status()
data: dict[str, Any] = resp.json()
campaign_data: ChzzkCampaignV2
campaign_data = ChzzkApiResponseV2.model_validate(data).content
# Prepare raw JSON defaults for both API versions so DB inserts won't fail
raw_json_v1_val = data if api_version == "v1" else {}
raw_json_v2_val = data if api_version == "v2" else {}
# Save campaign
campaign_obj, created = ChzzkCampaign.objects.update_or_create(
campaign_no=campaign_data.campaign_no,
defaults={
"title": campaign_data.title,
"image_url": campaign_data.image_url,
"description": campaign_data.description,
"category_type": campaign_data.category_type,
"category_id": campaign_data.category_id,
"category_value": campaign_data.category_value,
"pc_link_url": campaign_data.pc_link_url,
"mobile_link_url": campaign_data.mobile_link_url,
"service_id": campaign_data.service_id,
"state": campaign_data.state,
"start_date": campaign_data.start_date,
"end_date": campaign_data.end_date,
"has_ios_based_reward": campaign_data.has_ios_based_reward,
"drops_campaign_not_started": campaign_data.drops_campaign_not_started,
"campaign_reward_type": getattr(
campaign_data,
"campaign_reward_type",
"",
),
"reward_type": getattr(campaign_data, "reward_type", ""),
"account_link_url": campaign_data.account_link_url,
"scraped_at": timezone.now(),
"scrape_status": "success",
"raw_json_v1": raw_json_v1_val,
"raw_json_v2": raw_json_v2_val,
},
)
if created:
self.stdout.write(
self.style.SUCCESS(f"Created campaign {campaign_no}"),
)
resp.raise_for_status()
data: dict[str, Any] = resp.json()
campaign_data: ChzzkCampaignV1 | ChzzkCampaignV2
if api_version == "v1":
campaign_data = ChzzkApiResponseV1.model_validate(data).content
elif api_version == "v2":
campaign_data = ChzzkApiResponseV2.model_validate(data).content
else:
msg: str = f"Unknown API version: {api_version}"
self.stdout.write(self.style.ERROR(msg))
continue
# Save campaign
campaign_obj, created = ChzzkCampaign.objects.update_or_create(
campaign_no=campaign_data.campaign_no,
source_api=api_version,
for reward in campaign_data.reward_list:
reward_, created = ChzzkReward.objects.update_or_create(
campaign=campaign_obj,
reward_no=reward.reward_no,
defaults={
"title": campaign_data.title,
"image_url": campaign_data.image_url,
"description": campaign_data.description,
"category_type": campaign_data.category_type,
"category_id": campaign_data.category_id,
"category_value": campaign_data.category_value,
"pc_link_url": campaign_data.pc_link_url,
"mobile_link_url": campaign_data.mobile_link_url,
"service_id": campaign_data.service_id,
"state": campaign_data.state,
"start_date": campaign_data.start_date,
"end_date": campaign_data.end_date,
"has_ios_based_reward": campaign_data.has_ios_based_reward,
"drops_campaign_not_started": campaign_data.drops_campaign_not_started,
"image_url": reward.image_url,
"title": reward.title,
"reward_type": reward.reward_type,
"campaign_reward_type": getattr(
campaign_data,
reward,
"campaign_reward_type",
"",
),
"reward_type": getattr(campaign_data, "reward_type", ""),
"account_link_url": campaign_data.account_link_url,
"scraped_at": timezone.now(),
"scrape_status": "success",
"raw_json": data,
"condition_type": reward.condition_type,
"condition_for_minutes": reward.condition_for_minutes,
"ios_based_reward": reward.ios_based_reward,
"code_remaining_count": reward.code_remaining_count,
},
)
if created:
self.stdout.write(
self.style.SUCCESS(
f"Created campaign {campaign_no} from {api_version}",
f" Created reward {reward_.reward_no} for campaign {campaign_no}",
),
)
for reward in campaign_data.reward_list:
reward_, created = ChzzkReward.objects.update_or_create(
campaign=campaign_obj,
reward_no=reward.reward_no,
defaults={
"image_url": reward.image_url,
"title": reward.title,
"reward_type": reward.reward_type,
"campaign_reward_type": getattr(
reward,
"campaign_reward_type",
"",
),
"condition_type": reward.condition_type,
"condition_for_minutes": reward.condition_for_minutes,
"ios_based_reward": reward.ios_based_reward,
"code_remaining_count": reward.code_remaining_count,
},
)
if created:
self.stdout.write(
self.style.SUCCESS(
f" Created reward {reward_.reward_no} for campaign {campaign_no}",
),
)
self.stdout.write(
self.style.SUCCESS(
f"Imported campaign {campaign_no} from {api_version}",
),
)
self.stdout.write(
self.style.SUCCESS(f"Imported campaign {campaign_no}"),
)