Remove bloat
This commit is contained in:
parent
011c617328
commit
715cbf4bf0
51 changed files with 691 additions and 3032 deletions
|
|
@ -2,25 +2,24 @@ from __future__ import annotations
|
|||
|
||||
import concurrent.futures
|
||||
import shutil
|
||||
import time
|
||||
import threading
|
||||
import traceback
|
||||
from datetime import timedelta
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import dateparser
|
||||
import json_repair
|
||||
from django.core.exceptions import MultipleObjectsReturned
|
||||
from django.core.management.base import BaseCommand, CommandError, CommandParser
|
||||
from django.db import transaction
|
||||
from django.db import DatabaseError, IntegrityError, transaction
|
||||
from django.utils import timezone
|
||||
from tqdm import tqdm
|
||||
|
||||
from twitch.models import Channel, DropBenefit, DropBenefitEdge, DropCampaign, Game, Organization, TimeBasedDrop
|
||||
from twitch.utils.images import cache_remote_image
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
|
||||
|
||||
@lru_cache(maxsize=4096)
|
||||
|
|
@ -58,6 +57,22 @@ class Command(BaseCommand):
|
|||
help = "Import Twitch drop campaign data from a JSON file or directory"
|
||||
requires_migrations_checks = True
|
||||
|
||||
# In-memory caches
|
||||
_game_cache: dict[str, Game] = {}
|
||||
_organization_cache: dict[str, Organization] = {}
|
||||
_drop_campaign_cache: dict[str, DropCampaign] = {}
|
||||
_channel_cache: dict[str, Channel] = {}
|
||||
_benefit_cache: dict[str, DropBenefit] = {}
|
||||
|
||||
# Locks for thread-safety
|
||||
_cache_locks: dict[str, threading.RLock] = {
|
||||
"game": threading.RLock(),
|
||||
"org": threading.RLock(),
|
||||
"campaign": threading.RLock(),
|
||||
"channel": threading.RLock(),
|
||||
"benefit": threading.RLock(),
|
||||
}
|
||||
|
||||
def add_arguments(self, parser: CommandParser) -> None:
|
||||
"""Add command arguments.
|
||||
|
||||
|
|
@ -81,6 +96,11 @@ class Command(BaseCommand):
|
|||
action="store_true",
|
||||
help="Continue processing if an error occurs.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--no-preload",
|
||||
action="store_true",
|
||||
help="Do not preload existing DB objects into memory (default: preload).",
|
||||
)
|
||||
|
||||
def handle(self, **options) -> None:
|
||||
"""Execute the command.
|
||||
|
|
@ -89,17 +109,40 @@ class Command(BaseCommand):
|
|||
**options: Arbitrary keyword arguments.
|
||||
|
||||
Raises:
|
||||
CommandError: If the file/directory doesn't exist, isn't a JSON file,
|
||||
or has an invalid JSON structure.
|
||||
ValueError: If the JSON file has an invalid structure.
|
||||
TypeError: If the JSON file has an invalid JSON structure.
|
||||
AttributeError: If the JSON file has an invalid JSON structure.
|
||||
KeyError: If the JSON file has an invalid JSON structure.
|
||||
IndexError: If the JSON file has an invalid JSON structure.
|
||||
CommandError: If a critical error occurs and --continue-on-error is not set.
|
||||
ValueError: If the input data is invalid.
|
||||
TypeError: If the input data is of an unexpected type.
|
||||
AttributeError: If expected attributes are missing in the data.
|
||||
KeyError: If expected keys are missing in the data.
|
||||
IndexError: If list indices are out of range in the data.
|
||||
|
||||
"""
|
||||
paths: list[str] = options["paths"]
|
||||
processed_dir: str = options["processed_dir"]
|
||||
continue_on_error: bool = options["continue_on_error"]
|
||||
no_preload: bool = options.get("no_preload", False)
|
||||
|
||||
# Preload DB objects into caches (unless disabled)
|
||||
if not no_preload:
|
||||
try:
|
||||
self.stdout.write("Preloading existing database objects into memory...")
|
||||
self._preload_caches()
|
||||
self.stdout.write(
|
||||
f"Preloaded {len(self._game_cache)} games, "
|
||||
f"{len(self._organization_cache)} orgs, "
|
||||
f"{len(self._drop_campaign_cache)} campaigns, "
|
||||
f"{len(self._channel_cache)} channels, "
|
||||
f"{len(self._benefit_cache)} benefits."
|
||||
)
|
||||
except (FileNotFoundError, OSError, RuntimeError):
|
||||
# If preload fails for any reason, continue without it
|
||||
self.stdout.write(self.style.WARNING("Preloading caches failed — continuing without preload."))
|
||||
self.stdout.write(self.style.ERROR(traceback.format_exc()))
|
||||
self._game_cache = {}
|
||||
self._organization_cache = {}
|
||||
self._drop_campaign_cache = {}
|
||||
self._channel_cache = {}
|
||||
self._benefit_cache = {}
|
||||
|
||||
for p in paths:
|
||||
try:
|
||||
|
|
@ -129,6 +172,20 @@ class Command(BaseCommand):
|
|||
self.stdout.write(self.style.WARNING("Interrupted by user, exiting import."))
|
||||
return
|
||||
|
||||
def _preload_caches(self) -> None:
|
||||
"""Load existing DB objects into in-memory caches to avoid repeated queries."""
|
||||
# These queries may be heavy if DB is huge — safe because optional via --no-preload
|
||||
with self._cache_locks["game"]:
|
||||
self._game_cache = {str(g.id): g for g in Game.objects.all()}
|
||||
with self._cache_locks["org"]:
|
||||
self._organization_cache = {str(o.id): o for o in Organization.objects.all()}
|
||||
with self._cache_locks["campaign"]:
|
||||
self._drop_campaign_cache = {str(c.id): c for c in DropCampaign.objects.all()}
|
||||
with self._cache_locks["channel"]:
|
||||
self._channel_cache = {str(ch.id): ch for ch in Channel.objects.all()}
|
||||
with self._cache_locks["benefit"]:
|
||||
self._benefit_cache = {str(b.id): b for b in DropBenefit.objects.all()}
|
||||
|
||||
def process_drops(self, *, continue_on_error: bool, path: Path, processed_path: Path) -> None:
|
||||
"""Process drops from a file or directory.
|
||||
|
||||
|
|
@ -138,8 +195,7 @@ class Command(BaseCommand):
|
|||
processed_path: Name of subdirectory to move processed files to.
|
||||
|
||||
Raises:
|
||||
CommandError: If the file/directory doesn't exist, isn't a JSON file,
|
||||
or has an invalid JSON structure.
|
||||
CommandError: If the path is neither a file nor a directory.
|
||||
"""
|
||||
if path.is_file():
|
||||
self._process_file(file_path=path, processed_path=processed_path)
|
||||
|
|
@ -170,18 +226,18 @@ class Command(BaseCommand):
|
|||
"""Process all JSON files in a directory using parallel processing.
|
||||
|
||||
Args:
|
||||
directory: Path to the directory containing JSON files.
|
||||
processed_path: Path to the subdirectory where processed files will be moved.
|
||||
continue_on_error: Whether to continue processing remaining files if an error occurs.
|
||||
directory: The directory containing JSON files.
|
||||
processed_path: Name of subdirectory to move processed files to.
|
||||
continue_on_error: Continue processing if an error occurs.
|
||||
|
||||
Raises:
|
||||
CommandError: If the path is invalid or moving files fails.
|
||||
ValueError: If a JSON file has an invalid structure.
|
||||
TypeError: If a JSON file has an invalid structure.
|
||||
AttributeError: If a JSON file has an invalid structure.
|
||||
KeyError: If a JSON file has an invalid structure.
|
||||
IndexError: If a JSON file has an invalid structure.
|
||||
KeyboardInterrupt: If processing is interrupted by the user.
|
||||
AttributeError: If expected attributes are missing in the data.
|
||||
CommandError: If a critical error occurs and --continue-on-error is not set.
|
||||
IndexError: If list indices are out of range in the data.
|
||||
KeyboardInterrupt: If the process is interrupted by the user.
|
||||
KeyError: If expected keys are missing in the data.
|
||||
TypeError: If the input data is of an unexpected type.
|
||||
ValueError: If the input data is invalid.
|
||||
"""
|
||||
json_files: list[Path] = list(directory.glob("*.json"))
|
||||
if not json_files:
|
||||
|
|
@ -190,51 +246,39 @@ class Command(BaseCommand):
|
|||
|
||||
total_files: int = len(json_files)
|
||||
self.stdout.write(f"Found {total_files} JSON files to process")
|
||||
start_time: float = time.time()
|
||||
processed = 0
|
||||
|
||||
try:
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
try:
|
||||
future_to_file: dict[concurrent.futures.Future[None], Path] = {
|
||||
executor.submit(self._process_file, json_file, processed_path): json_file for json_file in json_files
|
||||
}
|
||||
for future in concurrent.futures.as_completed(future_to_file):
|
||||
# Wrap the as_completed iterator with tqdm for a progress bar
|
||||
for future in tqdm(concurrent.futures.as_completed(future_to_file), total=total_files, desc="Processing files"):
|
||||
json_file: Path = future_to_file[future]
|
||||
self.stdout.write(f"Processing file {json_file.name}...")
|
||||
try:
|
||||
future.result()
|
||||
except CommandError as e:
|
||||
if not continue_on_error:
|
||||
# To stop all processing, we shut down the executor and re-raise
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
raise
|
||||
self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}"))
|
||||
except (ValueError, TypeError, AttributeError, KeyError, IndexError):
|
||||
if not continue_on_error:
|
||||
# To stop all processing, we shut down the executor and re-raise
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
raise
|
||||
self.stdout.write(self.style.ERROR(f"Data error processing {json_file}"))
|
||||
self.stdout.write(self.style.ERROR(traceback.format_exc()))
|
||||
|
||||
self.update_processing_progress(total_files=total_files, start_time=start_time, processed=processed)
|
||||
except KeyboardInterrupt:
|
||||
self.stdout.write(self.style.WARNING("Interrupted by user, exiting import."))
|
||||
raise
|
||||
else:
|
||||
msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}."
|
||||
self.stdout.write(self.style.SUCCESS(msg))
|
||||
msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}."
|
||||
self.stdout.write(self.style.SUCCESS(msg))
|
||||
|
||||
def update_processing_progress(self, total_files: int, start_time: float, processed: int) -> None:
|
||||
"""Update and display processing progress.
|
||||
|
||||
Args:
|
||||
total_files: Total number of files to process.
|
||||
start_time: Timestamp when processing started.
|
||||
processed: Number of files processed so far.
|
||||
"""
|
||||
processed += 1
|
||||
elapsed: float = time.time() - start_time
|
||||
rate: float | Literal[0] = processed / elapsed if elapsed > 0 else 0
|
||||
remaining: int = total_files - processed
|
||||
eta: timedelta = timedelta(seconds=int(remaining / rate)) if rate > 0 else timedelta(seconds=0)
|
||||
self.stdout.write(f"Progress: {processed}/{total_files} files - {rate:.2f} files/sec - ETA {eta}")
|
||||
except KeyboardInterrupt:
|
||||
self.stdout.write(self.style.WARNING("Interruption received, shutting down threads immediately..."))
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
# Re-raise the exception to allow the main `handle` method to catch it and exit
|
||||
raise
|
||||
|
||||
def _process_file(self, file_path: Path, processed_path: Path) -> None:
|
||||
"""Process a single JSON file.
|
||||
|
|
@ -276,7 +320,7 @@ class Command(BaseCommand):
|
|||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.move_file(file_path, target_dir / file_path.name)
|
||||
self.stdout.write(f"Moved {file_path} to {target_dir} (matched '{keyword}')")
|
||||
tqdm.write(f"Moved {file_path} to {target_dir} (matched '{keyword}')")
|
||||
return
|
||||
|
||||
# Some responses have errors:
|
||||
|
|
@ -286,7 +330,7 @@ class Command(BaseCommand):
|
|||
actual_error_dir: Path = processed_path / "actual_error"
|
||||
actual_error_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.move_file(file_path, actual_error_dir / file_path.name)
|
||||
self.stdout.write(f"Moved {file_path} to {actual_error_dir} (contains Twitch errors)")
|
||||
tqdm.write(f"Moved {file_path} to {actual_error_dir} (contains Twitch errors)")
|
||||
return
|
||||
|
||||
# If file has "__typename": "BroadcastSettings" move it to the "broadcast_settings" directory
|
||||
|
|
@ -305,13 +349,13 @@ class Command(BaseCommand):
|
|||
and data["data"]["channel"]["viewerDropCampaigns"] is None
|
||||
):
|
||||
file_path.unlink()
|
||||
self.stdout.write(f"Removed {file_path} (only contains empty viewerDropCampaigns)")
|
||||
tqdm.write(f"Removed {file_path} (only contains empty viewerDropCampaigns)")
|
||||
return
|
||||
|
||||
# If file only contains {"data": {"user": null}} remove the file
|
||||
if isinstance(data, dict) and data.get("data", {}).keys() == {"user"} and data["data"]["user"] is None:
|
||||
file_path.unlink()
|
||||
self.stdout.write(f"Removed {file_path} (only contains empty user)")
|
||||
tqdm.write(f"Removed {file_path} (only contains empty user)")
|
||||
return
|
||||
|
||||
# If file only contains {"data": {"game": {}}} remove the file
|
||||
|
|
@ -319,7 +363,7 @@ class Command(BaseCommand):
|
|||
game_data = data["data"]["game"]
|
||||
if isinstance(game_data, dict) and game_data.get("__typename") == "Game":
|
||||
file_path.unlink()
|
||||
self.stdout.write(f"Removed {file_path} (only contains game data)")
|
||||
tqdm.write(f"Removed {file_path} (only contains game data)")
|
||||
return
|
||||
|
||||
# If file has "__typename": "DropCurrentSession" move it to the "drop_current_session" directory so we can process it separately.
|
||||
|
|
@ -338,7 +382,7 @@ class Command(BaseCommand):
|
|||
and data[0]["data"]["user"] is None
|
||||
):
|
||||
file_path.unlink()
|
||||
self.stdout.write(f"Removed {file_path} (list with one item: empty user)")
|
||||
tqdm.write(f"Removed {file_path} (list with one item: empty user)")
|
||||
return
|
||||
|
||||
if isinstance(data, list):
|
||||
|
|
@ -363,30 +407,28 @@ class Command(BaseCommand):
|
|||
shutil.move(str(file_path), str(processed_path))
|
||||
except FileExistsError:
|
||||
# Rename the file if contents is different than the existing one
|
||||
with (
|
||||
file_path.open("rb") as f1,
|
||||
(processed_path / file_path.name).open("rb") as f2,
|
||||
):
|
||||
if f1.read() != f2.read():
|
||||
new_name: Path = processed_path / f"{file_path.stem}_duplicate{file_path.suffix}"
|
||||
shutil.move(str(file_path), str(new_name))
|
||||
self.stdout.write(f"Moved {file_path!s} to {new_name!s} (content differs)")
|
||||
else:
|
||||
self.stdout.write(f"{file_path!s} already exists in {processed_path!s}, removing original file.")
|
||||
file_path.unlink()
|
||||
try:
|
||||
with (
|
||||
file_path.open("rb") as f1,
|
||||
(processed_path / file_path.name).open("rb") as f2,
|
||||
):
|
||||
if f1.read() != f2.read():
|
||||
new_name: Path = processed_path / f"{file_path.stem}_duplicate{file_path.suffix}"
|
||||
shutil.move(str(file_path), str(new_name))
|
||||
tqdm.write(f"Moved {file_path!s} to {new_name!s} (content differs)")
|
||||
else:
|
||||
tqdm.write(f"{file_path!s} already exists in {processed_path!s}, removing original file.")
|
||||
file_path.unlink()
|
||||
except FileNotFoundError:
|
||||
tqdm.write(f"{file_path!s} not found when handling duplicate case, skipping.")
|
||||
except FileNotFoundError:
|
||||
self.stdout.write(f"{file_path!s} not found, skipping.")
|
||||
tqdm.write(f"{file_path!s} not found, skipping.")
|
||||
except (PermissionError, OSError, shutil.Error) as e:
|
||||
self.stdout.write(self.style.ERROR(f"Error moving {file_path!s} to {processed_path!s}: {e}"))
|
||||
traceback.print_exc()
|
||||
|
||||
def import_drop_campaign(self, data: dict[str, Any], file_path: Path) -> None:
|
||||
"""Find and import drop campaign data from various JSON structures.
|
||||
|
||||
Args:
|
||||
data: The JSON data.
|
||||
file_path: The path to the file being processed.
|
||||
"""
|
||||
"""Find and import drop campaign data from various JSON structures."""
|
||||
# Add this check: If this is a known "empty" response, ignore it silently.
|
||||
if (
|
||||
"data" in data
|
||||
|
|
@ -403,7 +445,7 @@ class Command(BaseCommand):
|
|||
d: The dictionary to check for drop campaign data.
|
||||
|
||||
Returns:
|
||||
True if any drop campaign data was imported, False otherwise.
|
||||
True if import was attempted, False otherwise.
|
||||
"""
|
||||
if not isinstance(d, dict):
|
||||
return False
|
||||
|
|
@ -454,7 +496,7 @@ class Command(BaseCommand):
|
|||
self.import_to_db(data, file_path=file_path)
|
||||
return
|
||||
|
||||
self.stdout.write(self.style.WARNING(f"No valid drop campaign data found in {file_path.name}"))
|
||||
tqdm.write(self.style.WARNING(f"No valid drop campaign data found in {file_path.name}"))
|
||||
|
||||
def import_to_db(self, campaign_data: dict[str, Any], file_path: Path) -> None:
|
||||
"""Import drop campaign data into the database with retry logic for SQLite locks.
|
||||
|
|
@ -467,7 +509,7 @@ class Command(BaseCommand):
|
|||
game: Game = self.game_update_or_create(campaign_data=campaign_data)
|
||||
organization: Organization | None = self.owner_update_or_create(campaign_data=campaign_data)
|
||||
|
||||
if organization:
|
||||
if organization and game.owner != organization:
|
||||
game.owner = organization
|
||||
game.save(update_fields=["owner"])
|
||||
|
||||
|
|
@ -476,14 +518,12 @@ class Command(BaseCommand):
|
|||
for drop_data in campaign_data.get("timeBasedDrops", []):
|
||||
self._process_time_based_drop(drop_data, drop_campaign, file_path)
|
||||
|
||||
self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign {drop_campaign.name} (ID: {drop_campaign.id})"))
|
||||
|
||||
def _process_time_based_drop(self, drop_data: dict[str, Any], drop_campaign: DropCampaign, file_path: Path) -> None:
|
||||
time_based_drop: TimeBasedDrop = self.create_time_based_drop(drop_campaign=drop_campaign, drop_data=drop_data)
|
||||
|
||||
benefit_edges: list[dict[str, Any]] = drop_data.get("benefitEdges", [])
|
||||
if not benefit_edges:
|
||||
self.stdout.write(self.style.WARNING(f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.id})"))
|
||||
tqdm.write(self.style.WARNING(f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.id})"))
|
||||
self.move_file(file_path, Path("no_benefit_edges") / file_path.name)
|
||||
return
|
||||
|
||||
|
|
@ -499,30 +539,31 @@ class Command(BaseCommand):
|
|||
}
|
||||
|
||||
# Run .strip() on all string fields to remove leading/trailing whitespace
|
||||
for key, value in benefit_defaults.items():
|
||||
for key, value in list(benefit_defaults.items()):
|
||||
if isinstance(value, str):
|
||||
benefit_defaults[key] = value.strip()
|
||||
|
||||
# Filter out None values to avoid overwriting with them
|
||||
benefit_defaults = {k: v for k, v in benefit_defaults.items() if v is not None}
|
||||
|
||||
benefit, _ = DropBenefit.objects.update_or_create(
|
||||
id=benefit_data["id"],
|
||||
defaults=benefit_defaults,
|
||||
)
|
||||
# Use cached create/update for benefits
|
||||
benefit = self._get_or_create_benefit(benefit_data["id"], benefit_defaults)
|
||||
|
||||
# Cache benefit image if available and not already cached
|
||||
if (not benefit.image_file) and benefit.image_asset_url:
|
||||
rel_path: str | None = cache_remote_image(benefit.image_asset_url, "benefits/images")
|
||||
if rel_path:
|
||||
benefit.image_file.name = rel_path
|
||||
benefit.save(update_fields=["image_file"])
|
||||
|
||||
DropBenefitEdge.objects.update_or_create(
|
||||
drop=time_based_drop,
|
||||
benefit=benefit,
|
||||
defaults={"entitlement_limit": benefit_edge.get("entitlementLimit", 1)},
|
||||
)
|
||||
try:
|
||||
with transaction.atomic():
|
||||
drop_benefit_edge, created = DropBenefitEdge.objects.update_or_create(
|
||||
drop=time_based_drop,
|
||||
benefit=benefit,
|
||||
defaults={"entitlement_limit": benefit_edge.get("entitlementLimit", 1)},
|
||||
)
|
||||
if created:
|
||||
tqdm.write(f"Added {drop_benefit_edge}")
|
||||
except MultipleObjectsReturned as e:
|
||||
msg = f"Error: Multiple DropBenefitEdge objects found for drop {time_based_drop.id} and benefit {benefit.id}. Cannot update or create."
|
||||
raise CommandError(msg) from e
|
||||
except (IntegrityError, DatabaseError, TypeError, ValueError) as e:
|
||||
msg = f"Database or validation error creating DropBenefitEdge for drop {time_based_drop.id} and benefit {benefit.id}: {e}"
|
||||
raise CommandError(msg) from e
|
||||
|
||||
def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop:
|
||||
"""Creates or updates a TimeBasedDrop instance based on the provided drop data.
|
||||
|
|
@ -537,9 +578,11 @@ class Command(BaseCommand):
|
|||
- "startAt" (str, optional): ISO 8601 datetime string for when the drop starts.
|
||||
- "endAt" (str, optional): ISO 8601 datetime string for when the drop ends.
|
||||
|
||||
Raises:
|
||||
CommandError: If there is a database error or multiple objects are returned.
|
||||
|
||||
Returns:
|
||||
TimeBasedDrop: The created or updated TimeBasedDrop instance.
|
||||
|
||||
"""
|
||||
time_based_drop_defaults: dict[str, Any] = {
|
||||
"campaign": drop_campaign,
|
||||
|
|
@ -551,35 +594,182 @@ class Command(BaseCommand):
|
|||
}
|
||||
|
||||
# Run .strip() on all string fields to remove leading/trailing whitespace
|
||||
for key, value in time_based_drop_defaults.items():
|
||||
for key, value in list(time_based_drop_defaults.items()):
|
||||
if isinstance(value, str):
|
||||
time_based_drop_defaults[key] = value.strip()
|
||||
|
||||
# Filter out None values to avoid overwriting with them
|
||||
time_based_drop_defaults = {k: v for k, v in time_based_drop_defaults.items() if v is not None}
|
||||
|
||||
time_based_drop, created = TimeBasedDrop.objects.update_or_create(id=drop_data["id"], defaults=time_based_drop_defaults)
|
||||
if created:
|
||||
self.stdout.write(self.style.SUCCESS(f"Successfully imported time-based drop {time_based_drop.name} (ID: {time_based_drop.id})"))
|
||||
try:
|
||||
with transaction.atomic():
|
||||
time_based_drop, created = TimeBasedDrop.objects.update_or_create(id=drop_data["id"], defaults=time_based_drop_defaults)
|
||||
if created:
|
||||
tqdm.write(f"Added {time_based_drop}")
|
||||
except MultipleObjectsReturned as e:
|
||||
msg = f"Error: Multiple TimeBasedDrop objects found for drop {drop_data['id']}. Cannot update or create."
|
||||
raise CommandError(msg) from e
|
||||
except (IntegrityError, DatabaseError, TypeError, ValueError) as e:
|
||||
msg = f"Database or validation error creating TimeBasedDrop for drop {drop_data['id']}: {e}"
|
||||
raise CommandError(msg) from e
|
||||
|
||||
return time_based_drop
|
||||
|
||||
def drop_campaign_update_or_get(
|
||||
def _get_or_create_cached(
|
||||
self,
|
||||
campaign_data: dict[str, Any],
|
||||
game: Game,
|
||||
) -> DropCampaign:
|
||||
"""Update or create a drop campaign.
|
||||
model_name: str,
|
||||
model_class: type[Game | Organization | DropCampaign | Channel | DropBenefit],
|
||||
obj_id: str | int,
|
||||
defaults: dict[str, Any] | None = None,
|
||||
) -> Game | Organization | DropCampaign | Channel | DropBenefit | str | int | None:
|
||||
"""Generic get-or-create that uses the in-memory cache and writes only if needed.
|
||||
|
||||
This implementation is thread-safe and transaction-aware.
|
||||
|
||||
Args:
|
||||
campaign_data: The drop campaign data to import.
|
||||
game: The game this drop campaign is for.
|
||||
organization: The company that owns the game. If None, the campaign will not have an owner.
|
||||
model_name: The name of the model (used for cache and lock).
|
||||
model_class: The Django model class.
|
||||
obj_id: The ID of the object to get or create.
|
||||
defaults: A dictionary of fields to set on creation or update.
|
||||
|
||||
Returns:
|
||||
Returns the DropCampaign object.
|
||||
The retrieved or created object.
|
||||
"""
|
||||
sid = str(obj_id)
|
||||
defaults = defaults or {}
|
||||
|
||||
lock = self._cache_locks.get(model_name)
|
||||
if lock is None:
|
||||
# Fallback for models without a dedicated cache/lock
|
||||
obj, created = model_class.objects.update_or_create(id=obj_id, defaults=defaults)
|
||||
if created:
|
||||
tqdm.write(f"Added {obj}")
|
||||
return obj
|
||||
|
||||
with lock:
|
||||
cache = getattr(self, f"_{model_name}_cache", None)
|
||||
if cache is None:
|
||||
cache = {}
|
||||
setattr(self, f"_{model_name}_cache", cache)
|
||||
|
||||
# First, check the cache.
|
||||
cached_obj = cache.get(sid)
|
||||
if cached_obj:
|
||||
return cached_obj
|
||||
|
||||
# Not in cache, so we need to go to the database.
|
||||
# Use get_or_create which is safer in a race. It might still fail if two threads
|
||||
# try to create at the exact same time, so we wrap it.
|
||||
try:
|
||||
obj, created = model_class.objects.get_or_create(id=obj_id, defaults=defaults)
|
||||
except IntegrityError:
|
||||
# Another thread created it between our `get` and `create` attempt.
|
||||
# The object is guaranteed to exist now, so we can just fetch it.
|
||||
obj = model_class.objects.get(id=obj_id)
|
||||
created = False
|
||||
|
||||
if not created:
|
||||
# The object already existed, check if our data is newer and update if needed.
|
||||
changed = False
|
||||
update_fields = []
|
||||
for key, val in defaults.items():
|
||||
if hasattr(obj, key) and getattr(obj, key) != val:
|
||||
setattr(obj, key, val)
|
||||
changed = True
|
||||
update_fields.append(key)
|
||||
if changed:
|
||||
obj.save(update_fields=update_fields)
|
||||
|
||||
# IMPORTANT: Defer the cache update until the transaction is successful.
|
||||
# This is the key to preventing the race condition.
|
||||
transaction.on_commit(lambda: cache.update({sid: obj}))
|
||||
|
||||
if created:
|
||||
tqdm.write(f"Added {obj}")
|
||||
|
||||
return obj
|
||||
|
||||
def _get_or_create_benefit(self, benefit_id: str | int, defaults: dict[str, Any]) -> DropBenefit:
|
||||
return self._get_or_create_cached("benefit", DropBenefit, benefit_id, defaults) # pyright: ignore[reportReturnType]
|
||||
|
||||
def game_update_or_create(self, campaign_data: dict[str, Any]) -> Game:
|
||||
"""Update or create a game with caching.
|
||||
|
||||
Args:
|
||||
campaign_data: The campaign data containing game information.
|
||||
|
||||
Raises:
|
||||
TypeError: If the retrieved object is not a Game instance.
|
||||
|
||||
Returns:
|
||||
The retrieved or created Game object.
|
||||
"""
|
||||
game_data: dict[str, Any] = campaign_data["game"]
|
||||
|
||||
game_defaults: dict[str, Any] = {
|
||||
"name": game_data.get("name"),
|
||||
"display_name": game_data.get("displayName"),
|
||||
"box_art": game_data.get("boxArtURL"),
|
||||
"slug": game_data.get("slug"),
|
||||
}
|
||||
# Filter out None values to avoid overwriting with them
|
||||
game_defaults = {k: v for k, v in game_defaults.items() if v is not None}
|
||||
|
||||
game: Game | Organization | DropCampaign | Channel | DropBenefit | str | int | None = self._get_or_create_cached(
|
||||
model_name="game",
|
||||
model_class=Game,
|
||||
obj_id=game_data["id"],
|
||||
defaults=game_defaults,
|
||||
)
|
||||
if not isinstance(game, Game):
|
||||
msg = "Expected a Game instance from _get_or_create_cached"
|
||||
raise TypeError(msg)
|
||||
|
||||
return game
|
||||
|
||||
def owner_update_or_create(self, campaign_data: dict[str, Any]) -> Organization | None:
|
||||
"""Update or create an organization with caching.
|
||||
|
||||
Args:
|
||||
campaign_data: The campaign data containing owner information.
|
||||
|
||||
Raises:
|
||||
TypeError: If the retrieved object is not an Organization instance.
|
||||
|
||||
Returns:
|
||||
The retrieved or created Organization object, or None if no owner data is present.
|
||||
"""
|
||||
org_data: dict[str, Any] = campaign_data.get("owner", {})
|
||||
if org_data:
|
||||
org_defaults: dict[str, Any] = {"name": org_data.get("name")}
|
||||
org_defaults = {k: v.strip() if isinstance(v, str) else v for k, v in org_defaults.items() if v is not None}
|
||||
|
||||
owner = self._get_or_create_cached(
|
||||
model_name="org",
|
||||
model_class=Organization,
|
||||
obj_id=org_data["id"],
|
||||
defaults=org_defaults,
|
||||
)
|
||||
if not isinstance(owner, Organization):
|
||||
msg = "Expected an Organization instance from _get_or_create_cached"
|
||||
raise TypeError(msg)
|
||||
|
||||
return owner
|
||||
return None
|
||||
|
||||
def drop_campaign_update_or_get(self, campaign_data: dict[str, Any], game: Game) -> DropCampaign:
|
||||
"""Update or create a drop campaign with caching and channel handling.
|
||||
|
||||
Args:
|
||||
campaign_data: The campaign data containing drop campaign information.
|
||||
game: The associated Game object.
|
||||
|
||||
Raises:
|
||||
TypeError: If the retrieved object is not a DropCampaign instance.
|
||||
|
||||
Returns:
|
||||
The retrieved or created DropCampaign object.
|
||||
"""
|
||||
# Extract allow data from campaign_data
|
||||
allow_data = campaign_data.get("allow", {})
|
||||
allow_is_enabled = allow_data.get("isEnabled")
|
||||
|
||||
|
|
@ -595,18 +785,24 @@ class Command(BaseCommand):
|
|||
"is_account_connected": campaign_data.get("self", {}).get("isAccountConnected"),
|
||||
"allow_is_enabled": allow_is_enabled,
|
||||
}
|
||||
|
||||
# Run .strip() on all string fields to remove leading/trailing whitespace
|
||||
for key, value in drop_campaign_defaults.items():
|
||||
for key, value in list(drop_campaign_defaults.items()):
|
||||
if isinstance(value, str):
|
||||
drop_campaign_defaults[key] = value.strip()
|
||||
|
||||
# Filter out None values to avoid overwriting with them
|
||||
drop_campaign_defaults = {k: v for k, v in drop_campaign_defaults.items() if v is not None}
|
||||
|
||||
drop_campaign, created = DropCampaign.objects.update_or_create(
|
||||
id=campaign_data["id"],
|
||||
drop_campaign = self._get_or_create_cached(
|
||||
model_name="campaign",
|
||||
model_class=DropCampaign,
|
||||
obj_id=campaign_data["id"],
|
||||
defaults=drop_campaign_defaults,
|
||||
)
|
||||
if not isinstance(drop_campaign, DropCampaign):
|
||||
msg = "Expected a DropCampaign instance from _get_or_create_cached"
|
||||
raise TypeError(msg)
|
||||
|
||||
# Handle allow_channels (many-to-many relationship)
|
||||
allow_channels: list[dict[str, str]] = allow_data.get("channels", [])
|
||||
|
|
@ -625,87 +821,23 @@ class Command(BaseCommand):
|
|||
# Filter out None values
|
||||
channel_defaults = {k: v for k, v in channel_defaults.items() if v is not None}
|
||||
|
||||
channel, _ = Channel.objects.update_or_create(
|
||||
id=channel_data["id"],
|
||||
# Use cached helper for channels
|
||||
channel = self._get_or_create_cached(
|
||||
model_name="channel",
|
||||
model_class=Channel,
|
||||
obj_id=channel_data["id"],
|
||||
defaults=channel_defaults,
|
||||
)
|
||||
if not isinstance(channel, Channel):
|
||||
msg = "Expected a Channel instance from _get_or_create_cached"
|
||||
raise TypeError(msg)
|
||||
|
||||
channel_objects.append(channel)
|
||||
|
||||
# Set the many-to-many relationship
|
||||
drop_campaign.allow_channels.set(channel_objects)
|
||||
# Set the many-to-many relationship (save only if different)
|
||||
current_ids = set(drop_campaign.allow_channels.values_list("id", flat=True))
|
||||
new_ids = {ch.id for ch in channel_objects}
|
||||
if current_ids != new_ids:
|
||||
drop_campaign.allow_channels.set(channel_objects)
|
||||
|
||||
if created:
|
||||
self.stdout.write(self.style.SUCCESS(f"Created new drop campaign: {drop_campaign.name} (ID: {drop_campaign.id})"))
|
||||
|
||||
# Cache campaign image if available and not already cached
|
||||
if (not drop_campaign.image_file) and drop_campaign.image_url:
|
||||
rel_path: str | None = cache_remote_image(drop_campaign.image_url, "campaigns/images")
|
||||
if rel_path:
|
||||
drop_campaign.image_file.name = rel_path
|
||||
drop_campaign.save(update_fields=["image_file"]) # type: ignore[list-item]
|
||||
return drop_campaign
|
||||
|
||||
def owner_update_or_create(self, campaign_data: dict[str, Any]) -> Organization | None:
    """Update or create the organization that owns a drop campaign.

    Reads the ``owner`` sub-dict of the campaign payload, normalizes its
    string fields, and upserts the matching :class:`Organization` row.

    Args:
        campaign_data: The drop campaign data to import.

    Returns:
        The upserted Organization, or ``None`` when the payload has no owner.
    """
    org_data: dict[str, Any] = campaign_data.get("owner", {})
    if not org_data:
        # Nothing to persist without an owner section in the payload.
        return None

    # Normalize: strip surrounding whitespace from string values, then
    # drop None values so they never overwrite existing column data.
    raw_defaults: dict[str, Any] = {"name": org_data.get("name")}
    org_defaults: dict[str, Any] = {
        field: (value.strip() if isinstance(value, str) else value)
        for field, value in raw_defaults.items()
        if value is not None
    }

    organization, created = Organization.objects.update_or_create(
        id=org_data["id"],
        defaults=org_defaults,
    )
    if created:
        self.stdout.write(self.style.SUCCESS(f"Created new organization: {organization.name} (ID: {organization.id})"))
    return organization
|
||||
|
||||
def game_update_or_create(self, campaign_data: dict[str, Any]) -> Game:
    """Update or create a game.

    Upserts the :class:`Game` row for the campaign's ``game`` payload and,
    on first sight of a box-art URL, caches the image locally.

    Args:
        campaign_data: The drop campaign data to import. Must contain a
            ``game`` dict with at least an ``id`` key.

    Returns:
        Returns the Game object.

    Raises:
        KeyError: If ``campaign_data`` has no ``game`` entry or the game
            payload has no ``id``.
    """
    game_data: dict[str, Any] = campaign_data["game"]

    game_defaults: dict[str, Any] = {
        "name": game_data.get("name"),
        "display_name": game_data.get("displayName"),
        "box_art": game_data.get("boxArtURL"),
        "slug": game_data.get("slug"),
    }

    # Strip leading/trailing whitespace from string fields, matching the
    # normalization done in owner_update_or_create.
    for key, value in game_defaults.items():
        if isinstance(value, str):
            game_defaults[key] = value.strip()

    # Filter out None values to avoid overwriting existing data with them.
    game_defaults = {k: v for k, v in game_defaults.items() if v is not None}

    game, created = Game.objects.update_or_create(
        id=game_data["id"],
        defaults=game_defaults,
    )
    if created:
        self.stdout.write(self.style.SUCCESS(f"Created new game: {game.display_name} (ID: {game.id})"))

    # Cache game box art if available and not already cached locally.
    if (not game.box_art_file) and game.box_art:
        rel_path: str | None = cache_remote_image(game.box_art, "games/box_art")
        if rel_path:
            game.box_art_file.name = rel_path
            game.save(update_fields=["box_art_file"])
    return game
|
||||
|
|
|
|||
|
|
@ -1,40 +0,0 @@
|
|||
"""Management command to update PostgreSQL search vectors."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from django.contrib.postgres.search import SearchVector
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from twitch.models import DropBenefit, DropCampaign, Game, Organization, TimeBasedDrop
|
||||
|
||||
|
||||
class Command(BaseCommand):
    """Update search vectors for existing records."""

    help = "Update PostgreSQL search vectors for existing records"

    def handle(self, *_args, **_options) -> None:
        """Recompute the search vector column for every indexed model."""
        self.stdout.write("Updating search vectors...")

        # (model, fields feeding the vector, human-readable plural label)
        targets: list[tuple[type, tuple[str, ...], str]] = [
            (Organization, ("name",), "organizations"),
            (Game, ("name", "display_name"), "games"),
            (DropCampaign, ("name", "description"), "campaigns"),
            (TimeBasedDrop, ("name",), "time-based drops"),
            (DropBenefit, ("name",), "drop benefits"),
        ]
        for model, fields, label in targets:
            updated: int = model.objects.update(search_vector=SearchVector(*fields))
            self.stdout.write(self.style.SUCCESS(f"Successfully updated search vectors for {updated} {label}"))

        self.stdout.write(self.style.SUCCESS("All search vectors updated."))
|
||||
Loading…
Add table
Add a link
Reference in a new issue