WIP: Refactor JSON importer

This commit is contained in:
Joakim Hellsén 2025-07-30 05:11:57 +02:00
commit 998c6703d8

View file

@@ -1,14 +1,12 @@
from __future__ import annotations
import concurrent.futures
import json
import shutil
import time
from pathlib import Path
from typing import Any
from django.core.management.base import BaseCommand, CommandError, CommandParser
from django.db import OperationalError, transaction
from django.db import transaction
from twitch.models import DropBenefit, DropBenefitEdge, DropCampaign, Game, Organization, TimeBasedDrop
@@ -35,30 +33,6 @@ class Command(BaseCommand):
default="processed",
help="Name of subdirectory to move processed files to (default: 'processed')",
)
parser.add_argument(
"--max-workers",
type=int,
default=100,
help="Maximum number of worker processes to use for parallel importing (default: 100)",
)
parser.add_argument(
"--batch-size",
type=int,
default=500,
help="Number of files to process in each batch (default: 500)",
)
parser.add_argument(
"--max-retries",
type=int,
default=5,
help="Maximum number of retries for database operations when SQLite is locked (default: 5)",
)
parser.add_argument(
"--retry-delay",
type=float,
default=0.5,
help="Delay in seconds between retries for database operations (default: 0.5)",
)
def handle(self, **options) -> None:
"""Execute the command.
@@ -72,47 +46,31 @@ class Command(BaseCommand):
"""
path: str = options["path"]
processed_dir: str = options["processed_dir"]
max_workers: int = options["max_workers"]
batch_size: int = options["batch_size"]
max_retries: int = options["max_retries"]
retry_delay: float = options["retry_delay"]
path_obj = Path(path)
# Store retry configuration in instance variables to make them available to other methods
self.max_retries = max_retries
self.retry_delay = retry_delay
# Check if path exists
if not path_obj.exists():
msg = f"Path {path} does not exist"
msg: str = f"Path {path} does not exist"
raise CommandError(msg)
# Process single file or directory
if path_obj.is_file():
self._process_file(path_obj, processed_dir)
elif path_obj.is_dir():
self._process_directory(path_obj, processed_dir, max_workers, batch_size)
self._process_directory(path_obj, processed_dir)
else:
msg = f"Path {path} is neither a file nor a directory"
raise CommandError(msg)
def _process_directory(self, directory: Path, processed_dir: str, max_workers: int = 100, batch_size: int = 1000) -> None:
def _process_directory(self, directory: Path, processed_dir: str) -> None:
    """Process all JSON files in a directory, one file at a time.

    Successfully imported files are moved into ``processed_dir`` by
    ``_process_file``; files that fail are reported and left in place so
    the run continues.

    Args:
        directory: Path to the directory containing ``*.json`` files.
        processed_dir: Name of subdirectory to move processed files to.
    """
    # Ensure the destination for processed files exists up front.
    processed_path: Path = directory / processed_dir
    processed_path.mkdir(exist_ok=True)

    json_files: list[Path] = list(directory.glob("*.json"))
    if not json_files:
        self.stdout.write(self.style.WARNING(f"No JSON files found in {directory}"))
        return

    total_files = len(json_files)
    self.stdout.write(f"Found {total_files} JSON files to process")

    for json_file in json_files:
        self.stdout.write(f"Processing file {json_file.name}...")
        try:
            self._process_file(json_file, processed_dir)
        except CommandError as e:
            self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}"))
        except (ValueError, TypeError, AttributeError, KeyError, IndexError, json.JSONDecodeError) as e:
            # Data-shape problems in one file must not abort the whole run.
            self.stdout.write(self.style.ERROR(f"Data error processing {json_file}: {e!s}"))

    self.stdout.write(
        self.style.SUCCESS(f"Completed processing {total_files} JSON files in {directory}. Processed files moved to {processed_dir}.")
    )
def _process_file(self, file_path: Path, processed_dir: str) -> None:
    """Process a single JSON file and import its drop campaign payload(s).

    Args:
        file_path: Path to the JSON file.
        processed_dir: Name of subdirectory to move the file to after a
            successful import. A falsy value skips the move.

    Raises:
        CommandError: If the file isn't a JSON file or has an invalid
            JSON structure.
    """
    # Validate file is a JSON file.
    if not file_path.name.endswith(".json"):
        msg = f"File {file_path} is not a JSON file"
        raise CommandError(msg)

    # Load JSON data; files with broken JSON are quarantined into an
    # "error" subdirectory instead of aborting the run.
    try:
        with file_path.open(encoding="utf-8") as f:
            data = json.load(f)
    except json.JSONDecodeError:
        error_dir_name = "error"
        error_dir: Path = file_path.parent / error_dir_name
        error_dir.mkdir(exist_ok=True)
        self.stdout.write(self.style.WARNING(f"Invalid JSON in '{file_path.name}'. Moving to '{error_dir_name}'."))
        shutil.move(str(file_path), str(error_dir / file_path.name))
        # BUG FIX: function is annotated `-> None` but returned 0 here,
        # leaking an int to callers. Return nothing instead.
        return

    if isinstance(data, list):
        # A file may hold an array of response objects; import each one
        # that has the expected data.user.dropCampaign payload.
        for item in data:
            if "data" in item and "user" in item["data"] and "dropCampaign" in item["data"]["user"]:
                self._import_drop_campaign_with_retry(item["data"]["user"]["dropCampaign"])
    else:
        # Check that the single object has the expected structure.
        if "data" not in data or "user" not in data["data"] or "dropCampaign" not in data["data"]["user"]:
            msg = "Invalid JSON structure: Missing data.user.dropCampaign"
            raise CommandError(msg)
        self._import_drop_campaign_with_retry(data["data"]["user"]["dropCampaign"])

    # Move the processed file out of the input directory.
    if processed_dir:
        processed_path: Path = file_path.parent / processed_dir
        processed_path.mkdir(exist_ok=True)
        new_path: Path = processed_path / file_path.name
        shutil.move(str(file_path), str(new_path))
def _import_drop_campaign_with_retry(self, campaign_data: dict[str, Any]) -> None:
    """Import one drop-campaign payload inside a single atomic transaction.

    NOTE(review): the SQLite retry/backoff logic this name refers to has
    been removed; the method now performs a plain atomic import. Consider
    renaming once all callers are updated — name kept for compatibility.

    Args:
        campaign_data: The drop campaign data to import.
    """
    # Everything for one campaign commits (or rolls back) together.
    with transaction.atomic():
        game: Game = self.game_update_or_create(campaign_data=campaign_data)
        organization: Organization = self.owner_update_or_create(campaign_data=campaign_data)
        drop_campaign: DropCampaign = self.drop_campaign_update_or_get(
            campaign_data=campaign_data,
            game=game,
            organization=organization,
        )

        # Time-based drops hang off the campaign; benefits hang off each drop.
        for drop_data in campaign_data.get("timeBasedDrops", []):
            time_based_drop, _ = TimeBasedDrop.objects.update_or_create(
                id=drop_data["id"],
                defaults={
                    "name": drop_data["name"],
                    "required_minutes_watched": drop_data["requiredMinutesWatched"],
                    "required_subs": drop_data.get("requiredSubs", 0),
                    "start_at": drop_data["startAt"],
                    "end_at": drop_data["endAt"],
                    "campaign": drop_campaign,
                },
            )
            for benefit_edge in drop_data.get("benefitEdges", []):
                benefit_data = benefit_edge["benefit"]
                benefit, _ = DropBenefit.objects.update_or_create(
                    id=benefit_data["id"],
                    defaults={
                        "name": benefit_data["name"],
                        "image_asset_url": benefit_data.get("imageAssetURL", ""),
                        "created_at": benefit_data["createdAt"],
                        "entitlement_limit": benefit_data.get("entitlementLimit", 1),
                        "is_ios_available": benefit_data.get("isIosAvailable", False),
                        "distribution_type": benefit_data["distributionType"],
                        "game": game,
                        "owner_organization": organization,
                    },
                )
                # Link the drop to the benefit with its per-edge limit.
                DropBenefitEdge.objects.update_or_create(
                    drop=time_based_drop,
                    benefit=benefit,
                    defaults={
                        "entitlement_limit": benefit_edge.get("entitlementLimit", 1),
                    },
                )

    self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign {drop_campaign.name} (ID: {drop_campaign.id})"))
def drop_campaign_update_or_get(self, campaign_data: dict[str, Any], game: Game, organization: Organization) -> DropCampaign:
    """Update or create a drop campaign.

    Args:
        campaign_data: The drop campaign data to import.
        game: The game this drop campaign is for.
        organization: The company that owns the game.

    Returns:
        Returns the DropCampaign object.
    """
    drop_campaign, created = DropCampaign.objects.update_or_create(
        id=campaign_data["id"],
        defaults={
            "name": campaign_data["name"],
            # Raw payloads carry escaped newlines; restore real ones.
            "description": campaign_data["description"].replace("\\n", "\n"),
            "details_url": campaign_data.get("detailsURL", ""),
            "account_link_url": campaign_data.get("accountLinkURL", ""),
            "image_url": campaign_data.get("imageURL", ""),
            "start_at": campaign_data["startAt"],
            "end_at": campaign_data["endAt"],
            "is_account_connected": campaign_data["self"]["isAccountConnected"],
            "game": game,
            "owner": organization,
        },
    )
    if created:
        self.stdout.write(self.style.SUCCESS(f"Created new drop campaign: {drop_campaign.name} (ID: {drop_campaign.id})"))
    return drop_campaign
def owner_update_or_create(self, campaign_data: dict[str, Any]) -> Organization:
    """Update or create an organization.

    Args:
        campaign_data: The drop campaign data to import.

    Returns:
        Returns the Organization object.
    """
    org_data: dict[str, Any] = campaign_data["owner"]
    organization, created = Organization.objects.update_or_create(
        id=org_data["id"],
        defaults={"name": org_data["name"]},
    )
    if created:
        self.stdout.write(self.style.SUCCESS(f"Created new organization: {organization.name} (ID: {organization.id})"))
    return organization
def game_update_or_create(self, campaign_data: dict[str, Any]) -> Game:
    """Update or create a game.

    Args:
        campaign_data: The drop campaign data to import.

    Returns:
        Returns the Game object.
    """
    payload: dict[str, Any] = campaign_data["game"]
    game_defaults: dict[str, Any] = {
        "slug": payload.get("slug", ""),
        "display_name": payload["displayName"],
    }
    game, was_created = Game.objects.update_or_create(id=payload["id"], defaults=game_defaults)
    if was_created:
        self.stdout.write(self.style.SUCCESS(f"Created new game: {game.display_name} (ID: {game.id})"))
    return game