diff --git a/twitch/management/commands/better_import_drops.py b/twitch/management/commands/better_import_drops.py index a3c9e13..bcdd31c 100644 --- a/twitch/management/commands/better_import_drops.py +++ b/twitch/management/commands/better_import_drops.py @@ -42,6 +42,55 @@ def move_failed_validation_file(file_path: Path) -> Path: return broken_dir +def move_file_to_broken_subdir(file_path: Path, subdir: str) -> Path: + """Move file to a nested broken/ directory and return that directory. + + Args: + file_path: The file to move. + subdir: Subdirectory name under "broken" (e.g., the matched keyword). + + Returns: + Path to the directory where the file was moved. + """ + broken_dir: Path = Path.home() / "broken" / subdir + broken_dir.mkdir(parents=True, exist_ok=True) + + target_file: Path = broken_dir / file_path.name + file_path.rename(target_file) + + return broken_dir + + +def detect_non_campaign_keyword(raw_text: str) -> str | None: + """Detect if payload is a known non-drop-campaign response. + + Looks for operationName values that are commonly present in unrelated + Twitch API responses. Returns the matched keyword if found. + + Args: + raw_text: The raw JSON text to scan. + + Returns: + The matched keyword, or None if no match found. + """ + probably_shit: list[str] = [ + "ChannelPointsContext", + "ClaimCommunityPoints", + "DirectoryPage_Game", + "DropCurrentSessionContext", + "DropsPage_ClaimDropRewards", + "OnsiteNotifications_DeleteNotification", + "PlaybackAccessToken", + "streamPlaybackAccessToken", + "VideoPlayerStreamInfoOverlayChannel", + ] + + for keyword in probably_shit: + if f'"operationName": "{keyword}"' in raw_text: + return keyword + return None + + class Command(BaseCommand): """Import Twitch drop campaign data from a JSON file or directory of JSON files.""" @@ -59,7 +108,7 @@ class Command(BaseCommand): parser.add_argument("path", type=str, help="Path to JSON file or directory") parser.add_argument("--recursive", action="store_true", help="Recursively search directories for JSON files") parser.add_argument("--crash-on-error", action="store_true", help="Crash the command on first error instead of continuing") - parser.add_argument("--verbose", action="store_true", help="Print per-file success messages (very chatty)") + parser.add_argument("--verbose", action="store_true", help="Print per-file success messages") def pre_fill_cache(self) -> None: """Load all existing IDs from DB into memory to avoid N+1 queries.""" @@ -145,7 +194,11 @@ class Command(BaseCommand): progress_bar.write(f"{Fore.GREEN}✓{Style.RESET_ALL} {file_path.name}") else: failed_count += 1 - progress_bar.write(f"{Fore.RED}✗{Style.RESET_ALL} {file_path.name} → {result['broken_dir']}/{file_path.name}") + reason = result.get("reason") if isinstance(result, dict) else None + if reason: + progress_bar.write(f"{Fore.RED}✗{Style.RESET_ALL} {file_path.name} → {result['broken_dir']}/{file_path.name} ({reason})") + else: + progress_bar.write(f"{Fore.RED}✗{Style.RESET_ALL} {file_path.name} → {result['broken_dir']}/{file_path.name}") except (OSError, ValueError, KeyError) as e: error_count += 1 progress_bar.write(f"{Fore.RED}✗{Style.RESET_ALL} {file_path.name} (error: {e})") @@ -208,7 +261,15 @@ class Command(BaseCommand): Dict with success status and optional broken_dir path """ try: - ViewerDropsDashboardPayload.model_validate_json(file_path.read_text(encoding="utf-8")) + raw_text: str = file_path.read_text(encoding="utf-8", errors="ignore") + + # Fast pre-filter: check for known non-campaign keywords and move early + matched: str | None = detect_non_campaign_keyword(raw_text) + if matched: + broken_dir: Path = move_file_to_broken_subdir(file_path, matched) + return {"success": False, "broken_dir": str(broken_dir), "reason": f"matched '{matched}'"} + + ViewerDropsDashboardPayload.model_validate_json(raw_text) except ValidationError: if options["crash_on_error"]: raise @@ -236,7 +297,16 @@ class Command(BaseCommand): dynamic_ncols=True, ) as progress_bar: try: - _: ViewerDropsDashboardPayload = ViewerDropsDashboardPayload.model_validate_json(file_path.read_text(encoding="utf-8")) + raw_text: str = file_path.read_text(encoding="utf-8", errors="ignore") + + # Fast pre-filter for non-campaign responses + matched: str | None = detect_non_campaign_keyword(raw_text) + if matched: + broken_dir: Path = move_file_to_broken_subdir(file_path, matched) + progress_bar.write(f"{Fore.RED}✗{Style.RESET_ALL} {file_path.name} → {broken_dir}/{file_path.name} (matched '{matched}')") + return + + _: ViewerDropsDashboardPayload = ViewerDropsDashboardPayload.model_validate_json(raw_text) progress_bar.update(1) progress_bar.write(f"{Fore.GREEN}✓{Style.RESET_ALL} {file_path.name}") except ValidationError: