Update importer

This commit is contained in:
2025-05-01 15:26:33 +02:00
parent 73f1870431
commit b1bd57bcc2
3 changed files with 207 additions and 96 deletions

View File

@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
import logging
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -11,80 +12,171 @@ from django.utils.dateparse import parse_datetime
from core.models import Benefit, DropCampaign, Game, Organization, TimeBasedDrop from core.models import Benefit, DropCampaign, Game, Organization, TimeBasedDrop
# Configure logger for this module
logger = logging.getLogger(__name__)
class Command(BaseCommand): class Command(BaseCommand):
"""Imports Twitch Drop campaign data from a specified JSON file into the database. """Imports Twitch Drop campaign data from a directory of JSON files into the database."""
This command reads a JSON file containing Twitch Drop campaign information, help = "Imports Twitch Drop campaign data from a directory of JSON files."
typically obtained from Twitch's GQL endpoint or similar sources. It parses
the JSON, expecting a structure containing details about the campaign,
associated game, owner organization, time-based drops, and their benefits.
The command uses a database transaction to ensure atomicity, meaning either def add_arguments(self, parser: CommandParser) -> None: # noqa: PLR6301
all data is imported successfully, or no changes are made to the database """Add command line arguments for the command.
if an error occurs during the import process.
It performs the following steps:
1. Parses the command-line argument for the path to the JSON file.
2. Opens and reads the JSON file, handling potential file-related errors.
3. Navigates the JSON structure to find the core 'dropCampaign' data and
the associated Twitch User ID.
4. Within a transaction:
a. Processes Game data: Creates or updates a `Game` record.
b. Processes Owner Organization data: Creates or updates an `Organization` record.
c. Processes Drop Campaign data: Creates or updates a `DropCampaign` record,
linking it to the Game and Organization. Parses and stores start/end times.
d. Processes Time Based Drops: Iterates through `timeBasedDrops` within the campaign.
i. For each drop: Creates or updates a `TimeBasedDrop` record, linking it
to the `DropCampaign`. Parses and stores start/end times and required watch time.
ii. Processes Benefits within each drop: Iterates through `benefitEdges`.
- Creates or updates `Benefit` records associated with the drop.
- Handles potential missing related Game or Organization for the benefit
by creating minimal placeholder records if necessary.
- Associates the created/updated `Benefit` objects with the current
`TimeBasedDrop` via a ManyToMany relationship.
5. Provides feedback to the console about created or updated records and any warnings
or errors encountered.
6. Raises `CommandError` for critical issues like file not found, JSON errors,
missing essential data (e.g., campaign ID), or unexpected exceptions during processing.
Args: Args:
json_file (str): The filesystem path to the JSON file containing the parser (CommandParser): The command parser to add arguments to.
Twitch Drop campaign data.
Example Usage:
python manage.py import_twitch_drops /path/to/your/twitch_drops.json
""" """
parser.add_argument("json_dir", type=str, help="Path to the directory containing Twitch Drop JSON files")
parser.add_argument(
"--pattern",
type=str,
default="*.json",
help="Glob pattern to filter JSON files (default: *.json)",
)
help = "Imports Twitch Drop campaign data from a specific JSON file structure." def handle(self, *args: tuple[str, ...], **options: dict[str, Any]) -> None:
"""Import data to DB from multiple JSON files in a directory.
def add_arguments(self, parser: CommandParser) -> None: # noqa: D102
parser.add_argument("json_file", type=str, help="Path to the Twitch Drop JSON file")
def handle(self, *args: Any, **options) -> None: # noqa: ANN003, ANN401, ARG002, C901, PLR0912, PLR0915
"""Import data to DB.
Args: Args:
*args: Positional arguments. *args (tuple[str, ...]): Positional arguments passed to the command.
**options: Keyword arguments containing the command-line options. **options (dict[str, Any]): Keyword arguments passed to the command.
Raises: Raises:
CommandError: If the JSON file is not found, cannot be decoded, CommandError: If the directory does not exist or is not a directory.
or if there are issues with the JSON structure.
""" """
json_file_path = options["json_file"] self.stdout.write(self.style.NOTICE("Starting import process..."))
self.stdout.write(f"Starting import from {json_file_path}...") self.stdout.write(self.style.NOTICE(f"Arguments: {args}; Options: {options}"))
json_dir_path = options["json_dir"]
pattern = options["pattern"]
dir_path = Path(json_dir_path) # pyright: ignore[reportArgumentType]
# Check if directory exists
if not dir_path.exists():
msg = f"Error: Directory not found at {json_dir_path}"
logger.error(msg)
raise CommandError(msg)
if not dir_path.is_dir():
msg = f"Error: {json_dir_path} is not a directory"
logger.error(msg)
raise CommandError(msg)
# Find JSON files in the directory
json_files = list(dir_path.glob(pattern)) # pyright: ignore[reportArgumentType]
if not json_files:
msg = f"No JSON files found in {json_dir_path} matching pattern '{pattern}'"
self.stdout.write(self.style.WARNING(msg))
logger.warning(msg)
return
self.stdout.write(f"Found {len(json_files)} JSON files to process in {json_dir_path}")
logger.info("Found %s JSON files to process in %s", len(json_files), json_dir_path)
# Tracking statistics
total_stats = {
"files_processed": 0,
"files_with_errors": 0,
"games_created": 0,
"games_updated": 0,
"orgs_created": 0,
"orgs_updated": 0,
"campaigns_created": 0,
"campaigns_updated": 0,
"drops_created": 0,
"drops_updated": 0,
"benefits_created": 0,
"benefits_updated": 0,
}
# Process each file
for json_file in sorted(json_files):
try:
file_stats = self._process_json_file(json_file)
total_stats["files_processed"] += 1
# Update totals
for key, value in file_stats.items():
if key in total_stats:
total_stats[key] += value
except CommandError as e:
self.stdout.write(self.style.ERROR(str(e)))
total_stats["files_with_errors"] += 1
logger.exception("Error processing %s", json_file)
except Exception as e:
msg = f"Unexpected error processing {json_file}: {e}"
self.stdout.write(self.style.ERROR(msg))
logger.exception(msg)
total_stats["files_with_errors"] += 1
# Print summary
self.stdout.write(self.style.SUCCESS("Import process completed."))
self.stdout.write(f"Files processed: {total_stats['files_processed']}")
self.stdout.write(f"Files with errors: {total_stats['files_with_errors']}")
self.stdout.write(f"Games created: {total_stats['games_created']}, updated: {total_stats['games_updated']}")
self.stdout.write(
f"Organizations created: {total_stats['orgs_created']}, updated: {total_stats['orgs_updated']}",
)
self.stdout.write(
f"Campaigns created: {total_stats['campaigns_created']}, updated: {total_stats['campaigns_updated']}",
)
self.stdout.write(f"Drops created: {total_stats['drops_created']}, updated: {total_stats['drops_updated']}")
self.stdout.write(
f"Benefits created: {total_stats['benefits_created']}, updated: {total_stats['benefits_updated']}",
)
logger.info(
"Import completed. Processed: %s, Errors: %s",
total_stats["files_processed"],
total_stats["files_with_errors"],
)
def _process_json_file(self, json_file: Path) -> dict[str, int]: # noqa: C901, PLR0912, PLR0914, PLR0915
"""Process a single JSON file and import its data to the database.
Args:
json_file: Path object pointing to the JSON file.
Returns:
Dict with counts of created and updated records.
Raises:
CommandError: If the JSON file cannot be decoded or if there are
issues with the JSON structure.
"""
self.stdout.write(f"Processing file: {json_file}")
logger.info("Processing file: %s", json_file)
# Initialize stats for this file
stats = {
"games_created": 0,
"games_updated": 0,
"orgs_created": 0,
"orgs_updated": 0,
"campaigns_created": 0,
"campaigns_updated": 0,
"drops_created": 0,
"drops_updated": 0,
"benefits_created": 0,
"benefits_updated": 0,
}
try: try:
with Path(json_file_path).open(encoding="utf-8") as f: with json_file.open(encoding="utf-8") as f:
raw_data = json.load(f) raw_data = json.load(f)
except FileNotFoundError as e: except FileNotFoundError as e:
msg = f"Error: File not found at {json_file_path}" msg = f"Error: File not found at {json_file}"
logger.exception(msg)
raise CommandError(msg) from e raise CommandError(msg) from e
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
msg = f"Error: Could not decode JSON from {json_file_path}" msg = f"Error: Could not decode JSON from {json_file}"
logger.exception(msg)
raise CommandError(msg) from e raise CommandError(msg) from e
except Exception as e: except Exception as e:
msg = f"Error reading file: {e}" msg = f"Error reading file: {e}"
logger.exception(msg)
raise CommandError(msg) from e raise CommandError(msg) from e
# Navigate to the relevant part of the JSON structure # Navigate to the relevant part of the JSON structure
@ -93,14 +185,17 @@ class Command(BaseCommand):
campaign_data = user_data.get("dropCampaign") campaign_data = user_data.get("dropCampaign")
if not campaign_data: if not campaign_data:
msg = "Error: 'dropCampaign' key not found or is null in the JSON data." msg = f"Error: 'dropCampaign' key not found or is null in the JSON data: {json_file}"
logger.error(msg)
raise CommandError(msg) raise CommandError(msg)
except AttributeError as e: except AttributeError as e:
msg = "Error: Unexpected JSON structure. Could not find 'data' or 'user'." msg = f"Error: Unexpected JSON structure in {json_file}. Could not find 'data' or 'user'."
logger.exception(msg)
raise CommandError(msg) from e raise CommandError(msg) from e
try: # Process the campaign data within a transaction
try: # noqa: PLR1702
# Use a transaction to ensure atomicity # Use a transaction to ensure atomicity
with transaction.atomic(): with transaction.atomic():
# --- 1. Process Game --- # --- 1. Process Game ---
@ -117,10 +212,13 @@ class Command(BaseCommand):
) )
if created: if created:
self.stdout.write(self.style.SUCCESS(f"Created Game: {game_obj.display_name}")) self.stdout.write(self.style.SUCCESS(f"Created Game: {game_obj.display_name}"))
stats["games_created"] += 1
else: else:
self.stdout.write(f"Updated/Found Game: {game_obj.display_name}") self.stdout.write(f"Updated/Found Game: {game_obj.display_name}")
stats["games_updated"] += 1
else: else:
self.stdout.write(self.style.WARNING("No game data found in campaign.")) self.stdout.write(self.style.WARNING("No game data found in campaign."))
logger.warning("No game data found in campaign.")
# --- 2. Process Owner Organization --- # --- 2. Process Owner Organization ---
owner_data = campaign_data.get("owner") owner_data = campaign_data.get("owner")
@ -133,15 +231,19 @@ class Command(BaseCommand):
) )
if created: if created:
self.stdout.write(self.style.SUCCESS(f"Created Organization: {owner_obj.name}")) self.stdout.write(self.style.SUCCESS(f"Created Organization: {owner_obj.name}"))
stats["orgs_created"] += 1
else: else:
self.stdout.write(f"Updated/Found Organization: {owner_obj.name}") self.stdout.write(f"Updated/Found Organization: {owner_obj.name}")
stats["orgs_updated"] += 1
else: else:
self.stdout.write(self.style.WARNING("No owner organization data found in campaign.")) self.stdout.write(self.style.WARNING("No owner organization data found in campaign."))
logger.warning("No owner organization data found in campaign.")
# --- 3. Process Drop Campaign --- # --- 3. Process Drop Campaign ---
campaign_id = campaign_data.get("id") campaign_id = campaign_data.get("id")
if not campaign_id: if not campaign_id:
msg = "Error: Campaign ID is missing." msg = "Error: Campaign ID is missing."
logger.error(msg)
raise CommandError(msg) # noqa: TRY301 raise CommandError(msg) # noqa: TRY301
start_at_str = campaign_data.get("startAt") start_at_str = campaign_data.get("startAt")
@ -151,6 +253,7 @@ class Command(BaseCommand):
if not start_at_dt or not end_at_dt: if not start_at_dt or not end_at_dt:
self.stdout.write(self.style.WARNING(f"Campaign {campaign_id} missing start or end date.")) self.stdout.write(self.style.WARNING(f"Campaign {campaign_id} missing start or end date."))
logger.warning("Campaign %s missing start or end date.", campaign_id)
# Decide if you want to skip or handle this case differently # Decide if you want to skip or handle this case differently
campaign_defaults = { campaign_defaults = {
@ -171,18 +274,22 @@ class Command(BaseCommand):
) )
if created: if created:
self.stdout.write(self.style.SUCCESS(f"Created Campaign: {campaign_obj.name}")) self.stdout.write(self.style.SUCCESS(f"Created Campaign: {campaign_obj.name}"))
stats["campaigns_created"] += 1
else: else:
self.stdout.write(f"Updated/Found Campaign: {campaign_obj.name}") self.stdout.write(f"Updated/Found Campaign: {campaign_obj.name}")
stats["campaigns_updated"] += 1
# --- 4. Process Time Based Drops --- # --- 4. Process Time Based Drops ---
time_drops_data = campaign_data.get("timeBasedDrops", []) time_drops_data = campaign_data.get("timeBasedDrops", [])
if not time_drops_data: if not time_drops_data:
self.stdout.write(self.style.NOTICE("No timeBasedDrops found in campaign data.")) self.stdout.write(self.style.NOTICE("No timeBasedDrops found in campaign data."))
logger.info("No timeBasedDrops found in campaign data.")
for drop_data in time_drops_data: for drop_data in time_drops_data:
drop_id = drop_data.get("id") drop_id = drop_data.get("id")
if not drop_id: if not drop_id:
self.stdout.write(self.style.WARNING("Skipping drop with missing ID.")) self.stdout.write(self.style.WARNING("Skipping drop with missing ID."))
logger.warning("Skipping drop with missing ID.")
continue continue
drop_start_str = drop_data.get("startAt") drop_start_str = drop_data.get("startAt")
@ -192,6 +299,7 @@ class Command(BaseCommand):
if not drop_start_dt or not drop_end_dt: if not drop_start_dt or not drop_end_dt:
self.stdout.write(self.style.WARNING(f"Drop {drop_id} missing start or end date. Skipping.")) self.stdout.write(self.style.WARNING(f"Drop {drop_id} missing start or end date. Skipping."))
logger.warning("Drop %s missing start or end date. Skipping.", drop_id)
continue continue
drop_defaults = { drop_defaults = {
@ -206,8 +314,10 @@ class Command(BaseCommand):
drop_obj, created = TimeBasedDrop.objects.update_or_create(drop_id=drop_id, defaults=drop_defaults) drop_obj, created = TimeBasedDrop.objects.update_or_create(drop_id=drop_id, defaults=drop_defaults)
if created: if created:
self.stdout.write(self.style.SUCCESS(f" Created Time Drop: {drop_obj.name}")) self.stdout.write(self.style.SUCCESS(f" Created Time Drop: {drop_obj.name}"))
stats["drops_created"] += 1
else: else:
self.stdout.write(f" Updated/Found Time Drop: {drop_obj.name}") self.stdout.write(f" Updated/Found Time Drop: {drop_obj.name}")
stats["drops_updated"] += 1
# --- 5. Process Benefits within the Drop --- # --- 5. Process Benefits within the Drop ---
benefits_data = drop_data.get("benefitEdges", []) benefits_data = drop_data.get("benefitEdges", [])
@ -216,6 +326,7 @@ class Command(BaseCommand):
benefit_info = edge.get("benefit") benefit_info = edge.get("benefit")
if not benefit_info or not benefit_info.get("id"): if not benefit_info or not benefit_info.get("id"):
self.stdout.write(self.style.WARNING(" Skipping benefit edge with missing data or ID.")) self.stdout.write(self.style.WARNING(" Skipping benefit edge with missing data or ID."))
logger.warning("Skipping benefit edge with missing data or ID.")
continue continue
benefit_id = benefit_info["id"] benefit_id = benefit_info["id"]
@ -230,41 +341,47 @@ class Command(BaseCommand):
try: try:
benefit_game_obj = Game.objects.get(game_id=benefit_game_data["id"]) benefit_game_obj = Game.objects.get(game_id=benefit_game_data["id"])
except Game.DoesNotExist: except Game.DoesNotExist:
self.stdout.write( warning_msg = (
self.style.WARNING( f"Game {benefit_game_data.get('name')} for benefit {benefit_id} "
f" Game {benefit_game_data.get('name')} for benefit {benefit_id} not found. Creating minimally.", # noqa: E501 f"not found. Creating minimally."
),
) )
self.stdout.write(self.style.WARNING(f" {warning_msg}"))
logger.warning(warning_msg)
# Optionally create a minimal game entry here if desired # Optionally create a minimal game entry here if desired
benefit_game_obj, _ = Game.objects.update_or_create( benefit_game_obj, game_created = Game.objects.update_or_create(
game_id=benefit_game_data["id"], game_id=benefit_game_data["id"],
defaults={"display_name": benefit_game_data.get("name", "Unknown Game")}, defaults={"display_name": benefit_game_data.get("name", "Unknown Game")},
) )
if game_created:
stats["games_created"] += 1
benefit_owner_obj = None benefit_owner_obj = None
if benefit_owner_data and benefit_owner_data.get("id"): if benefit_owner_data and benefit_owner_data.get("id"):
try: try:
benefit_owner_obj = Organization.objects.get(org_id=benefit_owner_data["id"]) benefit_owner_obj = Organization.objects.get(org_id=benefit_owner_data["id"])
except Organization.DoesNotExist: except Organization.DoesNotExist:
self.stdout.write( warning_msg = (
self.style.WARNING( f"Organization {benefit_owner_data.get('name')} for benefit {benefit_id} "
f" Organization {benefit_owner_data.get('name')} for benefit {benefit_id} not found. Creating minimally.", # noqa: E501 f"not found. Creating minimally."
),
) )
benefit_owner_obj, _ = Organization.objects.update_or_create( self.stdout.write(self.style.WARNING(f" {warning_msg}"))
logger.warning(warning_msg)
benefit_owner_obj, org_created = Organization.objects.update_or_create(
org_id=benefit_owner_data["id"], org_id=benefit_owner_data["id"],
defaults={"name": benefit_owner_data.get("name", "Unknown Org")}, defaults={"name": benefit_owner_data.get("name", "Unknown Org")},
) )
if org_created:
stats["orgs_created"] += 1
benefit_defaults = { benefit_defaults = {
"name": benefit_info.get("name"), "name": benefit_info.get("name"),
"image_asset_url": benefit_info.get("imageAssetURL"), "image_asset_url": benefit_info.get("imageAssetURL"),
"entitlement_limit": edge.get("entitlementLimit", 1), # Get limit from edge "entitlement_limit": edge.get("entitlementLimit", 1), # Get limit from edge
"distribution_type": benefit_info.get("distributionType"), "distribution_type": benefit_info.get("distributionType"),
"created_at": benefit_created_dt or timezone.now(), # Provide a default if missing "twitch_created_at": benefit_created_dt or timezone.now(), # Provide a default if missing
"game": benefit_game_obj, "game": benefit_game_obj,
"owner_organization": benefit_owner_obj, "owner_organization": benefit_owner_obj,
# 'is_ios_available': benefit_info.get('isIosAvailable', False) "is_ios_available": benefit_info.get("isIosAvailable", False),
} }
benefit_obj, b_created = Benefit.objects.update_or_create( benefit_obj, b_created = Benefit.objects.update_or_create(
@ -273,21 +390,28 @@ class Command(BaseCommand):
) )
if b_created: if b_created:
self.stdout.write(self.style.SUCCESS(f" Created Benefit: {benefit_obj.name}")) self.stdout.write(self.style.SUCCESS(f" Created Benefit: {benefit_obj.name}"))
stats["benefits_created"] += 1
else: else:
self.stdout.write(f" Updated/Found Benefit: {benefit_obj.name}") self.stdout.write(f" Updated/Found Benefit: {benefit_obj.name}")
stats["benefits_updated"] += 1
benefit_objs.append(benefit_obj) benefit_objs.append(benefit_obj)
# Set the ManyToMany relationship for the drop # Set the ManyToMany relationship for the drop
if benefit_objs: if benefit_objs:
drop_obj.benefits.set(benefit_objs) drop_obj.benefits.set(benefit_objs)
self.stdout.write(f" Associated {len(benefit_objs)} benefits with drop {drop_obj.name}.") drops_msg = f"Associated {len(benefit_objs)} benefits with drop {drop_obj.name}."
self.stdout.write(f" {drops_msg}")
logger.info("Associated %s benefits with drop %s.", len(benefit_objs), drop_obj.name)
except KeyError as e: except KeyError as e:
msg = f"Error: Missing expected key in JSON data - {e}" msg = f"Error: Missing expected key in JSON data - {e}"
logger.exception(msg)
raise CommandError(msg) from e raise CommandError(msg) from e
except Exception as e: except Exception as e:
# The transaction will be rolled back automatically on exception # The transaction will be rolled back automatically on exception
msg = f"An error occurred during import: {e}" msg = f"An error occurred during import: {e}"
logger.exception(msg)
raise CommandError(msg) from e raise CommandError(msg) from e
self.stdout.write(self.style.SUCCESS("Import process completed successfully.")) # Return statistics from this file
return stats

View File

@ -30,40 +30,24 @@ def get_home(request: HttpRequest) -> HttpResponse:
Returns: Returns:
HttpResponse: The response object HttpResponse: The response object
""" """
now = timezone.now() now: timezone.datetime = timezone.now()
grouped_drops = defaultdict(list) grouped_drops = defaultdict(list)
# Query for active drops, efficiently fetching related campaign and game
# Also prefetch benefits if you need them in the template
current_drops_qs = ( current_drops_qs = (
TimeBasedDrop.objects.filter(start_at__lte=now, end_at__gte=now) TimeBasedDrop.objects.filter(start_at__lte=now, end_at__gte=now)
.select_related( .select_related("campaign__game")
"campaign__game", # Follows ForeignKey relationships campaign -> game .prefetch_related("benefits")
) .order_by("campaign__game__display_name", "name")
.prefetch_related(
"benefits", # Efficiently fetches ManyToMany benefits
)
.order_by(
"campaign__game__display_name", # Order by game name first
"name", # Then by drop name
)
) )
# Group the drops by game in Python
for drop in current_drops_qs: for drop in current_drops_qs:
# Check if the drop has an associated campaign and game
if drop.campaign and drop.campaign.game: if drop.campaign and drop.campaign.game:
game = drop.campaign.game game: Game = drop.campaign.game
grouped_drops[game].append(drop) grouped_drops[game].append(drop)
else: else:
# Handle drops without a game (optional, based on your data integrity) logger.warning("Drop %s does not have an associated game or campaign.", drop.name)
# You could group them under a 'None' key or log a warning
# grouped_drops[None].append(drop)
pass # Or ignore them
context = { context = {"grouped_drops": dict(grouped_drops)}
"grouped_drops": dict(grouped_drops), # Convert defaultdict back to dict for template if preferred
}
return render(request, "index.html", context) return render(request, "index.html", context)

View File

@ -17,6 +17,9 @@ dev = ["pytest", "pytest-django"]
[tool.ruff] [tool.ruff]
lint.select = ["ALL"] lint.select = ["ALL"]
preview = true
unsafe-fixes = true
fix = true
lint.pydocstyle.convention = "google" lint.pydocstyle.convention = "google"
lint.isort.required-imports = ["from __future__ import annotations"] lint.isort.required-imports = ["from __future__ import annotations"]
line-length = 120 line-length = 120