Enhance import command with parallel processing and retry logic for database operations
This commit is contained in:
parent
597da47a60
commit
1e484c5511
1 changed files with 223 additions and 97 deletions
|
|
@ -1,11 +1,14 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import concurrent.futures
|
||||||
import json
|
import json
|
||||||
import shutil
|
import shutil
|
||||||
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from django.core.management.base import BaseCommand, CommandError, CommandParser
|
from django.core.management.base import BaseCommand, CommandError, CommandParser
|
||||||
|
from django.db import transaction, OperationalError
|
||||||
|
|
||||||
from twitch.models import DropBenefit, DropBenefitEdge, DropCampaign, Game, Organization, TimeBasedDrop
|
from twitch.models import DropBenefit, DropBenefitEdge, DropCampaign, Game, Organization, TimeBasedDrop
|
||||||
|
|
||||||
|
|
@ -32,8 +35,32 @@ class Command(BaseCommand):
|
||||||
default="processed",
|
default="processed",
|
||||||
help="Name of subdirectory to move processed files to (default: 'processed')",
|
help="Name of subdirectory to move processed files to (default: 'processed')",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--max-workers",
|
||||||
|
type=int,
|
||||||
|
default=4,
|
||||||
|
help="Maximum number of worker processes to use for parallel importing (default: 4)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--batch-size",
|
||||||
|
type=int,
|
||||||
|
default=100,
|
||||||
|
help="Number of files to process in each batch (default: 100)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--max-retries",
|
||||||
|
type=int,
|
||||||
|
default=5,
|
||||||
|
help="Maximum number of retries for database operations when SQLite is locked (default: 5)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--retry-delay",
|
||||||
|
type=float,
|
||||||
|
default=0.5,
|
||||||
|
help="Delay in seconds between retries for database operations (default: 0.5)",
|
||||||
|
)
|
||||||
|
|
||||||
def handle(self, **options: str) -> None:
|
def handle(self, **options) -> None:
|
||||||
"""Execute the command.
|
"""Execute the command.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
|
@ -45,8 +72,16 @@ class Command(BaseCommand):
|
||||||
"""
|
"""
|
||||||
path: str = options["path"]
|
path: str = options["path"]
|
||||||
processed_dir: str = options["processed_dir"]
|
processed_dir: str = options["processed_dir"]
|
||||||
|
max_workers: int = options["max_workers"]
|
||||||
|
batch_size: int = options["batch_size"]
|
||||||
|
max_retries: int = options["max_retries"]
|
||||||
|
retry_delay: float = options["retry_delay"]
|
||||||
path_obj = Path(path)
|
path_obj = Path(path)
|
||||||
|
|
||||||
|
# Store retry configuration in instance variables to make them available to other methods
|
||||||
|
self.max_retries = max_retries
|
||||||
|
self.retry_delay = retry_delay
|
||||||
|
|
||||||
# Check if path exists
|
# Check if path exists
|
||||||
if not path_obj.exists():
|
if not path_obj.exists():
|
||||||
msg = f"Path {path} does not exist"
|
msg = f"Path {path} does not exist"
|
||||||
|
|
@ -56,17 +91,19 @@ class Command(BaseCommand):
|
||||||
if path_obj.is_file():
|
if path_obj.is_file():
|
||||||
self._process_file(path_obj, processed_dir)
|
self._process_file(path_obj, processed_dir)
|
||||||
elif path_obj.is_dir():
|
elif path_obj.is_dir():
|
||||||
self._process_directory(path_obj, processed_dir)
|
self._process_directory(path_obj, processed_dir, max_workers, batch_size)
|
||||||
else:
|
else:
|
||||||
msg = f"Path {path} is neither a file nor a directory"
|
msg = f"Path {path} is neither a file nor a directory"
|
||||||
raise CommandError(msg)
|
raise CommandError(msg)
|
||||||
|
|
||||||
def _process_directory(self, directory: Path, processed_dir: str) -> None:
|
def _process_directory(self, directory: Path, processed_dir: str, max_workers: int = 4, batch_size: int = 100) -> None:
|
||||||
"""Process all JSON files in a directory.
|
"""Process all JSON files in a directory using parallel processing.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
directory: Path to the directory.
|
directory: Path to the directory.
|
||||||
processed_dir: Name of subdirectory to move processed files to.
|
processed_dir: Name of subdirectory to move processed files to.
|
||||||
|
max_workers: Maximum number of worker processes to use.
|
||||||
|
batch_size: Number of files to process in each batch.
|
||||||
"""
|
"""
|
||||||
# Create processed directory if it doesn't exist
|
# Create processed directory if it doesn't exist
|
||||||
processed_path = directory / processed_dir
|
processed_path = directory / processed_dir
|
||||||
|
|
@ -80,19 +117,66 @@ class Command(BaseCommand):
|
||||||
self.stdout.write(self.style.WARNING(f"No JSON files found in {directory}"))
|
self.stdout.write(self.style.WARNING(f"No JSON files found in {directory}"))
|
||||||
return
|
return
|
||||||
|
|
||||||
for json_file in json_files:
|
total_files = len(json_files)
|
||||||
try:
|
self.stdout.write(f"Found {total_files} JSON files to process")
|
||||||
self._process_file(json_file, processed_dir)
|
|
||||||
except CommandError as e:
|
|
||||||
self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}"))
|
|
||||||
|
|
||||||
def _process_file(self, file_path: Path, processed_dir: str) -> None:
|
# Process files in batches to avoid memory issues
|
||||||
|
processed_files = 0
|
||||||
|
error_count = 0
|
||||||
|
imported_campaigns = 0
|
||||||
|
|
||||||
|
# Process files in batches with parallel workers
|
||||||
|
for i in range(0, total_files, batch_size):
|
||||||
|
batch = json_files[i : i + batch_size]
|
||||||
|
batch_size_actual = len(batch)
|
||||||
|
self.stdout.write(f"Processing batch {i // batch_size + 1} with {batch_size_actual} files...")
|
||||||
|
|
||||||
|
# Process batch files concurrently
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
|
# Submit all files in the batch for processing
|
||||||
|
future_to_file = {executor.submit(self._process_file, json_file, processed_dir): json_file for json_file in batch}
|
||||||
|
|
||||||
|
# Process results as they complete
|
||||||
|
for future in concurrent.futures.as_completed(future_to_file):
|
||||||
|
json_file = future_to_file[future]
|
||||||
|
try:
|
||||||
|
# Get the number of campaigns imported from this file
|
||||||
|
num_campaigns = future.result()
|
||||||
|
processed_files += 1
|
||||||
|
imported_campaigns += num_campaigns
|
||||||
|
|
||||||
|
if processed_files % 100 == 0 or processed_files == total_files:
|
||||||
|
self.stdout.write(
|
||||||
|
self.style.SUCCESS(
|
||||||
|
f"Progress: {processed_files}/{total_files} files processed "
|
||||||
|
f"({processed_files / total_files * 100:.1f}%), "
|
||||||
|
f"{imported_campaigns} campaigns imported"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except CommandError as e:
|
||||||
|
error_count += 1
|
||||||
|
self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}"))
|
||||||
|
except (ValueError, TypeError, AttributeError, KeyError, IndexError) as e:
|
||||||
|
# Handle common errors explicitly instead of catching all exceptions
|
||||||
|
error_count += 1
|
||||||
|
self.stdout.write(self.style.ERROR(f"Data error processing {json_file}: {e!s}"))
|
||||||
|
|
||||||
|
self.stdout.write(
|
||||||
|
self.style.SUCCESS(
|
||||||
|
f"Completed processing {processed_files} files with {error_count} errors. Imported {imported_campaigns} drop campaigns."
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_file(self, file_path: Path, processed_dir: str) -> int:
|
||||||
"""Process a single JSON file.
|
"""Process a single JSON file.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
file_path: Path to the JSON file.
|
file_path: Path to the JSON file.
|
||||||
processed_dir: Name of subdirectory to move processed files to.
|
processed_dir: Name of subdirectory to move processed files to.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: Number of drop campaigns imported from this file.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
CommandError: If the file isn't a JSON file or has invalid JSON structure.
|
CommandError: If the file isn't a JSON file or has invalid JSON structure.
|
||||||
"""
|
"""
|
||||||
|
|
@ -109,16 +193,29 @@ class Command(BaseCommand):
|
||||||
msg = f"Error decoding JSON: {e}"
|
msg = f"Error decoding JSON: {e}"
|
||||||
raise CommandError(msg) from e
|
raise CommandError(msg) from e
|
||||||
|
|
||||||
# Check if the JSON has the expected structure
|
# Counter for imported campaigns
|
||||||
if "data" not in data or "user" not in data["data"] or "dropCampaign" not in data["data"]["user"]:
|
campaigns_imported = 0
|
||||||
msg = "Invalid JSON structure: Missing data.user.dropCampaign"
|
|
||||||
raise CommandError(msg)
|
|
||||||
|
|
||||||
# Extract drop campaign data
|
# Check if data is a list (array of objects)
|
||||||
drop_campaign_data = data["data"]["user"]["dropCampaign"]
|
if isinstance(data, list):
|
||||||
|
# Process each item in the list
|
||||||
|
for item in data:
|
||||||
|
if "data" in item and "user" in item["data"] and "dropCampaign" in item["data"]["user"]:
|
||||||
|
drop_campaign_data = item["data"]["user"]["dropCampaign"]
|
||||||
|
# Process the data with retry logic for database locks
|
||||||
|
self._import_drop_campaign_with_retry(drop_campaign_data)
|
||||||
|
campaigns_imported += 1
|
||||||
|
else:
|
||||||
|
# Check if the JSON has the expected structure for a single object
|
||||||
|
if "data" not in data or "user" not in data["data"] or "dropCampaign" not in data["data"]["user"]:
|
||||||
|
msg = "Invalid JSON structure: Missing data.user.dropCampaign"
|
||||||
|
raise CommandError(msg)
|
||||||
|
|
||||||
# Process the data
|
# Extract drop campaign data for a single object
|
||||||
self._import_drop_campaign(drop_campaign_data)
|
drop_campaign_data = data["data"]["user"]["dropCampaign"]
|
||||||
|
# Process the data with retry logic for database locks
|
||||||
|
self._import_drop_campaign_with_retry(drop_campaign_data)
|
||||||
|
campaigns_imported += 1
|
||||||
|
|
||||||
# Move the processed file to the processed directory
|
# Move the processed file to the processed directory
|
||||||
if processed_dir:
|
if processed_dir:
|
||||||
|
|
@ -129,87 +226,116 @@ class Command(BaseCommand):
|
||||||
# Move the file to the processed directory
|
# Move the file to the processed directory
|
||||||
new_path = processed_path / file_path.name
|
new_path = processed_path / file_path.name
|
||||||
shutil.move(str(file_path), str(new_path))
|
shutil.move(str(file_path), str(new_path))
|
||||||
self.stdout.write(f"Moved processed file to: {new_path}")
|
|
||||||
|
|
||||||
self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign: {drop_campaign_data['name']}"))
|
# Return the number of campaigns imported
|
||||||
|
return campaigns_imported
|
||||||
|
|
||||||
def _import_drop_campaign(self, campaign_data: dict[str, Any]) -> None:
|
def _import_drop_campaign_with_retry(self, campaign_data: dict[str, Any]) -> None:
|
||||||
"""Import drop campaign data into the database.
|
"""Import drop campaign data into the database with retry logic for SQLite locks.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
campaign_data: The drop campaign data to import.
|
campaign_data: The drop campaign data to import.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
OperationalError: If the database is still locked after max retries.
|
||||||
"""
|
"""
|
||||||
# First, create or update the game
|
# Retry logic for database operations
|
||||||
game_data = campaign_data["game"]
|
max_retries = getattr(self, "max_retries", 5) # Default to 5 if not set
|
||||||
game, _ = Game.objects.update_or_create(
|
retry_delay = getattr(self, "retry_delay", 0.5) # Default to 0.5 seconds if not set
|
||||||
id=game_data["id"],
|
|
||||||
defaults={
|
for attempt in range(max_retries):
|
||||||
"slug": game_data.get("slug", ""),
|
try:
|
||||||
"display_name": game_data["displayName"],
|
with transaction.atomic():
|
||||||
},
|
# First, create or update the game
|
||||||
)
|
game_data = campaign_data["game"]
|
||||||
|
game, _ = Game.objects.update_or_create(
|
||||||
|
id=game_data["id"],
|
||||||
|
defaults={
|
||||||
|
"slug": game_data.get("slug", ""),
|
||||||
|
"display_name": game_data["displayName"],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
# Create or update the organization
|
# Create or update the organization
|
||||||
org_data = campaign_data["owner"]
|
org_data = campaign_data["owner"]
|
||||||
organization, _ = Organization.objects.update_or_create(
|
organization, _ = Organization.objects.update_or_create(
|
||||||
id=org_data["id"],
|
id=org_data["id"],
|
||||||
defaults={"name": org_data["name"]},
|
defaults={"name": org_data["name"]},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create or update the drop campaign
|
# Create or update the drop campaign
|
||||||
drop_campaign, _ = DropCampaign.objects.update_or_create(
|
drop_campaign, _ = DropCampaign.objects.update_or_create(
|
||||||
id=campaign_data["id"],
|
id=campaign_data["id"],
|
||||||
defaults={
|
defaults={
|
||||||
"name": campaign_data["name"],
|
"name": campaign_data["name"],
|
||||||
"description": campaign_data["description"],
|
"description": campaign_data["description"],
|
||||||
"details_url": campaign_data.get("detailsURL", ""),
|
"details_url": campaign_data.get("detailsURL", ""),
|
||||||
"account_link_url": campaign_data.get("accountLinkURL", ""),
|
"account_link_url": campaign_data.get("accountLinkURL", ""),
|
||||||
"image_url": campaign_data.get("imageURL", ""),
|
"image_url": campaign_data.get("imageURL", ""),
|
||||||
"start_at": campaign_data["startAt"],
|
"start_at": campaign_data["startAt"],
|
||||||
"end_at": campaign_data["endAt"],
|
"end_at": campaign_data["endAt"],
|
||||||
"status": campaign_data["status"],
|
"status": campaign_data["status"],
|
||||||
"is_account_connected": campaign_data["self"]["isAccountConnected"],
|
"is_account_connected": campaign_data["self"]["isAccountConnected"],
|
||||||
"game": game,
|
"game": game,
|
||||||
"owner": organization,
|
"owner": organization,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Process time-based drops
|
# Process time-based drops
|
||||||
for drop_data in campaign_data.get("timeBasedDrops", []):
|
for drop_data in campaign_data.get("timeBasedDrops", []):
|
||||||
time_based_drop, _ = TimeBasedDrop.objects.update_or_create(
|
time_based_drop, _ = TimeBasedDrop.objects.update_or_create(
|
||||||
id=drop_data["id"],
|
id=drop_data["id"],
|
||||||
defaults={
|
defaults={
|
||||||
"name": drop_data["name"],
|
"name": drop_data["name"],
|
||||||
"required_minutes_watched": drop_data["requiredMinutesWatched"],
|
"required_minutes_watched": drop_data["requiredMinutesWatched"],
|
||||||
"required_subs": drop_data.get("requiredSubs", 0),
|
"required_subs": drop_data.get("requiredSubs", 0),
|
||||||
"start_at": drop_data["startAt"],
|
"start_at": drop_data["startAt"],
|
||||||
"end_at": drop_data["endAt"],
|
"end_at": drop_data["endAt"],
|
||||||
"campaign": drop_campaign,
|
"campaign": drop_campaign,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Process benefits
|
# Process benefits
|
||||||
for benefit_edge in drop_data.get("benefitEdges", []):
|
for benefit_edge in drop_data.get("benefitEdges", []):
|
||||||
benefit_data = benefit_edge["benefit"]
|
benefit_data = benefit_edge["benefit"]
|
||||||
benefit, _ = DropBenefit.objects.update_or_create(
|
benefit, _ = DropBenefit.objects.update_or_create(
|
||||||
id=benefit_data["id"],
|
id=benefit_data["id"],
|
||||||
defaults={
|
defaults={
|
||||||
"name": benefit_data["name"],
|
"name": benefit_data["name"],
|
||||||
"image_asset_url": benefit_data.get("imageAssetURL", ""),
|
"image_asset_url": benefit_data.get("imageAssetURL", ""),
|
||||||
"created_at": benefit_data["createdAt"],
|
"created_at": benefit_data["createdAt"],
|
||||||
"entitlement_limit": benefit_data.get("entitlementLimit", 1),
|
"entitlement_limit": benefit_data.get("entitlementLimit", 1),
|
||||||
"is_ios_available": benefit_data.get("isIosAvailable", False),
|
"is_ios_available": benefit_data.get("isIosAvailable", False),
|
||||||
"distribution_type": benefit_data["distributionType"],
|
"distribution_type": benefit_data["distributionType"],
|
||||||
"game": game,
|
"game": game,
|
||||||
"owner_organization": organization,
|
"owner_organization": organization,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create the relationship between drop and benefit
|
# Create the relationship between drop and benefit
|
||||||
DropBenefitEdge.objects.update_or_create(
|
DropBenefitEdge.objects.update_or_create(
|
||||||
drop=time_based_drop,
|
drop=time_based_drop,
|
||||||
benefit=benefit,
|
benefit=benefit,
|
||||||
defaults={
|
defaults={
|
||||||
"entitlement_limit": benefit_edge.get("entitlementLimit", 1),
|
"entitlement_limit": benefit_edge.get("entitlementLimit", 1),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
# If we get here, the transaction completed successfully
|
||||||
|
break
|
||||||
|
except OperationalError as e:
|
||||||
|
# Check if this is a database lock error
|
||||||
|
if "database is locked" in str(e).lower():
|
||||||
|
if attempt < max_retries - 1: # Don't sleep on the last attempt
|
||||||
|
sleep_time = retry_delay * (2 ** attempt) # Exponential backoff
|
||||||
|
self.stdout.write(
|
||||||
|
self.style.WARNING(
|
||||||
|
f"Database locked, retrying in {sleep_time:.2f}s (attempt {attempt + 1}/{max_retries})"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
time.sleep(sleep_time)
|
||||||
|
else:
|
||||||
|
self.stdout.write(self.style.ERROR(f"Database still locked after {max_retries} attempts"))
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
# Not a lock error, re-raise
|
||||||
|
raise
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue