Improve import command

This commit is contained in:
Joakim Hellsén 2026-01-05 18:46:46 +01:00
commit 1d6c52325c
No known key found for this signature in database
30 changed files with 2628 additions and 554 deletions

View file

@ -52,7 +52,10 @@ def parse_date(value: str | None) -> datetime | None:
"RETURN_AS_TIMEZONE_AWARE": True,
"CACHE_SIZE_LIMIT": 0,
}
dt: datetime | None = dateparser.parse(date_string=value, settings=dateparser_settings) # pyright: ignore[reportArgumentType]
dt: datetime | None = dateparser.parse(
date_string=value,
settings=dateparser_settings, # pyright: ignore[reportArgumentType]
)
if not dt:
return None
@ -63,7 +66,7 @@ def parse_date(value: str | None) -> datetime | None:
class Command(BaseCommand):
"""Import Twitch drop campaign data from a JSON file or directory of JSON files."""
"""Import Twitch drop campaign data from JSON."""
help = "Import Twitch drop campaign data from a JSON file or directory"
requires_migrations_checks = True
@ -110,7 +113,7 @@ class Command(BaseCommand):
parser.add_argument(
"--no-preload",
action="store_true",
help="Do not preload existing DB objects into memory (default: preload).",
help="Do not preload existing DB objects into memory.",
)
def handle(self, **options) -> None:
@ -126,7 +129,6 @@ class Command(BaseCommand):
AttributeError: If expected attributes are missing in the data.
KeyError: If expected keys are missing in the data.
IndexError: If list indices are out of range in the data.
"""
paths: list[str] = options["paths"]
processed_dir: str = options["processed_dir"]
@ -136,7 +138,9 @@ class Command(BaseCommand):
# Preload DB objects into caches (unless disabled)
if not no_preload:
try:
self.stdout.write("Preloading existing database objects into memory...")
self.stdout.write(
"Preloading existing database objects into memory...",
)
self._preload_caches()
self.stdout.write(
f"Preloaded {len(self._game_cache)} games, "
@ -147,7 +151,8 @@ class Command(BaseCommand):
)
except (FileNotFoundError, OSError, RuntimeError):
# If preload fails for any reason, continue without it
self.stdout.write(self.style.WARNING("Preloading caches failed — continuing without preload."))
msg = "Warning: Preloading caches failed — continuing without preload."
self.stdout.write(self.style.WARNING(msg))
self.stdout.write(self.style.ERROR(traceback.format_exc()))
self._game_cache = {}
self._organization_cache = {}
@ -167,37 +172,77 @@ class Command(BaseCommand):
processed_path: Path = path / processed_dir
processed_path.mkdir(exist_ok=True)
self.process_drops(continue_on_error=continue_on_error, path=path, processed_path=processed_path)
self.process_drops(
continue_on_error=continue_on_error,
path=path,
processed_path=processed_path,
)
except CommandError as e:
if not continue_on_error:
raise
self.stdout.write(self.style.ERROR(f"Error processing path {p}: {e}"))
except (ValueError, TypeError, AttributeError, KeyError, IndexError):
self.stdout.write(
self.style.ERROR(f"Error processing path {p}: {e}"),
)
except (
ValueError,
TypeError,
AttributeError,
KeyError,
IndexError,
):
if not continue_on_error:
raise
self.stdout.write(self.style.ERROR(f"Data error processing path {p}"))
self.stdout.write(
self.style.ERROR(f"Data error processing path {p}"),
)
self.stdout.write(self.style.ERROR(traceback.format_exc()))
except KeyboardInterrupt:
# Gracefully handle Ctrl+C
self.stdout.write(self.style.WARNING("Interrupted by user, exiting import."))
self.stdout.write(
self.style.WARNING("Interrupted by user, exiting import."),
)
return
def _preload_caches(self) -> None:
"""Load existing DB objects into in-memory caches to avoid repeated queries."""
# These queries may be heavy if DB is huge — safe because optional via --no-preload
"""Load DB objects into in-memory caches to avoid repeated queries."""
with self._cache_locks["game"]:
self._game_cache = {str(g.twitch_id): g for g in Game.objects.all()}
with self._cache_locks["org"]:
self._organization_cache = {str(o.twitch_id): o for o in Organization.objects.all()}
with self._cache_locks["campaign"]:
self._drop_campaign_cache = {str(c.twitch_id): c for c in DropCampaign.objects.all()}
with self._cache_locks["channel"]:
self._channel_cache = {str(ch.twitch_id): ch for ch in Channel.objects.all()}
with self._cache_locks["benefit"]:
self._benefit_cache = {str(b.twitch_id): b for b in DropBenefit.objects.all()}
self._game_cache = {} # Clear existing cache
for game_instance in Game.objects.all():
twitch_id = str(game_instance.twitch_id)
self._game_cache[twitch_id] = game_instance
def process_drops(self, *, continue_on_error: bool, path: Path, processed_path: Path) -> None:
with self._cache_locks["org"]:
self._organization_cache = {}
for organization_instance in Organization.objects.all():
twitch_id = str(organization_instance.twitch_id)
self._organization_cache[twitch_id] = organization_instance
with self._cache_locks["campaign"]:
self._drop_campaign_cache = {}
for drop_campaign_instance in DropCampaign.objects.all():
twitch_id = str(drop_campaign_instance.twitch_id)
self._drop_campaign_cache[twitch_id] = drop_campaign_instance
with self._cache_locks["channel"]:
self._channel_cache = {}
for channel_instance in Channel.objects.all():
twitch_id = str(channel_instance.twitch_id)
self._channel_cache[twitch_id] = channel_instance
with self._cache_locks["benefit"]:
self._benefit_cache = {}
for benefit_instance in DropBenefit.objects.all():
twitch_id = str(benefit_instance.twitch_id)
self._benefit_cache[twitch_id] = benefit_instance
def process_drops(
self,
*,
continue_on_error: bool,
path: Path,
processed_path: Path,
) -> None:
"""Process drops from a file or directory.
Args:
@ -233,7 +278,13 @@ class Command(BaseCommand):
msg: str = f"Path {path} does not exist"
raise CommandError(msg)
def _process_directory(self, *, directory: Path, processed_path: Path, continue_on_error: bool) -> None:
def _process_directory(
self,
*,
directory: Path,
processed_path: Path,
continue_on_error: bool,
) -> None:
"""Process all JSON files in a directory using parallel processing.
Args:
@ -252,7 +303,9 @@ class Command(BaseCommand):
"""
json_files: list[Path] = list(directory.glob("*.json"))
if not json_files:
self.stdout.write(self.style.WARNING(f"No JSON files found in {directory}"))
self.stdout.write(
self.style.WARNING(f"No JSON files found in {directory}"),
)
return
total_files: int = len(json_files)
@ -261,10 +314,19 @@ class Command(BaseCommand):
with concurrent.futures.ThreadPoolExecutor() as executor:
try:
future_to_file: dict[concurrent.futures.Future[None], Path] = {
executor.submit(self._process_file, json_file, processed_path): json_file for json_file in json_files
executor.submit(
self._process_file,
json_file,
processed_path,
): json_file
for json_file in json_files
}
# Wrap the as_completed iterator with tqdm for a progress bar
for future in tqdm(concurrent.futures.as_completed(future_to_file), total=total_files, desc="Processing files"):
for future in tqdm(
concurrent.futures.as_completed(future_to_file),
total=total_files,
desc="Processing files",
):
json_file: Path = future_to_file[future]
try:
future.result()
@ -273,20 +335,42 @@ class Command(BaseCommand):
# To stop all processing, we shut down the executor and re-raise
executor.shutdown(wait=False, cancel_futures=True)
raise
self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}"))
except (ValueError, TypeError, AttributeError, KeyError, IndexError):
self.stdout.write(
self.style.ERROR(
f"Error processing {json_file}: {e}",
),
)
except (
ValueError,
TypeError,
AttributeError,
KeyError,
IndexError,
):
if not continue_on_error:
# To stop all processing, we shut down the executor and re-raise
executor.shutdown(wait=False, cancel_futures=True)
raise
self.stdout.write(self.style.ERROR(f"Data error processing {json_file}"))
self.stdout.write(self.style.ERROR(traceback.format_exc()))
self.stdout.write(
self.style.ERROR(
f"Data error processing {json_file}",
),
)
self.stdout.write(
self.style.ERROR(traceback.format_exc()),
)
msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}."
msg: str = (
f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}."
)
self.stdout.write(self.style.SUCCESS(msg))
except KeyboardInterrupt:
self.stdout.write(self.style.WARNING("Interruption received, shutting down threads immediately..."))
self.stdout.write(
self.style.WARNING(
"Interruption received, shutting down threads immediately...",
),
)
executor.shutdown(wait=False, cancel_futures=True)
# Re-raise the exception to allow the main `handle` method to catch it and exit
raise
@ -331,7 +415,9 @@ class Command(BaseCommand):
target_dir.mkdir(parents=True, exist_ok=True)
self.move_file(file_path, target_dir / file_path.name)
tqdm.write(f"Moved {file_path} to {target_dir} (matched '{keyword}')")
tqdm.write(
f"Moved {file_path} to {target_dir} (matched '{keyword}')",
)
return
# Some responses have errors:
@ -341,7 +427,9 @@ class Command(BaseCommand):
actual_error_dir: Path = processed_path / "actual_error"
actual_error_dir.mkdir(parents=True, exist_ok=True)
self.move_file(file_path, actual_error_dir / file_path.name)
tqdm.write(f"Moved {file_path} to {actual_error_dir} (contains Twitch errors)")
tqdm.write(
f"Moved {file_path} to {actual_error_dir} (contains Twitch errors)",
)
return
# If file has "__typename": "BroadcastSettings" move it to the "broadcast_settings" directory
@ -360,7 +448,9 @@ class Command(BaseCommand):
and data["data"]["channel"]["viewerDropCampaigns"] is None
):
file_path.unlink()
tqdm.write(f"Removed {file_path} (only contains empty viewerDropCampaigns)")
tqdm.write(
f"Removed {file_path} (only contains empty viewerDropCampaigns)",
)
return
# If file only contains {"data": {"user": null}} remove the file
@ -377,11 +467,18 @@ class Command(BaseCommand):
tqdm.write(f"Removed {file_path} (only contains game data)")
return
# If file has "__typename": "DropCurrentSession" move it to the "drop_current_session" directory so we can process it separately.
if isinstance(data, dict) and data.get("data", {}).get("currentUser", {}).get("dropCurrentSession", {}).get("__typename") == "DropCurrentSession":
# If file has "__typename": "DropCurrentSession" move it to the "drop_current_session" directory so we can process it separately. # noqa: E501
if (
isinstance(data, dict)
and data.get("data", {}).get("currentUser", {}).get("dropCurrentSession", {}).get("__typename")
== "DropCurrentSession"
):
drop_current_session_dir: Path = processed_path / "drop_current_session"
drop_current_session_dir.mkdir(parents=True, exist_ok=True)
self.move_file(file_path, drop_current_session_dir / file_path.name)
self.move_file(
file_path,
drop_current_session_dir / file_path.name,
)
return
# If file is a list with one item: {"data": {"user": null}}, remove it
@ -407,7 +504,10 @@ class Command(BaseCommand):
# Move file to "we_should_double_check" directory for manual review
we_should_double_check_dir: Path = processed_path / "we_should_double_check"
we_should_double_check_dir.mkdir(parents=True, exist_ok=True)
self.move_file(file_path, we_should_double_check_dir / file_path.name)
self.move_file(
file_path,
we_should_double_check_dir / file_path.name,
)
raise CommandError(msg)
self.move_file(file_path, processed_path)
@ -426,19 +526,33 @@ class Command(BaseCommand):
if f1.read() != f2.read():
new_name: Path = processed_path / f"{file_path.stem}_duplicate{file_path.suffix}"
shutil.move(str(file_path), str(new_name))
tqdm.write(f"Moved {file_path!s} to {new_name!s} (content differs)")
tqdm.write(
f"Moved {file_path!s} to {new_name!s} (content differs)",
)
else:
tqdm.write(f"{file_path!s} already exists in {processed_path!s}, removing original file.")
tqdm.write(
f"{file_path!s} already exists in {processed_path!s}, removing original file.",
)
file_path.unlink()
except FileNotFoundError:
tqdm.write(f"{file_path!s} not found when handling duplicate case, skipping.")
tqdm.write(
f"{file_path!s} not found when handling duplicate case, skipping.",
)
except FileNotFoundError:
tqdm.write(f"{file_path!s} not found, skipping.")
except (PermissionError, OSError, shutil.Error) as e:
self.stdout.write(self.style.ERROR(f"Error moving {file_path!s} to {processed_path!s}: {e}"))
self.stdout.write(
self.style.ERROR(
f"Error moving {file_path!s} to {processed_path!s}: {e}",
),
)
traceback.print_exc()
def import_drop_campaign(self, data: dict[str, Any], file_path: Path) -> None:
def import_drop_campaign(
self,
data: dict[str, Any],
file_path: Path,
) -> None:
"""Find and import drop campaign data from various JSON structures."""
# Add this check: If this is a known "empty" response, ignore it silently.
if (
@ -475,7 +589,9 @@ class Command(BaseCommand):
# Structure: {"data": {"currentUser": {"inventory": {"dropCampaignsInProgress": [...]}}}}
if "inventory" in current_user and "dropCampaignsInProgress" in current_user["inventory"]:
campaigns_found.extend(current_user["inventory"]["dropCampaignsInProgress"])
campaigns_found.extend(
current_user["inventory"]["dropCampaignsInProgress"],
)
# Structure: {"data": {"channel": {"viewerDropCampaigns": [...]}}}
if "channel" in d and d["channel"] and "viewerDropCampaigns" in d["channel"]:
@ -507,9 +623,17 @@ class Command(BaseCommand):
self.import_to_db(data, file_path=file_path)
return
tqdm.write(self.style.WARNING(f"No valid drop campaign data found in {file_path.name}"))
tqdm.write(
self.style.WARNING(
f"No valid drop campaign data found in {file_path.name}",
),
)
def import_to_db(self, campaign_data: dict[str, Any], file_path: Path) -> None:
def import_to_db(
self,
campaign_data: dict[str, Any],
file_path: Path,
) -> None:
"""Import drop campaign data into the database with retry logic for SQLite locks.
Args:
@ -517,25 +641,51 @@ class Command(BaseCommand):
file_path: The path to the file being processed.
"""
with transaction.atomic():
game: Game = self.game_update_or_create(campaign_data=campaign_data)
organization: Organization | None = self.owner_update_or_create(campaign_data=campaign_data)
game: Game = self.game_update_or_create(
campaign_data=campaign_data,
)
organization: Organization | None = self.owner_update_or_create(
campaign_data=campaign_data,
)
if organization and game.owner != organization:
game.owner = organization
game.save(update_fields=["owner"])
drop_campaign: DropCampaign = self.drop_campaign_update_or_get(campaign_data=campaign_data, game=game)
drop_campaign: DropCampaign = self.drop_campaign_update_or_get(
campaign_data=campaign_data,
game=game,
)
for drop_data in campaign_data.get("timeBasedDrops", []):
self._process_time_based_drop(drop_data, drop_campaign, file_path)
self._process_time_based_drop(
drop_data,
drop_campaign,
file_path,
)
def _process_time_based_drop(self, drop_data: dict[str, Any], drop_campaign: DropCampaign, file_path: Path) -> None:
time_based_drop: TimeBasedDrop = self.create_time_based_drop(drop_campaign=drop_campaign, drop_data=drop_data)
def _process_time_based_drop(
self,
drop_data: dict[str, Any],
drop_campaign: DropCampaign,
file_path: Path,
) -> None:
time_based_drop: TimeBasedDrop = self.create_time_based_drop(
drop_campaign=drop_campaign,
drop_data=drop_data,
)
benefit_edges: list[dict[str, Any]] = drop_data.get("benefitEdges", [])
if not benefit_edges:
tqdm.write(self.style.WARNING(f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.twitch_id})"))
self.move_file(file_path, Path("no_benefit_edges") / file_path.name)
tqdm.write(
self.style.WARNING(
f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.twitch_id})",
),
)
self.move_file(
file_path,
Path("no_benefit_edges") / file_path.name,
)
return
for benefit_edge in benefit_edges:
@ -558,14 +708,22 @@ class Command(BaseCommand):
benefit_defaults = {k: v for k, v in benefit_defaults.items() if v is not None}
# Use cached create/update for benefits
benefit = self._get_or_create_benefit(benefit_data["id"], benefit_defaults)
benefit = self._get_or_create_benefit(
benefit_data["id"],
benefit_defaults,
)
try:
with transaction.atomic():
drop_benefit_edge, created = DropBenefitEdge.objects.update_or_create(
drop=time_based_drop,
benefit=benefit,
defaults={"entitlement_limit": benefit_edge.get("entitlementLimit", 1)},
defaults={
"entitlement_limit": benefit_edge.get(
"entitlementLimit",
1,
),
},
)
if created:
tqdm.write(f"Added {drop_benefit_edge}")
@ -573,10 +731,14 @@ class Command(BaseCommand):
msg = f"Error: Multiple DropBenefitEdge objects found for drop {time_based_drop.twitch_id} and benefit {benefit.twitch_id}. Cannot update or create." # noqa: E501
raise CommandError(msg) from e
except (IntegrityError, DatabaseError, TypeError, ValueError) as e:
msg = f"Database or validation error creating DropBenefitEdge for drop {time_based_drop.twitch_id} and benefit {benefit.twitch_id}: {e}"
msg = f"Database or validation error creating DropBenefitEdge for drop {time_based_drop.twitch_id} and benefit {benefit.twitch_id}: {e}" # noqa: E501
raise CommandError(msg) from e
def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop:
def create_time_based_drop(
self,
drop_campaign: DropCampaign,
drop_data: dict[str, Any],
) -> TimeBasedDrop:
"""Creates or updates a TimeBasedDrop instance based on the provided drop data.
Args:
@ -598,7 +760,9 @@ class Command(BaseCommand):
time_based_drop_defaults: dict[str, Any] = {
"campaign": drop_campaign,
"name": drop_data.get("name"),
"required_minutes_watched": drop_data.get("requiredMinutesWatched"),
"required_minutes_watched": drop_data.get(
"requiredMinutesWatched",
),
"required_subs": drop_data.get("requiredSubs"),
"start_at": parse_date(drop_data.get("startAt")),
"end_at": parse_date(drop_data.get("endAt")),
@ -614,7 +778,10 @@ class Command(BaseCommand):
try:
with transaction.atomic():
time_based_drop, created = TimeBasedDrop.objects.update_or_create(id=drop_data["id"], defaults=time_based_drop_defaults)
time_based_drop, created = TimeBasedDrop.objects.update_or_create(
id=drop_data["id"],
defaults=time_based_drop_defaults,
)
if created:
tqdm.write(f"Added {time_based_drop}")
except MultipleObjectsReturned as e:
@ -652,7 +819,10 @@ class Command(BaseCommand):
lock = self._cache_locks.get(model_name)
if lock is None:
# Fallback for models without a dedicated cache/lock
obj, created = model_class.objects.update_or_create(id=obj_id, defaults=defaults)
obj, created = model_class.objects.update_or_create(
id=obj_id,
defaults=defaults,
)
if created:
tqdm.write(f"Added {obj}")
return obj
@ -672,7 +842,10 @@ class Command(BaseCommand):
# Use get_or_create which is safer in a race. It might still fail if two threads
# try to create at the exact same time, so we wrap it.
try:
obj, created = model_class.objects.get_or_create(id=obj_id, defaults=defaults)
obj, created = model_class.objects.get_or_create(
id=obj_id,
defaults=defaults,
)
except IntegrityError:
# Another thread created it between our `get` and `create` attempt.
# The object is guaranteed to exist now, so we can just fetch it.
@ -700,8 +873,17 @@ class Command(BaseCommand):
return obj
def _get_or_create_benefit(self, benefit_id: str | int, defaults: dict[str, Any]) -> DropBenefit:
return self._get_or_create_cached("benefit", DropBenefit, benefit_id, defaults) # pyright: ignore[reportReturnType]
def _get_or_create_benefit(
self,
benefit_id: str | int,
defaults: dict[str, Any],
) -> DropBenefit:
return self._get_or_create_cached(
"benefit",
DropBenefit,
benefit_id,
defaults,
) # pyright: ignore[reportReturnType]
def game_update_or_create(self, campaign_data: dict[str, Any]) -> Game:
"""Update or create a game with caching.
@ -726,11 +908,13 @@ class Command(BaseCommand):
# Filter out None values to avoid overwriting with them
game_defaults = {k: v for k, v in game_defaults.items() if v is not None}
game: Game | Organization | DropCampaign | Channel | DropBenefit | str | int | None = self._get_or_create_cached(
model_name="game",
model_class=Game,
obj_id=game_data["id"],
defaults=game_defaults,
game: Game | Organization | DropCampaign | Channel | DropBenefit | str | int | None = (
self._get_or_create_cached(
model_name="game",
model_class=Game,
obj_id=game_data["id"],
defaults=game_defaults,
)
)
if not isinstance(game, Game):
msg = "Expected a Game instance from _get_or_create_cached"
@ -738,7 +922,10 @@ class Command(BaseCommand):
return game
def owner_update_or_create(self, campaign_data: dict[str, Any]) -> Organization | None:
def owner_update_or_create(
self,
campaign_data: dict[str, Any],
) -> Organization | None:
"""Update or create an organization with caching.
Args:
@ -768,7 +955,11 @@ class Command(BaseCommand):
return owner
return None
def drop_campaign_update_or_get(self, campaign_data: dict[str, Any], game: Game) -> DropCampaign:
def drop_campaign_update_or_get(
self,
campaign_data: dict[str, Any],
game: Game,
) -> DropCampaign:
"""Update or create a drop campaign with caching and channel handling.
Args:
@ -791,9 +982,18 @@ class Command(BaseCommand):
"details_url": campaign_data.get("detailsURL"),
"account_link_url": campaign_data.get("accountLinkURL"),
"image_url": campaign_data.get("imageURL"),
"start_at": parse_date(campaign_data.get("startAt") or campaign_data.get("startsAt")),
"end_at": parse_date(campaign_data.get("endAt") or campaign_data.get("endsAt")),
"is_account_connected": campaign_data.get("self", {}).get("isAccountConnected"),
"start_at": parse_date(
campaign_data.get("startAt") or campaign_data.get("startsAt"),
),
"end_at": parse_date(
campaign_data.get("endAt") or campaign_data.get("endsAt"),
),
"is_account_connected": (
campaign_data.get(
"self",
{},
).get("isAccountConnected")
),
"allow_is_enabled": allow_is_enabled,
}
@ -846,7 +1046,9 @@ class Command(BaseCommand):
channel_objects.append(channel)
# Set the many-to-many relationship (save only if different)
current_ids = set(drop_campaign.allow_channels.values_list("id", flat=True))
current_ids = set(
drop_campaign.allow_channels.values_list("id", flat=True),
)
new_ids = {ch.twitch_id for ch in channel_objects}
if current_ids != new_ids:
drop_campaign.allow_channels.set(channel_objects)