From 998c6703d84f80ad984fc92d98e81596a5f5ab2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Wed, 30 Jul 2025 05:11:57 +0200 Subject: [PATCH] WIP: Refactor JSON importer --- .../commands/import_drop_campaign.py | 356 +++++++----------- 1 file changed, 132 insertions(+), 224 deletions(-) diff --git a/twitch/management/commands/import_drop_campaign.py b/twitch/management/commands/import_drop_campaign.py index d5c2fe7..83124a5 100644 --- a/twitch/management/commands/import_drop_campaign.py +++ b/twitch/management/commands/import_drop_campaign.py @@ -1,14 +1,12 @@ from __future__ import annotations -import concurrent.futures import json import shutil -import time from pathlib import Path from typing import Any from django.core.management.base import BaseCommand, CommandError, CommandParser -from django.db import OperationalError, transaction +from django.db import transaction from twitch.models import DropBenefit, DropBenefitEdge, DropCampaign, Game, Organization, TimeBasedDrop @@ -35,30 +33,6 @@ class Command(BaseCommand): default="processed", help="Name of subdirectory to move processed files to (default: 'processed')", ) - parser.add_argument( - "--max-workers", - type=int, - default=100, - help="Maximum number of worker processes to use for parallel importing (default: 100)", - ) - parser.add_argument( - "--batch-size", - type=int, - default=500, - help="Number of files to process in each batch (default: 500)", - ) - parser.add_argument( - "--max-retries", - type=int, - default=5, - help="Maximum number of retries for database operations when SQLite is locked (default: 5)", - ) - parser.add_argument( - "--retry-delay", - type=float, - default=0.5, - help="Delay in seconds between retries for database operations (default: 0.5)", - ) def handle(self, **options) -> None: """Execute the command. 
@@ -72,47 +46,31 @@ class Command(BaseCommand): """ path: str = options["path"] processed_dir: str = options["processed_dir"] - max_workers: int = options["max_workers"] - batch_size: int = options["batch_size"] - max_retries: int = options["max_retries"] - retry_delay: float = options["retry_delay"] path_obj = Path(path) - # Store retry configuration in instance variables to make them available to other methods - self.max_retries = max_retries - self.retry_delay = retry_delay - - # Check if path exists if not path_obj.exists(): - msg = f"Path {path} does not exist" + msg: str = f"Path {path} does not exist" raise CommandError(msg) - # Process single file or directory if path_obj.is_file(): self._process_file(path_obj, processed_dir) elif path_obj.is_dir(): - self._process_directory(path_obj, processed_dir, max_workers, batch_size) + self._process_directory(path_obj, processed_dir) else: msg = f"Path {path} is neither a file nor a directory" raise CommandError(msg) - def _process_directory(self, directory: Path, processed_dir: str, max_workers: int = 100, batch_size: int = 1000) -> None: + def _process_directory(self, directory: Path, processed_dir: str) -> None: """Process all JSON files in a directory using parallel processing. Args: directory: Path to the directory. processed_dir: Name of subdirectory to move processed files to. - max_workers: Maximum number of worker processes to use. - batch_size: Number of files to process in each batch. 
""" - # Create processed directory if it doesn't exist - processed_path = directory / processed_dir - if not processed_path.exists(): - processed_path.mkdir() - self.stdout.write(f"Created directory for processed files: {processed_path}") + processed_path: Path = directory / processed_dir + processed_path.mkdir(exist_ok=True) - # Process all JSON files in the directory - json_files = list(directory.glob("*.json")) + json_files: list[Path] = list(directory.glob("*.json")) if not json_files: self.stdout.write(self.style.WARNING(f"No JSON files found in {directory}")) return @@ -120,223 +78,173 @@ class Command(BaseCommand): total_files = len(json_files) self.stdout.write(f"Found {total_files} JSON files to process") - # Process files in batches to avoid memory issues - processed_files = 0 - error_count = 0 - imported_campaigns = 0 - - # Process files in batches with parallel workers - for i in range(0, total_files, batch_size): - batch = json_files[i : i + batch_size] - batch_size_actual = len(batch) - self.stdout.write(f"Processing batch {i // batch_size + 1} with {batch_size_actual} files...") - - # Process batch files concurrently - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - # Submit all files in the batch for processing - future_to_file = {executor.submit(self._process_file, json_file, processed_dir): json_file for json_file in batch} - - # Process results as they complete - for future in concurrent.futures.as_completed(future_to_file): - json_file = future_to_file[future] - try: - # Get the number of campaigns imported from this file - num_campaigns = future.result() - processed_files += 1 - imported_campaigns += num_campaigns - - if processed_files % 100 == 0 or processed_files == total_files: - self.stdout.write( - self.style.SUCCESS( - f"Progress: {processed_files}/{total_files} files processed " - f"({processed_files / total_files * 100:.1f}%), " - f"{imported_campaigns} campaigns imported" - ) - ) - except 
CommandError as e: - error_count += 1 - self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}")) - except (ValueError, TypeError, AttributeError, KeyError, IndexError) as e: - # Handle common errors explicitly instead of catching all exceptions - error_count += 1 - self.stdout.write(self.style.ERROR(f"Data error processing {json_file}: {e!s}")) + for json_file in json_files: + self.stdout.write(f"Processing file {json_file.name}...") + try: + self._process_file(json_file, processed_dir) + except CommandError as e: + self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}")) + except (ValueError, TypeError, AttributeError, KeyError, IndexError, json.JSONDecodeError) as e: + self.stdout.write(self.style.ERROR(f"Data error processing {json_file}: {e!s}")) self.stdout.write( - self.style.SUCCESS( - f"Completed processing {processed_files} files with {error_count} errors. Imported {imported_campaigns} drop campaigns." - ) + self.style.SUCCESS(f"Completed processing {total_files} JSON files in {directory}. Processed files moved to {processed_dir}.") ) - def _process_file(self, file_path: Path, processed_dir: str) -> int: + def _process_file(self, file_path: Path, processed_dir: str) -> None: """Process a single JSON file. Args: file_path: Path to the JSON file. processed_dir: Name of subdirectory to move processed files to. - Returns: - int: Number of drop campaigns imported from this file. - Raises: CommandError: If the file isn't a JSON file or has invalid JSON structure. 
""" - # Validate file is a JSON file - if not file_path.name.endswith(".json"): - msg = f"File {file_path} is not a JSON file" - raise CommandError(msg) + with file_path.open(encoding="utf-8") as f: + data = json.load(f) - # Load JSON data - try: - with file_path.open(encoding="utf-8") as f: - data = json.load(f) - except json.JSONDecodeError: - error_dir_name = "error" - error_dir: Path = file_path.parent / error_dir_name - error_dir.mkdir(exist_ok=True) - self.stdout.write(self.style.WARNING(f"Invalid JSON in '{file_path.name}'. Moving to '{error_dir_name}'.")) - shutil.move(str(file_path), str(error_dir / file_path.name)) - return 0 - - # Counter for imported campaigns - campaigns_imported = 0 - - # Check if data is a list (array of objects) if isinstance(data, list): - # Process each item in the list for item in data: if "data" in item and "user" in item["data"] and "dropCampaign" in item["data"]["user"]: drop_campaign_data = item["data"]["user"]["dropCampaign"] - # Process the data with retry logic for database locks self._import_drop_campaign_with_retry(drop_campaign_data) - campaigns_imported += 1 + else: - # Check if the JSON has the expected structure for a single object if "data" not in data or "user" not in data["data"] or "dropCampaign" not in data["data"]["user"]: msg = "Invalid JSON structure: Missing data.user.dropCampaign" raise CommandError(msg) - # Extract drop campaign data for a single object drop_campaign_data = data["data"]["user"]["dropCampaign"] - # Process the data with retry logic for database locks self._import_drop_campaign_with_retry(drop_campaign_data) - campaigns_imported += 1 - # Move the processed file to the processed directory if processed_dir: - processed_path = file_path.parent / processed_dir - if not processed_path.exists(): - processed_path.mkdir() + processed_path: Path = file_path.parent / processed_dir + processed_path.mkdir(exist_ok=True) - # Move the file to the processed directory - new_path = processed_path / 
file_path.name + new_path: Path = processed_path / file_path.name shutil.move(str(file_path), str(new_path)) - # Return the number of campaigns imported - return campaigns_imported - def _import_drop_campaign_with_retry(self, campaign_data: dict[str, Any]) -> None: """Import drop campaign data into the database with retry logic for SQLite locks. Args: campaign_data: The drop campaign data to import. - - Raises: - OperationalError: If the database is still locked after max retries. """ - # Retry logic for database operations - max_retries = getattr(self, "max_retries", 5) # Default to 5 if not set - retry_delay = getattr(self, "retry_delay", 0.5) # Default to 0.5 seconds if not set + with transaction.atomic(): + game: Game = self.game_update_or_create(campaign_data=campaign_data) - for attempt in range(max_retries): - try: - with transaction.atomic(): - # First, create or update the game - game_data = campaign_data["game"] - game, _ = Game.objects.update_or_create( - id=game_data["id"], + organization: Organization = self.owner_update_or_create(campaign_data=campaign_data) + + drop_campaign: DropCampaign = self.drop_campaign_update_or_get( + campaign_data=campaign_data, + game=game, + organization=organization, + ) + + for drop_data in campaign_data.get("timeBasedDrops", []): + time_based_drop, _ = TimeBasedDrop.objects.update_or_create( + id=drop_data["id"], + defaults={ + "name": drop_data["name"], + "required_minutes_watched": drop_data["requiredMinutesWatched"], + "required_subs": drop_data.get("requiredSubs", 0), + "start_at": drop_data["startAt"], + "end_at": drop_data["endAt"], + "campaign": drop_campaign, + }, + ) + + for benefit_edge in drop_data.get("benefitEdges", []): + benefit_data = benefit_edge["benefit"] + benefit, _ = DropBenefit.objects.update_or_create( + id=benefit_data["id"], defaults={ - "slug": game_data.get("slug", ""), - "display_name": game_data["displayName"], - }, - ) - - # Create or update the organization - org_data = 
campaign_data["owner"] - organization, _ = Organization.objects.update_or_create( - id=org_data["id"], - defaults={"name": org_data["name"]}, - ) - - # Create or update the drop campaign - drop_campaign, _ = DropCampaign.objects.update_or_create( - id=campaign_data["id"], - defaults={ - "name": campaign_data["name"], - "description": campaign_data["description"].replace("\\n", "\n"), - "details_url": campaign_data.get("detailsURL", ""), - "account_link_url": campaign_data.get("accountLinkURL", ""), - "image_url": campaign_data.get("imageURL", ""), - "start_at": campaign_data["startAt"], - "end_at": campaign_data["endAt"], - "is_account_connected": campaign_data["self"]["isAccountConnected"], + "name": benefit_data["name"], + "image_asset_url": benefit_data.get("imageAssetURL", ""), + "created_at": benefit_data["createdAt"], + "entitlement_limit": benefit_data.get("entitlementLimit", 1), + "is_ios_available": benefit_data.get("isIosAvailable", False), + "distribution_type": benefit_data["distributionType"], "game": game, - "owner": organization, + "owner_organization": organization, }, ) - # Process time-based drops - for drop_data in campaign_data.get("timeBasedDrops", []): - time_based_drop, _ = TimeBasedDrop.objects.update_or_create( - id=drop_data["id"], - defaults={ - "name": drop_data["name"], - "required_minutes_watched": drop_data["requiredMinutesWatched"], - "required_subs": drop_data.get("requiredSubs", 0), - "start_at": drop_data["startAt"], - "end_at": drop_data["endAt"], - "campaign": drop_campaign, - }, - ) + DropBenefitEdge.objects.update_or_create( + drop=time_based_drop, + benefit=benefit, + defaults={ + "entitlement_limit": benefit_edge.get("entitlementLimit", 1), + }, + ) + self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign {drop_campaign.name} (ID: {drop_campaign.id})")) - # Process benefits - for benefit_edge in drop_data.get("benefitEdges", []): - benefit_data = benefit_edge["benefit"] - benefit, _ = 
DropBenefit.objects.update_or_create( - id=benefit_data["id"], - defaults={ - "name": benefit_data["name"], - "image_asset_url": benefit_data.get("imageAssetURL", ""), - "created_at": benefit_data["createdAt"], - "entitlement_limit": benefit_data.get("entitlementLimit", 1), - "is_ios_available": benefit_data.get("isIosAvailable", False), - "distribution_type": benefit_data["distributionType"], - "game": game, - "owner_organization": organization, - }, - ) + def drop_campaign_update_or_get(self, campaign_data: dict[str, Any], game: Game, organization: Organization) -> DropCampaign: + """Update or create a drop campaign. - # Create the relationship between drop and benefit - DropBenefitEdge.objects.update_or_create( - drop=time_based_drop, - benefit=benefit, - defaults={ - "entitlement_limit": benefit_edge.get("entitlementLimit", 1), - }, - ) - # If we get here, the transaction completed successfully - break - except OperationalError as e: - # Check if this is a database lock error - if "database is locked" in str(e).lower(): - if attempt < max_retries - 1: # Don't sleep on the last attempt - sleep_time = retry_delay * (2**attempt) # Exponential backoff - self.stdout.write( - self.style.WARNING(f"Database locked, retrying in {sleep_time:.2f}s (attempt {attempt + 1}/{max_retries})") - ) - time.sleep(sleep_time) - else: - self.stdout.write(self.style.ERROR(f"Database still locked after {max_retries} attempts")) - raise - else: - # Not a lock error, re-raise - raise + Args: + campaign_data: The drop campaign data to import. + game: The game this drop campaign is for. + organization: The company that owns the game. + + Returns: + Returns the DropCampaign object. 
+ """ + drop_campaign, created = DropCampaign.objects.update_or_create( + id=campaign_data["id"], + defaults={ + "name": campaign_data["name"], + "description": campaign_data["description"].replace("\\n", "\n"), + "details_url": campaign_data.get("detailsURL", ""), + "account_link_url": campaign_data.get("accountLinkURL", ""), + "image_url": campaign_data.get("imageURL", ""), + "start_at": campaign_data["startAt"], + "end_at": campaign_data["endAt"], + "is_account_connected": campaign_data["self"]["isAccountConnected"], + "game": game, + "owner": organization, + }, + ) + if created: + self.stdout.write(self.style.SUCCESS(f"Created new drop campaign: {drop_campaign.name} (ID: {drop_campaign.id})")) + return drop_campaign + + def owner_update_or_create(self, campaign_data: dict[str, Any]) -> Organization: + """Update or create an organization. + + Args: + campaign_data: The drop campaign data to import. + + Returns: + Returns the Organization object. + """ + org_data: dict[str, Any] = campaign_data["owner"] + organization, created = Organization.objects.update_or_create( + id=org_data["id"], + defaults={"name": org_data["name"]}, + ) + if created: + self.stdout.write(self.style.SUCCESS(f"Created new organization: {organization.name} (ID: {organization.id})")) + return organization + + def game_update_or_create(self, campaign_data: dict[str, Any]) -> Game: + """Update or create a game. + + Args: + campaign_data: The drop campaign data to import. + + Returns: + Returns the Game object. + """ + game_data: dict[str, Any] = campaign_data["game"] + game, created = Game.objects.update_or_create( + id=game_data["id"], + defaults={ + "slug": game_data.get("slug", ""), + "display_name": game_data["displayName"], + }, + ) + if created: + self.stdout.write(self.style.SUCCESS(f"Created new game: {game.display_name} (ID: {game.id})")) + return game