Refactor models

This commit is contained in:
2025-05-01 02:41:25 +02:00
parent d137ad61f0
commit d7b31e1d42
17 changed files with 704 additions and 1392 deletions

View File

View File

View File

@ -0,0 +1,293 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from django.core.management.base import BaseCommand, CommandError, CommandParser
from django.db import transaction
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from core.models import Benefit, DropCampaign, Game, Organization, TimeBasedDrop
class Command(BaseCommand):
"""Imports Twitch Drop campaign data from a specified JSON file into the database.
This command reads a JSON file containing Twitch Drop campaign information,
typically obtained from Twitch's GQL endpoint or similar sources. It parses
the JSON, expecting a structure containing details about the campaign,
associated game, owner organization, time-based drops, and their benefits.
The command uses a database transaction to ensure atomicity, meaning either
all data is imported successfully, or no changes are made to the database
if an error occurs during the import process.
It performs the following steps:
1. Parses the command-line argument for the path to the JSON file.
2. Opens and reads the JSON file, handling potential file-related errors.
3. Navigates the JSON structure to find the core 'dropCampaign' data and
the associated Twitch User ID.
4. Within a transaction:
a. Processes Game data: Creates or updates a `Game` record.
b. Processes Owner Organization data: Creates or updates an `Organization` record.
c. Processes Drop Campaign data: Creates or updates a `DropCampaign` record,
linking it to the Game and Organization. Parses and stores start/end times.
d. Processes Time Based Drops: Iterates through `timeBasedDrops` within the campaign.
i. For each drop: Creates or updates a `TimeBasedDrop` record, linking it
to the `DropCampaign`. Parses and stores start/end times and required watch time.
ii. Processes Benefits within each drop: Iterates through `benefitEdges`.
- Creates or updates `Benefit` records associated with the drop.
- Handles potential missing related Game or Organization for the benefit
by creating minimal placeholder records if necessary.
- Associates the created/updated `Benefit` objects with the current
`TimeBasedDrop` via a ManyToMany relationship.
5. Provides feedback to the console about created or updated records and any warnings
or errors encountered.
6. Raises `CommandError` for critical issues like file not found, JSON errors,
missing essential data (e.g., campaign ID), or unexpected exceptions during processing.
Args:
json_file (str): The filesystem path to the JSON file containing the
Twitch Drop campaign data.
Example Usage:
python manage.py import_twitch_drops /path/to/your/twitch_drops.json
"""
help = "Imports Twitch Drop campaign data from a specific JSON file structure."
def add_arguments(self, parser: CommandParser) -> None: # noqa: D102
parser.add_argument("json_file", type=str, help="Path to the Twitch Drop JSON file")
def handle(self, *args: Any, **options) -> None: # noqa: ANN003, ANN401, ARG002, C901, PLR0912, PLR0915
"""Import data to DB.
Args:
*args: Positional arguments.
**options: Keyword arguments containing the command-line options.
Raises:
CommandError: If the JSON file is not found, cannot be decoded,
or if there are issues with the JSON structure.
"""
json_file_path = options["json_file"]
self.stdout.write(f"Starting import from {json_file_path}...")
try:
with Path(json_file_path).open(encoding="utf-8") as f:
raw_data = json.load(f)
except FileNotFoundError as e:
msg = f"Error: File not found at {json_file_path}"
raise CommandError(msg) from e
except json.JSONDecodeError as e:
msg = f"Error: Could not decode JSON from {json_file_path}"
raise CommandError(msg) from e
except Exception as e:
msg = f"Error reading file: {e}"
raise CommandError(msg) from e
# Navigate to the relevant part of the JSON structure
try:
user_data = raw_data.get("data", {}).get("user", {})
campaign_data = user_data.get("dropCampaign")
if not campaign_data:
msg = "Error: 'dropCampaign' key not found or is null in the JSON data."
raise CommandError(msg)
except AttributeError as e:
msg = "Error: Unexpected JSON structure. Could not find 'data' or 'user'."
raise CommandError(msg) from e
try:
# Use a transaction to ensure atomicity
with transaction.atomic():
# --- 1. Process Game ---
game_data = campaign_data.get("game")
game_obj = None
if game_data and game_data.get("id"):
game_defaults = {
"slug": game_data.get("slug"),
"display_name": game_data.get("displayName"),
}
game_obj, created = Game.objects.update_or_create(
game_id=game_data["id"],
defaults=game_defaults,
)
if created:
self.stdout.write(self.style.SUCCESS(f"Created Game: {game_obj.display_name}"))
else:
self.stdout.write(f"Updated/Found Game: {game_obj.display_name}")
else:
self.stdout.write(self.style.WARNING("No game data found in campaign."))
# --- 2. Process Owner Organization ---
owner_data = campaign_data.get("owner")
owner_obj = None
if owner_data and owner_data.get("id"):
owner_defaults = {"name": owner_data.get("name")}
owner_obj, created = Organization.objects.update_or_create(
org_id=owner_data["id"],
defaults=owner_defaults,
)
if created:
self.stdout.write(self.style.SUCCESS(f"Created Organization: {owner_obj.name}"))
else:
self.stdout.write(f"Updated/Found Organization: {owner_obj.name}")
else:
self.stdout.write(self.style.WARNING("No owner organization data found in campaign."))
# --- 3. Process Drop Campaign ---
campaign_id = campaign_data.get("id")
if not campaign_id:
msg = "Error: Campaign ID is missing."
raise CommandError(msg) # noqa: TRY301
start_at_str = campaign_data.get("startAt")
end_at_str = campaign_data.get("endAt")
start_at_dt = parse_datetime(start_at_str) if start_at_str else None
end_at_dt = parse_datetime(end_at_str) if end_at_str else None
if not start_at_dt or not end_at_dt:
self.stdout.write(self.style.WARNING(f"Campaign {campaign_id} missing start or end date."))
# Decide if you want to skip or handle this case differently
campaign_defaults = {
"name": campaign_data.get("name"),
"description": campaign_data.get("description"),
"status": campaign_data.get("status"),
"start_at": start_at_dt,
"end_at": end_at_dt,
"image_url": campaign_data.get("imageURL"),
"details_url": campaign_data.get("detailsURL"),
"account_link_url": campaign_data.get("accountLinkURL"),
"game": game_obj,
"owner": owner_obj,
}
campaign_obj, created = DropCampaign.objects.update_or_create(
campaign_id=campaign_id,
defaults=campaign_defaults,
)
if created:
self.stdout.write(self.style.SUCCESS(f"Created Campaign: {campaign_obj.name}"))
else:
self.stdout.write(f"Updated/Found Campaign: {campaign_obj.name}")
# --- 4. Process Time Based Drops ---
time_drops_data = campaign_data.get("timeBasedDrops", [])
if not time_drops_data:
self.stdout.write(self.style.NOTICE("No timeBasedDrops found in campaign data."))
for drop_data in time_drops_data:
drop_id = drop_data.get("id")
if not drop_id:
self.stdout.write(self.style.WARNING("Skipping drop with missing ID."))
continue
drop_start_str = drop_data.get("startAt")
drop_end_str = drop_data.get("endAt")
drop_start_dt = parse_datetime(drop_start_str) if drop_start_str else None
drop_end_dt = parse_datetime(drop_end_str) if drop_end_str else None
if not drop_start_dt or not drop_end_dt:
self.stdout.write(self.style.WARNING(f"Drop {drop_id} missing start or end date. Skipping."))
continue
drop_defaults = {
"name": drop_data.get("name"),
"start_at": drop_start_dt,
"end_at": drop_end_dt,
"required_minutes_watched": drop_data.get("requiredMinutesWatched"),
"campaign": campaign_obj,
# Add required_subs if needed: 'required_subs': drop_data.get('requiredSubs', 0)
}
drop_obj, created = TimeBasedDrop.objects.update_or_create(drop_id=drop_id, defaults=drop_defaults)
if created:
self.stdout.write(self.style.SUCCESS(f" Created Time Drop: {drop_obj.name}"))
else:
self.stdout.write(f" Updated/Found Time Drop: {drop_obj.name}")
# --- 5. Process Benefits within the Drop ---
benefits_data = drop_data.get("benefitEdges", [])
benefit_objs = []
for edge in benefits_data:
benefit_info = edge.get("benefit")
if not benefit_info or not benefit_info.get("id"):
self.stdout.write(self.style.WARNING(" Skipping benefit edge with missing data or ID."))
continue
benefit_id = benefit_info["id"]
benefit_game_data = benefit_info.get("game")
benefit_owner_data = benefit_info.get("ownerOrganization")
benefit_created_str = benefit_info.get("createdAt")
benefit_created_dt = parse_datetime(benefit_created_str) if benefit_created_str else None
benefit_game_obj = None
if benefit_game_data and benefit_game_data.get("id"):
# Assuming game ID is unique, get it directly
try:
benefit_game_obj = Game.objects.get(game_id=benefit_game_data["id"])
except Game.DoesNotExist:
self.stdout.write(
self.style.WARNING(
f" Game {benefit_game_data.get('name')} for benefit {benefit_id} not found. Creating minimally.", # noqa: E501
),
)
# Optionally create a minimal game entry here if desired
benefit_game_obj, _ = Game.objects.update_or_create(
game_id=benefit_game_data["id"],
defaults={"display_name": benefit_game_data.get("name", "Unknown Game")},
)
benefit_owner_obj = None
if benefit_owner_data and benefit_owner_data.get("id"):
try:
benefit_owner_obj = Organization.objects.get(org_id=benefit_owner_data["id"])
except Organization.DoesNotExist:
self.stdout.write(
self.style.WARNING(
f" Organization {benefit_owner_data.get('name')} for benefit {benefit_id} not found. Creating minimally.", # noqa: E501
),
)
benefit_owner_obj, _ = Organization.objects.update_or_create(
org_id=benefit_owner_data["id"],
defaults={"name": benefit_owner_data.get("name", "Unknown Org")},
)
benefit_defaults = {
"name": benefit_info.get("name"),
"image_asset_url": benefit_info.get("imageAssetURL"),
"entitlement_limit": edge.get("entitlementLimit", 1), # Get limit from edge
"distribution_type": benefit_info.get("distributionType"),
"created_at": benefit_created_dt or timezone.now(), # Provide a default if missing
"game": benefit_game_obj,
"owner_organization": benefit_owner_obj,
# 'is_ios_available': benefit_info.get('isIosAvailable', False)
}
benefit_obj, b_created = Benefit.objects.update_or_create(
benefit_id=benefit_id,
defaults=benefit_defaults,
)
if b_created:
self.stdout.write(self.style.SUCCESS(f" Created Benefit: {benefit_obj.name}"))
else:
self.stdout.write(f" Updated/Found Benefit: {benefit_obj.name}")
benefit_objs.append(benefit_obj)
# Set the ManyToMany relationship for the drop
if benefit_objs:
drop_obj.benefits.set(benefit_objs)
self.stdout.write(f" Associated {len(benefit_objs)} benefits with drop {drop_obj.name}.")
except KeyError as e:
msg = f"Error: Missing expected key in JSON data - {e}"
raise CommandError(msg) from e
except Exception as e:
# The transaction will be rolled back automatically on exception
msg = f"An error occurred during import: {e}"
raise CommandError(msg) from e
self.stdout.write(self.style.SUCCESS("Import process completed successfully."))