Refactor import_drops command

This commit is contained in:
Joakim Hellsén 2025-08-12 17:09:28 +02:00
commit 5878ec186f
10 changed files with 495 additions and 88 deletions

5
.gitignore vendored
View file

@ -209,6 +209,7 @@ __marimo__/
# TTVDrops # TTVDrops
responses/ responses/
# Django static files # Directories tied to our project
# Django collects static files into a single directory for production use.
staticfiles/ staticfiles/
archive/
check_these_please/

3
.vscode/launch.json vendored
View file

@ -18,7 +18,8 @@
"request": "launch", "request": "launch",
"args": [ "args": [
"import_drops", "import_drops",
"C:\\Code\\TwitchDropsMiner\\responses" // C:\Code\TwitchDropsMiner\responses\processed\broken_json
"C:\\Code\\TwitchDropsMiner\\responses\\processed\\broken_json"
], ],
"django": false, "django": false,
"autoStartBrowser": false, "autoStartBrowser": false,

View file

@ -1,17 +1,23 @@
from __future__ import annotations from __future__ import annotations
import json import json
import re
import shutil import shutil
import traceback import traceback
from pathlib import Path from pathlib import Path
from typing import Any from typing import TYPE_CHECKING, Any
import orjson import orjson
from django.core.management.base import BaseCommand, CommandError, CommandParser from django.core.management.base import BaseCommand, CommandError, CommandParser
from django.db import transaction from django.db import transaction
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from twitch.models import DropBenefit, DropBenefitEdge, DropCampaign, Game, Organization, TimeBasedDrop from twitch.models import DropBenefit, DropBenefitEdge, DropCampaign, Game, Organization, TimeBasedDrop
if TYPE_CHECKING:
from datetime import datetime
class Command(BaseCommand): class Command(BaseCommand):
"""Import Twitch drop campaign data from a JSON file or directory of JSON files.""" """Import Twitch drop campaign data from a JSON file or directory of JSON files."""
@ -78,10 +84,20 @@ class Command(BaseCommand):
except CommandError as e: except CommandError as e:
self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}")) self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}"))
except (orjson.JSONDecodeError, json.JSONDecodeError): except (orjson.JSONDecodeError, json.JSONDecodeError):
broken_json_dir: Path = processed_path / "broken_json" # Attempt to clean trailing broken JSON and retry parsing
broken_json_dir.mkdir(parents=True, exist_ok=True) try:
self.stdout.write(self.style.WARNING(f"Invalid JSON in '{json_file}'. Moving to '{broken_json_dir}'.")) self.clean_file(json_file)
self.move_file(json_file, broken_json_dir / json_file.name) self.stdout.write(self.style.SUCCESS(f"Cleaned JSON in '{json_file.name}', retrying import."))
# re-process the cleaned file
self._process_file(json_file, processed_path)
except (orjson.JSONDecodeError, json.JSONDecodeError):
# Still invalid after cleanup, move to broken_json
broken_json_dir: Path = processed_path / "broken_json"
broken_json_dir.mkdir(parents=True, exist_ok=True)
self.stdout.write(
self.style.WARNING(f"Invalid JSON in '{json_file}', even after cleanup. Moving to '{broken_json_dir}'.")
)
self.move_file(json_file, broken_json_dir / json_file.name)
except (ValueError, TypeError, AttributeError, KeyError, IndexError): except (ValueError, TypeError, AttributeError, KeyError, IndexError):
self.stdout.write(self.style.ERROR(f"Data error processing {json_file}")) self.stdout.write(self.style.ERROR(f"Data error processing {json_file}"))
self.stdout.write(self.style.ERROR(traceback.format_exc())) self.stdout.write(self.style.ERROR(traceback.format_exc()))
@ -89,46 +105,114 @@ class Command(BaseCommand):
msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}." msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}."
self.stdout.write(self.style.SUCCESS(msg)) self.stdout.write(self.style.SUCCESS(msg))
def _process_file(self, file_path: Path, processed_path: Path) -> None: def _process_file(self, file_path: Path, processed_path: Path) -> None: # noqa: C901, PLR0911, PLR0912
"""Process a single JSON file. """Process a single JSON file.
Args: Args:
file_path: Path to the JSON file. file_path: Path to the JSON file.
processed_path: Subdirectory to move processed files to. processed_path: Subdirectory to move processed files to.
""" """
data = orjson.loads(file_path.read_text(encoding="utf-8")) data = orjson.loads(file_path.read_bytes())
broken_dir: Path = processed_path / "broken" broken_dir: Path = processed_path / "broken"
broken_dir.mkdir(parents=True, exist_ok=True) broken_dir.mkdir(parents=True, exist_ok=True)
# Check for specific keywords that indicate the file is not a valid drop campaign response
# and move it to the "broken" directory.
# These keywords are based on common patterns in Twitch API responses that are not related to drop campaigns.
# If any of these keywords are found in the file, it is likely not a drop campaign response,
# and we move it to the broken directory.
probably_shit: list[str] = [ probably_shit: list[str] = [
"ChannelPointsContext", "ChannelPointsContext",
"ClaimCommunityPoints",
"DirectoryPage_Game",
"DropCurrentSessionContext", "DropCurrentSessionContext",
"DropsHighlightService_AvailableDrops", "DropsHighlightService_AvailableDrops",
"DropsPage_ClaimDropRewards", "DropsPage_ClaimDropRewards",
"Inventory", "Inventory",
"OnsiteNotifications_DeleteNotification",
"PlaybackAccessToken", "PlaybackAccessToken",
"streamPlaybackAccessToken", "streamPlaybackAccessToken",
"VideoPlayerStreamInfoOverlayChannel", "VideoPlayerStreamInfoOverlayChannel",
"OnsiteNotifications_DeleteNotification",
"DirectoryPage_Game",
] ]
for keyword in probably_shit: for keyword in probably_shit:
if f'"operationName": "{keyword}"' in file_path.read_text(): if f'"operationName": "{keyword}"' in data:
target_dir: Path = broken_dir / keyword target_dir: Path = broken_dir / keyword
target_dir.mkdir(parents=True, exist_ok=True) target_dir.mkdir(parents=True, exist_ok=True)
self.stdout.write(msg=f"Trying to move {file_path!s} to {target_dir / file_path.name!s}")
self.move_file(file_path, target_dir / file_path.name) self.move_file(file_path, target_dir / file_path.name)
self.stdout.write(f"Moved {file_path} to {target_dir} (matched '{keyword}')") self.stdout.write(f"Moved {file_path} to {target_dir} (matched '{keyword}')")
return return
# Some responses have errors:
# {"errors": [{"message": "service timeout", "path": ["currentUser", "dropCampaigns"]}]}
# Move them to the "actual_error" directory
if isinstance(data, dict) and data.get("errors"):
actual_error_dir: Path = processed_path / "actual_error"
actual_error_dir.mkdir(parents=True, exist_ok=True)
self.move_file(file_path, actual_error_dir / file_path.name)
self.stdout.write(f"Moved {file_path} to {actual_error_dir} (contains Twitch errors)")
return
# If file has "__typename": "BroadcastSettings" move it to the "broadcast_settings" directory
if '"__typename": "BroadcastSettings"' in data:
broadcast_settings_dir: Path = processed_path / "broadcast_settings"
broadcast_settings_dir.mkdir(parents=True, exist_ok=True)
self.move_file(file_path, broadcast_settings_dir / file_path.name)
return
# Remove files that only have a channel.viewerDropCampaigns and nothing more.
# This file is useless.
if (
isinstance(data, dict)
and data.get("data", {}).keys() == {"channel"}
and data["data"]["channel"].keys() == {"id", "viewerDropCampaigns", "__typename"}
and data["data"]["channel"]["viewerDropCampaigns"] is None
):
file_path.unlink()
self.stdout.write(f"Removed {file_path} (only contains empty viewerDropCampaigns)")
return
# If file only contains {"data": {"user": null}} remove the file
if isinstance(data, dict) and data.get("data", {}).keys() == {"user"} and data["data"]["user"] is None:
file_path.unlink()
self.stdout.write(f"Removed {file_path} (only contains empty user)")
return
# If file only contains {"data": {"game": {}}} remove the file
if isinstance(data, dict) and data.get("data", {}).keys() == {"game"} and len(data["data"]) == 1:
game_data = data["data"]["game"]
if isinstance(game_data, dict) and game_data.get("__typename") == "Game":
file_path.unlink()
self.stdout.write(f"Removed {file_path} (only contains game data)")
return
# If file has "__typename": "DropCurrentSession" move it to the "drop_current_session" directory so we can process it separately.
if (
isinstance(data, dict)
and data.get("data", {}).get("currentUser", {}).get("dropCurrentSession", {}).get("__typename") == "DropCurrentSession"
):
drop_current_session_dir: Path = processed_path / "drop_current_session"
drop_current_session_dir.mkdir(parents=True, exist_ok=True)
self.move_file(file_path, drop_current_session_dir / file_path.name)
return
# If file is a list with one item: {"data": {"user": null}}, remove it
if (
isinstance(data, list)
and len(data) == 1
and isinstance(data[0], dict)
and data[0].get("data", {}).keys() == {"user"}
and data[0]["data"]["user"] is None
):
file_path.unlink()
self.stdout.write(f"Removed {file_path} (list with one item: empty user)")
return
if isinstance(data, list): if isinstance(data, list):
for _item in data: for _item in data:
self.import_drop_campaign(_item) self.import_drop_campaign(_item, file_path=file_path)
else: else:
self.import_drop_campaign(data) self.import_drop_campaign(data, file_path=file_path)
self.move_file(file_path, processed_path) self.move_file(file_path, processed_path)
@ -150,11 +234,12 @@ class Command(BaseCommand):
self.stdout.write(self.style.ERROR(f"Error moving {file_path!s} to {processed_path!s}: {e}")) self.stdout.write(self.style.ERROR(f"Error moving {file_path!s} to {processed_path!s}: {e}"))
traceback.print_exc() traceback.print_exc()
def import_drop_campaign(self, data: dict[str, Any]) -> None: def import_drop_campaign(self, data: dict[str, Any], file_path: Path) -> None:
"""Find the key with all the data. """Find the key with all the data.
Args: Args:
data: The JSON data. data: The JSON data.
file_path: The path to the file being processed.
Raises: Raises:
CommandError: If the JSON structure is invalid. CommandError: If the JSON structure is invalid.
@ -165,27 +250,39 @@ class Command(BaseCommand):
if "user" in data["data"] and "dropCampaign" in data["data"]["user"]: if "user" in data["data"] and "dropCampaign" in data["data"]["user"]:
drop_campaign_data = data["data"]["user"]["dropCampaign"] drop_campaign_data = data["data"]["user"]["dropCampaign"]
self.import_to_db(drop_campaign_data) self.import_to_db(drop_campaign_data, file_path=file_path)
elif "currentUser" in data["data"] and "dropCampaigns" in data["data"]["currentUser"]: elif "currentUser" in data["data"] and "dropCampaigns" in data["data"]["currentUser"]:
campaigns = data["data"]["currentUser"]["dropCampaigns"] campaigns = data["data"]["currentUser"]["dropCampaigns"]
for drop_campaign_data in campaigns: for drop_campaign_data in campaigns:
self.import_to_db(drop_campaign_data) self.import_to_db(drop_campaign_data, file_path=file_path)
elif "viewerDropCampaigns" in data["data"].get("channel", {}):
campaigns = data["data"]["channel"]["viewerDropCampaigns"]
if isinstance(campaigns, list):
for drop_campaign_data in campaigns:
self.import_to_db(drop_campaign_data, file_path=file_path)
elif isinstance(campaigns, dict):
self.import_to_db(campaigns, file_path=file_path)
else: else:
msg = "Invalid JSON structure: Missing either data.user.dropCampaign or data.currentUser.dropCampaigns" msg = "Invalid JSON structure: Missing either data.user.dropCampaign or data.currentUser.dropCampaigns"
raise CommandError(msg) raise CommandError(msg)
def import_to_db(self, campaign_data: dict[str, Any]) -> None: def import_to_db(self, campaign_data: dict[str, Any], file_path: Path) -> None:
"""Import drop campaign data into the database with retry logic for SQLite locks. """Import drop campaign data into the database with retry logic for SQLite locks.
Args: Args:
campaign_data: The drop campaign data to import. campaign_data: The drop campaign data to import.
file_path: The path to the file being processed.
""" """
with transaction.atomic(): with transaction.atomic():
game: Game = self.game_update_or_create(campaign_data=campaign_data) game: Game = self.game_update_or_create(campaign_data=campaign_data)
organization: Organization = self.owner_update_or_create(campaign_data=campaign_data) organization: Organization | None = self.owner_update_or_create(campaign_data=campaign_data, file_path=file_path)
if organization is None:
self.stdout.write(self.style.WARNING("No organization found for this campaign, skipping drop campaign import."))
return
drop_campaign: DropCampaign = self.drop_campaign_update_or_get( drop_campaign: DropCampaign = self.drop_campaign_update_or_get(
campaign_data=campaign_data, campaign_data=campaign_data,
@ -194,32 +291,41 @@ class Command(BaseCommand):
) )
for drop_data in campaign_data.get("timeBasedDrops", []): for drop_data in campaign_data.get("timeBasedDrops", []):
time_based_drop, _ = TimeBasedDrop.objects.update_or_create( time_based_drop: TimeBasedDrop = self.create_time_based_drop(drop_campaign=drop_campaign, drop_data=drop_data)
id=drop_data["id"],
defaults={
"name": drop_data["name"],
"required_minutes_watched": drop_data["requiredMinutesWatched"],
"required_subs": drop_data.get("requiredSubs", 0),
"start_at": drop_data["startAt"],
"end_at": drop_data["endAt"],
"campaign": drop_campaign,
},
)
for benefit_edge in drop_data.get("benefitEdges", []): for benefit_edge in drop_data.get("benefitEdges", []):
benefit_data = benefit_edge["benefit"] benefit_defaults: dict[str, Any] = {}
benefit_data: dict[str, Any] = benefit_edge["benefit"]
benefit_name: str = str(benefit_data.get("name")).strip()
if benefit_name and benefit_name != "None":
benefit_defaults["name"] = benefit_name
img_asset: str = str(benefit_data.get("imageAssetURL")).strip()
if img_asset and img_asset != "None":
benefit_defaults["image_asset_url"] = img_asset
created_at: str = str(benefit_data.get("createdAt")).strip()
if created_at and created_at != "None":
benefit_defaults["created_at"] = created_at
ent_limit: int | None = benefit_data.get("entitlementLimit")
if ent_limit is not None:
benefit_defaults["entitlement_limit"] = ent_limit
ios_avail: bool | None = benefit_data.get("isIosAvailable")
if ios_avail is not None:
benefit_defaults["is_ios_available"] = ios_avail
dist_type: str | None = benefit_data.get("distributionType")
if dist_type is not None:
benefit_defaults["distribution_type"] = dist_type
benefit_defaults["game"] = game
benefit_defaults["owner_organization"] = organization
benefit, _ = DropBenefit.objects.update_or_create( benefit, _ = DropBenefit.objects.update_or_create(
id=benefit_data["id"], id=benefit_data["id"],
defaults={ defaults=benefit_defaults,
"name": benefit_data["name"],
"image_asset_url": benefit_data.get("imageAssetURL", ""),
"created_at": benefit_data["createdAt"],
"entitlement_limit": benefit_data.get("entitlementLimit", 1),
"is_ios_available": benefit_data.get("isIosAvailable", False),
"distribution_type": benefit_data["distributionType"],
"game": game,
"owner_organization": organization,
},
) )
DropBenefitEdge.objects.update_or_create( DropBenefitEdge.objects.update_or_create(
@ -231,46 +337,160 @@ class Command(BaseCommand):
) )
self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign {drop_campaign.name} (ID: {drop_campaign.id})")) self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign {drop_campaign.name} (ID: {drop_campaign.id})"))
def drop_campaign_update_or_get(self, campaign_data: dict[str, Any], game: Game, organization: Organization) -> DropCampaign: def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop: # noqa: C901
"""Creates or updates a TimeBasedDrop instance based on the provided drop data.
Args:
drop_campaign (DropCampaign): The campaign to which the drop belongs.
drop_data (dict[str, Any]): A dictionary containing drop information. Expected keys include:
- "id" (str): The unique identifier for the drop (required).
- "name" (str, optional): The name of the drop.
- "requiredMinutesWatched" (int, optional): Minutes required to earn the drop.
- "requiredSubs" (int, optional): Number of subscriptions required to earn the drop.
- "startAt" (str, optional): ISO 8601 datetime string for when the drop starts.
- "endAt" (str, optional): ISO 8601 datetime string for when the drop ends.
Returns:
TimeBasedDrop: The created or updated TimeBasedDrop instance.
"""
defaults: dict[str, Any] = {}
name: str = drop_data.get("name", "")
if name:
defaults["name"] = name.strip()
# "requiredMinutesWatched": 240
required_minutes_watched: int = drop_data.get("requiredMinutesWatched", 0)
if required_minutes_watched:
defaults["required_minutes_watched"] = int(required_minutes_watched)
# "requiredSubs": 1,
required_subs: int = drop_data.get("requiredSubs", 0)
if required_subs:
defaults["required_subs"] = int(required_subs)
# "startAt": "2025-08-08T07:00:00Z",
# Model field is DateTimeField
start_at: str | None = drop_data.get("startAt")
if start_at:
# Convert to timezone-aware datetime
parsed_start_at: datetime | None = parse_datetime(start_at)
if parsed_start_at and timezone.is_naive(parsed_start_at):
parsed_start_at = timezone.make_aware(parsed_start_at)
if parsed_start_at:
defaults["start_at"] = parsed_start_at
# "endAt": "2025-02-04T10:59:59.999Z",
# Model field is DateTimeField
end_at: str | None = drop_data.get("endAt")
if end_at:
# Convert to timezone-aware datetime
parsed_end_at: datetime | None = parse_datetime(end_at)
if parsed_end_at and timezone.is_naive(parsed_end_at):
parsed_end_at = timezone.make_aware(parsed_end_at)
if parsed_end_at:
defaults["end_at"] = parsed_end_at
defaults["campaign"] = drop_campaign
time_based_drop, created = TimeBasedDrop.objects.update_or_create(id=drop_data["id"], defaults=defaults)
if created:
self.stdout.write(
self.style.SUCCESS(f"Successfully imported time-based drop {time_based_drop.name} (ID: {time_based_drop.id})")
)
return time_based_drop
def drop_campaign_update_or_get(self, campaign_data: dict[str, Any], game: Game, organization: Organization | None) -> DropCampaign: # noqa: C901
"""Update or create a drop campaign. """Update or create a drop campaign.
Args: Args:
campaign_data: The drop campaign data to import. campaign_data: The drop campaign data to import.
game: The game this drop campaign is for. game: The game this drop campaign is for.
organization: The company that owns the game. organization: The company that owns the game. If None, the campaign will not have an owner.
Returns: Returns:
Returns the DropCampaign object. Returns the DropCampaign object.
""" """
defaults: dict[str, Any] = {}
name = campaign_data.get("name")
if name is not None:
defaults["name"] = name
desc = campaign_data.get("description")
if desc is not None:
defaults["description"] = desc.replace("\\n", "\n")
details = campaign_data.get("detailsURL")
if details is not None:
defaults["details_url"] = details
acct_link = campaign_data.get("accountLinkURL")
if acct_link is not None:
defaults["account_link_url"] = acct_link
img = campaign_data.get("imageURL")
if img is not None:
defaults["image_url"] = img
start = campaign_data.get("startAt")
if start is not None:
defaults["start_at"] = start
end = campaign_data.get("endAt")
if end is not None:
defaults["end_at"] = end
is_conn = campaign_data.get("self", {}).get("isAccountConnected")
if is_conn is not None:
defaults["is_account_connected"] = is_conn
defaults["game"] = game
if organization:
defaults["owner"] = organization
drop_campaign, created = DropCampaign.objects.update_or_create( drop_campaign, created = DropCampaign.objects.update_or_create(
id=campaign_data["id"], id=campaign_data["id"],
defaults={ defaults=defaults,
"name": campaign_data.get("name", ""),
"description": campaign_data.get("description", "").replace("\\n", "\n"),
"details_url": campaign_data.get("detailsURL", ""),
"account_link_url": campaign_data.get("accountLinkURL", ""),
"image_url": campaign_data.get("imageURL", ""),
"start_at": campaign_data.get("startAt"),
"end_at": campaign_data.get("endAt"),
"is_account_connected": campaign_data.get("self", {}).get("isAccountConnected", False),
"game": game,
"owner": organization,
},
) )
if created: if created:
self.stdout.write(self.style.SUCCESS(f"Created new drop campaign: {drop_campaign.name} (ID: {drop_campaign.id})")) self.stdout.write(self.style.SUCCESS(f"Created new drop campaign: {drop_campaign.name} (ID: {drop_campaign.id})"))
return drop_campaign return drop_campaign
def owner_update_or_create(self, campaign_data: dict[str, Any]) -> Organization: def owner_update_or_create(self, campaign_data: dict[str, Any], file_path: Path) -> Organization | None:
"""Update or create an organization. """Update or create an organization.
Args: Args:
campaign_data: The drop campaign data to import. campaign_data: The drop campaign data to import.
file_path: Optional path to the file being processed, used for error handling.
Returns: Returns:
Returns the Organization object. Returns the Organization object.
""" """
org_data: dict[str, Any] = campaign_data["owner"] org_data: dict[str, Any] = campaign_data.get("owner", {})
if not org_data:
self.stdout.write(self.style.WARNING("No owner data found in campaign data. Attempting to find organization by game."))
# Try to find an organization by the game if possible
game_id: str | None = campaign_data.get("game", {}).get("id")
if game_id:
game: Game | None = Game.objects.filter(id=game_id).first()
if game:
if game.organizations.exists():
org: Organization | None = game.organizations.first()
if org:
self.stdout.write(self.style.SUCCESS(f"Found organization '{org.name}' for game '{game.display_name}'"))
return org
else:
self.stdout.write(self.style.WARNING(f"No game found with id '{game_id}' when looking up organization."))
# If not found, move the file for manual review
self.stdout.write(self.style.WARNING("No organization found for this campaign, moving file for review."))
todo_dir: Path = Path("check_these_please")
todo_dir.mkdir(parents=True, exist_ok=True)
self.move_file(
file_path,
todo_dir / file_path.name,
)
return None
organization, created = Organization.objects.update_or_create( organization, created = Organization.objects.update_or_create(
id=org_data["id"], id=org_data["id"],
defaults={"name": org_data["name"]}, defaults={"name": org_data["name"]},
@ -289,14 +509,67 @@ class Command(BaseCommand):
Returns the Game object. Returns the Game object.
""" """
game_data: dict[str, Any] = campaign_data["game"] game_data: dict[str, Any] = campaign_data["game"]
game, created = Game.objects.update_or_create(
id=game_data["id"], box_art_url: str = str(game_data.get("boxArtURL", "")).strip()
defaults={ display_name: str = str(game_data.get("displayName", "")).strip()
"slug": game_data.get("slug", ""), slug: str = str(game_data.get("slug", "")).strip()
"display_name": game_data["displayName"],
"box_art": game_data.get("boxArtURL", ""), defaults: dict[str, Any] = {}
}, if box_art_url:
defaults["box_art"] = box_art_url
if display_name:
defaults["display_name"] = display_name
if slug:
defaults["slug"] = slug
game: Game
game, created = self.get_or_update_if_changed(
model=Game,
lookup={"id": game_data["id"]},
defaults=defaults,
) )
if created: if created:
self.stdout.write(self.style.SUCCESS(f"Created new game: {game.display_name} (ID: {game.id})")) self.stdout.write(self.style.SUCCESS(f"Created new game: {game.display_name} (ID: {game.id})"))
return game return game
def get_or_update_if_changed(self, model: type[Any], lookup: dict[str, Any], defaults: dict[str, Any]) -> tuple[Any, bool]:
"""Get or create and update model instance only when fields change.
Args:
model: The Django model class.
lookup: Field lookup dictionary for get_or_create.
defaults: Field defaults to update when changed.
Returns:
A tuple of (instance, created) where created is a bool.
"""
obj, created = model.objects.get_or_create(**lookup, defaults=defaults)
if not created:
changed_fields = []
for field, new_value in defaults.items():
if getattr(obj, field) != new_value:
setattr(obj, field, new_value)
changed_fields.append(field)
if changed_fields:
obj.save(update_fields=changed_fields)
return obj, created
def clean_file(self, path: Path) -> None:
"""Strip trailing broken JSON after the last 'extensions' block."""
text: str = path.read_text(encoding="utf-8")
# Handle extensions block at end of a JSON array
cleaned: str = re.sub(
r'(?s),?\s*"extensions"\s*:\s*\{.*?\}\s*\}\s*\]\s*$',
"}]",
text,
)
if cleaned == text:
# Fallback for standalone extensions block
cleaned = re.sub(
r'(?s),?\s*"extensions"\s*:\s*\{.*?\}\s*$',
"}",
text,
)
path.write_text(cleaned, encoding="utf-8")

View file

@ -0,0 +1,31 @@
# Generated by Django 5.2.5 on 2025-08-07 02:35
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('twitch', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='game',
name='name',
field=models.TextField(blank=True, db_index=True, default=''),
),
migrations.AlterField(
model_name='game',
name='display_name',
field=models.TextField(blank=True, db_index=True, default=''),
),
migrations.AddIndex(
model_name='game',
index=models.Index(fields=['name'], name='twitch_game_name_c92c15_idx'),
),
migrations.AddIndex(
model_name='game',
index=models.Index(fields=['box_art'], name='twitch_game_box_art_498a89_idx'),
),
]

View file

@ -0,0 +1,28 @@
# Generated by Django 5.2.5 on 2025-08-10 20:11
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('twitch', '0002_game_name_alter_game_display_name_and_more'),
]
operations = [
migrations.AlterField(
model_name='dropbenefit',
name='created_at',
field=models.DateTimeField(db_index=True, null=True),
),
migrations.AlterField(
model_name='dropbenefit',
name='distribution_type',
field=models.TextField(blank=True, db_index=True, default=''),
),
migrations.AlterField(
model_name='dropbenefit',
name='name',
field=models.TextField(blank=True, db_index=True, default='N/A'),
),
]

View file

@ -0,0 +1,23 @@
# Generated by Django 5.2.5 on 2025-08-10 20:49
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('twitch', '0003_alter_dropbenefit_created_at_and_more'),
]
operations = [
migrations.AlterField(
model_name='dropcampaign',
name='end_at',
field=models.DateTimeField(db_index=True, null=True),
),
migrations.AlterField(
model_name='dropcampaign',
name='start_at',
field=models.DateTimeField(db_index=True, null=True),
),
]

View file

@ -0,0 +1,28 @@
# Generated by Django 5.2.5 on 2025-08-10 20:50
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('twitch', '0004_alter_dropcampaign_end_at_and_more'),
]
operations = [
migrations.AlterField(
model_name='timebaseddrop',
name='end_at',
field=models.DateTimeField(db_index=True, null=True),
),
migrations.AlterField(
model_name='timebaseddrop',
name='required_minutes_watched',
field=models.PositiveIntegerField(db_index=True, null=True),
),
migrations.AlterField(
model_name='timebaseddrop',
name='start_at',
field=models.DateTimeField(db_index=True, null=True),
),
]

View file

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import logging
from typing import ClassVar from typing import ClassVar
from django.db import models from django.db import models
@ -7,24 +8,41 @@ from django.utils import timezone
from accounts.models import User from accounts.models import User
logger: logging.Logger = logging.getLogger("ttvdrops")
class Game(models.Model): class Game(models.Model):
"""Represents a game on Twitch.""" """Represents a game on Twitch."""
id = models.TextField(primary_key=True) id = models.TextField(primary_key=True)
slug = models.TextField(blank=True, default="", db_index=True) slug = models.TextField(blank=True, default="", db_index=True)
display_name = models.TextField(db_index=True) name = models.TextField(blank=True, default="", db_index=True)
display_name = models.TextField(blank=True, default="", db_index=True)
box_art = models.URLField(max_length=500, blank=True, default="") box_art = models.URLField(max_length=500, blank=True, default="")
class Meta: class Meta:
indexes: ClassVar[list] = [ indexes: ClassVar[list] = [
models.Index(fields=["slug"]), models.Index(fields=["slug"]),
models.Index(fields=["display_name"]), models.Index(fields=["display_name"]),
models.Index(fields=["name"]),
models.Index(fields=["box_art"]),
] ]
def __str__(self) -> str: def __str__(self) -> str:
"""Return a string representation of the game.""" """Return a string representation of the game."""
return self.display_name if (self.display_name and self.name) and (self.display_name != self.name):
logger.warning(
"Game display name '%s' does not match name '%s'.",
self.display_name,
self.name,
)
return f"{self.display_name} ({self.name})"
return self.name or self.slug or self.id
@property
def organizations(self) -> models.QuerySet[Organization]:
"""Return all organizations that have drop campaigns for this game."""
return Organization.objects.filter(drop_campaigns__game=self).distinct()
class Organization(models.Model): class Organization(models.Model):
@ -52,8 +70,8 @@ class DropCampaign(models.Model):
details_url = models.URLField(max_length=500, blank=True, default="") details_url = models.URLField(max_length=500, blank=True, default="")
account_link_url = models.URLField(max_length=500, blank=True, default="") account_link_url = models.URLField(max_length=500, blank=True, default="")
image_url = models.URLField(max_length=500, blank=True, default="") image_url = models.URLField(max_length=500, blank=True, default="")
start_at = models.DateTimeField(db_index=True) start_at = models.DateTimeField(db_index=True, null=True)
end_at = models.DateTimeField(db_index=True) end_at = models.DateTimeField(db_index=True, null=True)
is_account_connected = models.BooleanField(default=False) is_account_connected = models.BooleanField(default=False)
# Foreign keys # Foreign keys
@ -125,12 +143,12 @@ class DropBenefit(models.Model):
"""Represents a benefit that can be earned from a drop.""" """Represents a benefit that can be earned from a drop."""
id = models.TextField(primary_key=True) id = models.TextField(primary_key=True)
name = models.TextField(db_index=True) name = models.TextField(db_index=True, blank=True, default="N/A")
image_asset_url = models.URLField(max_length=500, blank=True, default="") image_asset_url = models.URLField(max_length=500, blank=True, default="")
created_at = models.DateTimeField(db_index=True) created_at = models.DateTimeField(db_index=True, null=True)
entitlement_limit = models.PositiveIntegerField(default=1) entitlement_limit = models.PositiveIntegerField(default=1)
is_ios_available = models.BooleanField(default=False) is_ios_available = models.BooleanField(default=False)
distribution_type = models.TextField(db_index=True) distribution_type = models.TextField(db_index=True, blank=True, default="")
# Foreign keys # Foreign keys
game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="drop_benefits", db_index=True) game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="drop_benefits", db_index=True)
@ -155,10 +173,10 @@ class TimeBasedDrop(models.Model):
id = models.TextField(primary_key=True) id = models.TextField(primary_key=True)
name = models.TextField(db_index=True) name = models.TextField(db_index=True)
required_minutes_watched = models.PositiveIntegerField(db_index=True) required_minutes_watched = models.PositiveIntegerField(db_index=True, null=True)
required_subs = models.PositiveIntegerField(default=0) required_subs = models.PositiveIntegerField(default=0)
start_at = models.DateTimeField(db_index=True) start_at = models.DateTimeField(db_index=True, null=True)
end_at = models.DateTimeField(db_index=True) end_at = models.DateTimeField(db_index=True, null=True)
# Foreign keys # Foreign keys
campaign = models.ForeignKey(DropCampaign, on_delete=models.CASCADE, related_name="time_based_drops", db_index=True) campaign = models.ForeignKey(DropCampaign, on_delete=models.CASCADE, related_name="time_based_drops", db_index=True)

View file

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import datetime
import logging import logging
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, cast from typing import TYPE_CHECKING, Any, cast
@ -16,8 +17,6 @@ from django.views.generic import DetailView, ListView
from twitch.models import DropCampaign, Game, NotificationSubscription, Organization, TimeBasedDrop from twitch.models import DropCampaign, Game, NotificationSubscription, Organization, TimeBasedDrop
if TYPE_CHECKING: if TYPE_CHECKING:
import datetime
from django.db.models import QuerySet from django.db.models import QuerySet
from django.http import HttpRequest, HttpResponse from django.http import HttpRequest, HttpResponse
from django.http.response import HttpResponseRedirect from django.http.response import HttpResponseRedirect
@ -287,14 +286,19 @@ class GameDetailView(DetailView):
) )
active_campaigns: list[DropCampaign] = [ active_campaigns: list[DropCampaign] = [
campaign for campaign in all_campaigns if campaign.start_at <= now and campaign.end_at >= now campaign for campaign in all_campaigns if campaign.start_at <= now and campaign.end_at is not None and campaign.end_at >= now
] ]
active_campaigns.sort(key=lambda c: c.end_at) active_campaigns.sort(key=lambda c: c.end_at if c.end_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC))
upcoming_campaigns: list[DropCampaign] = [campaign for campaign in all_campaigns if campaign.start_at > now] upcoming_campaigns: list[DropCampaign] = [
upcoming_campaigns.sort(key=lambda c: c.start_at) campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at > now
]
expired_campaigns: list[DropCampaign] = [campaign for campaign in all_campaigns if campaign.end_at < now] upcoming_campaigns.sort(key=lambda c: c.start_at if c.start_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC))
expired_campaigns: list[DropCampaign] = [
campaign for campaign in all_campaigns if campaign.end_at is not None and campaign.end_at < now
]
context.update({ context.update({
"active_campaigns": active_campaigns, "active_campaigns": active_campaigns,

6
uv.lock generated
View file

@ -548,11 +548,11 @@ dev = [
[[package]] [[package]]
name = "types-pyyaml" name = "types-pyyaml"
version = "6.0.12.20250516" version = "6.0.12.20250809"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/4e/22/59e2aeb48ceeee1f7cd4537db9568df80d62bdb44a7f9e743502ea8aab9c/types_pyyaml-6.0.12.20250516.tar.gz", hash = "sha256:9f21a70216fc0fa1b216a8176db5f9e0af6eb35d2f2932acb87689d03a5bf6ba", size = 17378, upload-time = "2025-05-16T03:08:04.897Z" } sdist = { url = "https://files.pythonhosted.org/packages/36/21/52ffdbddea3c826bc2758d811ccd7f766912de009c5cf096bd5ebba44680/types_pyyaml-6.0.12.20250809.tar.gz", hash = "sha256:af4a1aca028f18e75297da2ee0da465f799627370d74073e96fee876524f61b5", size = 17385, upload-time = "2025-08-09T03:14:34.867Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/99/5f/e0af6f7f6a260d9af67e1db4f54d732abad514252a7a378a6c4d17dd1036/types_pyyaml-6.0.12.20250516-py3-none-any.whl", hash = "sha256:8478208feaeb53a34cb5d970c56a7cd76b72659442e733e268a94dc72b2d0530", size = 20312, upload-time = "2025-05-16T03:08:04.019Z" }, { url = "https://files.pythonhosted.org/packages/35/3e/0346d09d6e338401ebf406f12eaf9d0b54b315b86f1ec29e34f1a0aedae9/types_pyyaml-6.0.12.20250809-py3-none-any.whl", hash = "sha256:032b6003b798e7de1a1ddfeefee32fac6486bdfe4845e0ae0e7fb3ee4512b52f", size = 20277, upload-time = "2025-08-09T03:14:34.055Z" },
] ]
[[package]] [[package]]