diff --git a/chzzk/management/commands/import_chzzk_campaign.py b/chzzk/management/commands/import_chzzk_campaign.py index 6288923..3ba11ed 100644 --- a/chzzk/management/commands/import_chzzk_campaign.py +++ b/chzzk/management/commands/import_chzzk_campaign.py @@ -5,20 +5,20 @@ if TYPE_CHECKING: import argparse from chzzk.schemas import ChzzkCampaignV2 + from chzzk.schemas import ChzzkRewardV2 -from typing import TYPE_CHECKING - import requests from django.core.management.base import BaseCommand +from django.core.management.base import CommandError from django.utils import timezone from chzzk.models import ChzzkCampaign from chzzk.models import ChzzkReward from chzzk.schemas import ChzzkApiResponseV2 -if TYPE_CHECKING: - import argparse +MAX_CAMPAIGN_OUTLIER_THRESHOLD: int = 100_000_000 +MAX_CAMPAIGN_OUTLIER_GAP: int = 1_000 CHZZK_API_URLS: list[tuple[str, str]] = [ ("v1", "https://api.chzzk.naver.com/service/v1/drops/campaigns/{campaign_no}"), @@ -37,11 +37,114 @@ class Command(BaseCommand): def add_arguments(self, parser: argparse.ArgumentParser) -> None: """Add command-line arguments for the management command.""" - parser.add_argument("campaign_no", type=int, help="Campaign number to fetch") + parser.add_argument( + "campaign_no", + nargs="?", + type=int, + help="Campaign number to fetch (required unless --latest is used)", + ) + parser.add_argument( + "--latest", + action="store_true", + help=( + "Fetches the highest existing campaign_no and imports " + "missing IDs from latest-5..latest-1 plus latest+1..latest+5." + ), + ) def handle(self, **options) -> None: - """Main handler for the management command. Fetches campaign data from both API versions, validates, and stores them.""" - campaign_no: int = int(options["campaign_no"]) + """Main handler for the management command. + + Raises: + CommandError: If campaign_no is missing when --latest is not used. + """ + latest: bool = bool(options.get("latest")) + campaign_no: int | None = options.get("campaign_no") + + if latest: + to_import: list[int] = self.get_campaign_import_candidates() + + if not to_import: + msg: str = "Nothing to import with --latest at this time." + self.stdout.write(self.style.SUCCESS(msg)) + return + + for target_no in to_import: + self.stdout.write(f"Importing campaign {target_no}...") + self._import_campaign(target_no) + + self.stdout.write(self.style.SUCCESS("--latest import completed.")) + return + + if campaign_no is None: + err_msg: str = "campaign_no is required unless --latest is used" + raise CommandError(err_msg) + + self._import_campaign(int(campaign_no)) + + def get_campaign_import_candidates(self) -> list[int]: + """Determine which campaign numbers to import when --latest is used. + + Returns: + list[int]: A list of campaign numbers that should be imported. + """ + # Handle potential outliers by checking the top two campaign IDs. + campaign_ids = list( + ChzzkCampaign.objects.order_by("-campaign_no").values_list( + "campaign_no", + flat=True, + )[:2], + ) + + max_campaign_no: int = campaign_ids[0] if campaign_ids else 0 + second_max_campaign_no: int = campaign_ids[1] if len(campaign_ids) > 1 else 0 + + if ( + max_campaign_no > MAX_CAMPAIGN_OUTLIER_THRESHOLD + and max_campaign_no - second_max_campaign_no > MAX_CAMPAIGN_OUTLIER_GAP + ): + self.stdout.write( + self.style.WARNING( + f"Detected an outlier max campaign_no {max_campaign_no}; " + f"using second max {second_max_campaign_no} instead.", + ), + ) + max_campaign_no = second_max_campaign_no + + msg: str = f"Max campaign_no in database: {max_campaign_no}" + self.stdout.write(self.style.SUCCESS(msg)) + + if max_campaign_no <= 0: + backfill_candidates: list[int] = [] + else: + backfill_start: int = max(1, max_campaign_no - 5) + existing_lower_ids: set[int] = set( + ChzzkCampaign.objects.filter( + campaign_no__gte=backfill_start, + campaign_no__lt=max_campaign_no, + ).values_list("campaign_no", flat=True), + ) + backfill_candidates: list[int] = [ + idx + for idx in range(backfill_start, max_campaign_no) + if idx not in existing_lower_ids + ] + + new_candidates: list[int] = list( + range( + max_campaign_no + 1, + max_campaign_no + 6, + ), + ) + to_import: list[int] = backfill_candidates + new_candidates + return to_import + + def _import_campaign(self, campaign_no: int) -> None: + """Import a single campaign by its campaign number. + + Args: + campaign_no (int): The campaign number to import. + """ api_version: str = "v2" # TODO(TheLovinator): Add support for v1 API # noqa: TD003 url: str = f"https://api.chzzk.naver.com/service/{api_version}/drops/campaigns/{campaign_no}" resp: requests.Response = requests.get( @@ -52,87 +155,166 @@ class Command(BaseCommand): "User-Agent": USER_AGENT, }, ) + try: resp.raise_for_status() except requests.HTTPError as e: json_msg: str = "" if resp.headers.get("Content-Type", "").startswith("application/json"): - error_data = resp.json() + error_data: dict[str, Any] = resp.json() json_msg = error_data.get("message", "") msg: str = f"Failed to fetch campaign {campaign_no}: {e} - {json_msg}" self.stdout.write(self.style.ERROR(msg)) return + data: dict[str, Any] = resp.json() + cd: ChzzkCampaignV2 = ChzzkApiResponseV2.model_validate(data).content - campaign_data: ChzzkCampaignV2 - campaign_data = ChzzkApiResponseV2.model_validate(data).content + campaign_obj: ChzzkCampaign = self.import_campaign_data( + campaign_no=campaign_no, + api_version=api_version, + data=data, + cd=cd, + ) - # Prepare raw JSON defaults for both API versions so DB inserts won't fail - raw_json_v1_val = data if api_version == "v1" else {} - raw_json_v2_val = data if api_version == "v2" else {} + cd_reward_list: list[ChzzkRewardV2] = cd.reward_list + for reward in cd_reward_list: + self.update_or_create_reward(campaign_no, campaign_obj, reward) - # Save campaign - campaign_obj, created = ChzzkCampaign.objects.update_or_create( - campaign_no=campaign_data.campaign_no, + self.stdout.write(self.style.SUCCESS(f"Imported campaign {campaign_no}")) + + def update_or_create_reward( + self, + campaign_no: int, + campaign_obj: ChzzkCampaign, + reward: ChzzkRewardV2, + ) -> None: + """Update or create a reward for a given campaign. + + Args: + campaign_no (int): The campaign number the reward belongs to. + campaign_obj (ChzzkCampaign): The campaign database object the reward belongs to. + reward (ChzzkRewardV2): The reward data parsed from the API response. + """ + reward_defaults: dict[str, Any] = { + "image_url": reward.image_url, + "title": reward.title, + "reward_type": reward.reward_type, + "campaign_reward_type": getattr( + reward, + "campaign_reward_type", + "", + ), + "condition_type": reward.condition_type, + "condition_for_minutes": reward.condition_for_minutes, + "ios_based_reward": reward.ios_based_reward, + "code_remaining_count": reward.code_remaining_count, + } + + reward_, created = ChzzkReward.objects.get_or_create( + campaign=campaign_obj, + reward_no=reward.reward_no, + defaults=reward_defaults, + ) + + if created: + msg: str = f"Created reward {reward_.reward_no} for campaign {campaign_no}" + self.stdout.write(self.style.SUCCESS(msg)) + return + + updated_reward: bool = self._apply_updates_if_changed(reward_, reward_defaults) + if updated_reward: + msg: str = f" Updated reward {reward_.reward_no} for campaign {campaign_no} (changes detected)" + self.stdout.write(self.style.SUCCESS(msg)) + + def _apply_updates_if_changed( + self, + instance: ChzzkCampaign | ChzzkReward, + changes: dict[str, Any], + ) -> bool: + """Update a model instance only if values have changed. + + Returns: + bool: True if an update occurred, False if no changes were needed. + """ + fields_to_update: list[str] = [] + + for field_name, new_value in changes.items(): + if getattr(instance, field_name) != new_value: + setattr(instance, field_name, new_value) + fields_to_update.append(field_name) + + if fields_to_update: + instance.save(update_fields=fields_to_update) + return True + + return False + + def import_campaign_data( + self, + campaign_no: int, + api_version: str, + data: dict[str, Any], + cd: ChzzkCampaignV2, + ) -> ChzzkCampaign: + """Import campaign data into the database. + + Args: + campaign_no (int): The campaign number being imported. + api_version (str): The API version used to fetch the data ("v1" or "v2"). + data (dict[str, Any]): The raw JSON data returned from the API. + cd (ChzzkCampaignV2): The parsed campaign data from the API response. + + Returns: + ChzzkCampaign: The imported or updated campaign database object. + """ + raw_json_v1_val: dict[str, Any] = data if api_version == "v1" else {} + raw_json_v2_val: dict[str, Any] = data if api_version == "v2" else {} + + defaults: dict[str, Any] = { + "title": cd.title, + "image_url": cd.image_url, + "description": cd.description, + "category_type": cd.category_type, + "category_id": cd.category_id, + "category_value": cd.category_value, + "pc_link_url": cd.pc_link_url, + "mobile_link_url": cd.mobile_link_url, + "service_id": cd.service_id, + "state": cd.state, + "start_date": cd.start_date, + "end_date": cd.end_date, + "has_ios_based_reward": cd.has_ios_based_reward, + "drops_campaign_not_started": cd.drops_campaign_not_started, + "campaign_reward_type": getattr(cd, "campaign_reward_type", ""), + "reward_type": getattr(cd, "reward_type", ""), + "account_link_url": cd.account_link_url, + "raw_json_v1": raw_json_v1_val, + "raw_json_v2": raw_json_v2_val, + } + + campaign_obj, created = ChzzkCampaign.objects.get_or_create( + campaign_no=cd.campaign_no, defaults={ - "title": campaign_data.title, - "image_url": campaign_data.image_url, - "description": campaign_data.description, - "category_type": campaign_data.category_type, - "category_id": campaign_data.category_id, - "category_value": campaign_data.category_value, - "pc_link_url": campaign_data.pc_link_url, - "mobile_link_url": campaign_data.mobile_link_url, - "service_id": campaign_data.service_id, - "state": campaign_data.state, - "start_date": campaign_data.start_date, - "end_date": campaign_data.end_date, - "has_ios_based_reward": campaign_data.has_ios_based_reward, - "drops_campaign_not_started": campaign_data.drops_campaign_not_started, - "campaign_reward_type": getattr( - campaign_data, - "campaign_reward_type", - "", - ), - "reward_type": getattr(campaign_data, "reward_type", ""), - "account_link_url": campaign_data.account_link_url, + **defaults, "scraped_at": timezone.now(), "scrape_status": "success", - "raw_json_v1": raw_json_v1_val, - "raw_json_v2": raw_json_v2_val, }, ) - if created: - self.stdout.write( - self.style.SUCCESS(f"Created campaign {campaign_no}"), - ) - for reward in campaign_data.reward_list: - reward_, created = ChzzkReward.objects.update_or_create( - campaign=campaign_obj, - reward_no=reward.reward_no, - defaults={ - "image_url": reward.image_url, - "title": reward.title, - "reward_type": reward.reward_type, - "campaign_reward_type": getattr( - reward, - "campaign_reward_type", - "", - ), - "condition_type": reward.condition_type, - "condition_for_minutes": reward.condition_for_minutes, - "ios_based_reward": reward.ios_based_reward, - "code_remaining_count": reward.code_remaining_count, - }, - ) - if created: - self.stdout.write( - self.style.SUCCESS( - f" Created reward {reward_.reward_no} for campaign {campaign_no}", - ), - ) - self.stdout.write( - self.style.SUCCESS(f"Imported campaign {campaign_no}"), - ) + if created: + msg: str = f"Created campaign {campaign_no}" + self.stdout.write(self.style.SUCCESS(msg)) + return campaign_obj + + updated: bool = self._apply_updates_if_changed(campaign_obj, defaults) + + if updated: + campaign_obj.scraped_at = timezone.now() + campaign_obj.scrape_status = "success" + campaign_obj.save(update_fields=["scraped_at", "scrape_status"]) + msg: str = f"Updated campaign {campaign_no} (changes detected)" + self.stdout.write(self.style.SUCCESS(msg)) + + return campaign_obj