Make Owner optional; use dateparser to parse dates; use json-repair to read JSON

This commit is contained in:
Joakim Hellsén 2025-09-01 21:50:38 +02:00
commit 6d5f014134
12 changed files with 858 additions and 453 deletions

View file

@ -32,8 +32,8 @@ class TimeBasedDropInline(admin.TabularInline):
class DropCampaignAdmin(admin.ModelAdmin):
"""Admin configuration for DropCampaign model."""
list_display = ("id", "name", "game", "owner", "start_at", "end_at", "is_active")
list_filter = ("game", "owner")
list_display = ("id", "name", "game", "start_at", "end_at", "is_active")
list_filter = ("game",)
search_fields = ("id", "name", "description")
inlines = [TimeBasedDropInline]
readonly_fields = ("created_at", "updated_at")
@ -71,11 +71,9 @@ class DropBenefitAdmin(admin.ModelAdmin):
list_display = (
"id",
"name",
"game",
"owner_organization",
"distribution_type",
"entitlement_limit",
"created_at",
)
list_filter = ("game", "owner_organization", "distribution_type")
list_filter = ("distribution_type",)
search_fields = ("id", "name")

View file

@ -1,18 +1,16 @@
from __future__ import annotations
import json
import logging
import re
import shutil
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Any
import orjson
import dateparser
import json_repair
from django.core.management.base import BaseCommand, CommandError, CommandParser
from django.db import transaction
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from twitch.models import DropBenefit, DropBenefitEdge, DropCampaign, Game, Organization, TimeBasedDrop
@ -23,6 +21,30 @@ if TYPE_CHECKING:
logger: logging.Logger = logging.getLogger(__name__)
def parse_date(value: str | None) -> datetime | None:
"""Parse a datetime string into a timezone-aware datetime using dateparser.
Args:
value: The datetime string to parse.
Returns:
A timezone-aware datetime object or None if parsing fails.
"""
value = (value or "").strip()
if not value or value == "None":
return None
dt: datetime | None = dateparser.parse(value, settings={"RETURN_AS_TIMEZONE_AWARE": True})
if not dt:
return None
# Ensure aware in Django's current timezone
if timezone.is_naive(dt):
dt = timezone.make_aware(dt, timezone.get_current_timezone())
return dt
class Command(BaseCommand):
"""Import Twitch drop campaign data from a JSON file or directory of JSON files."""
@ -96,19 +118,6 @@ class Command(BaseCommand):
self._process_file(json_file, processed_path)
except CommandError as e:
self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}"))
except (orjson.JSONDecodeError, json.JSONDecodeError):
# Attempt to clean trailing broken JSON and retry parsing
try:
self.clean_file(json_file)
self.stdout.write(self.style.SUCCESS(f"Cleaned JSON in '{json_file.name}', retrying import."))
# re-process the cleaned file
self._process_file(json_file, processed_path)
except (orjson.JSONDecodeError, json.JSONDecodeError):
# Still invalid after cleanup, move to broken_json
broken_json_dir: Path = processed_path / "broken_json"
broken_json_dir.mkdir(parents=True, exist_ok=True)
self.stdout.write(self.style.WARNING(f"Invalid JSON in '{json_file}', even after cleanup. Moving to '{broken_json_dir}'."))
self.move_file(json_file, broken_json_dir / json_file.name)
except (ValueError, TypeError, AttributeError, KeyError, IndexError):
self.stdout.write(self.style.ERROR(f"Data error processing {json_file}"))
self.stdout.write(self.style.ERROR(traceback.format_exc()))
@ -119,6 +128,9 @@ class Command(BaseCommand):
def _process_file(self, file_path: Path, processed_path: Path) -> None:
"""Process a single JSON file.
Raises:
CommandError: If the file isn't a JSON file or has an invalid JSON structure.
Args:
file_path: Path to the JSON file.
processed_path: Subdirectory to move processed files to.
@ -126,7 +138,7 @@ class Command(BaseCommand):
raw_bytes: bytes = file_path.read_bytes()
raw_text: str = raw_bytes.decode("utf-8")
data = orjson.loads(raw_bytes)
data = json_repair.loads(raw_text)
broken_dir: Path = processed_path / "broken"
broken_dir.mkdir(parents=True, exist_ok=True)
@ -222,8 +234,11 @@ class Command(BaseCommand):
if isinstance(data, list):
for _item in data:
self.import_drop_campaign(_item, file_path=file_path)
else:
elif isinstance(data, dict):
self.import_drop_campaign(data, file_path=file_path)
else:
msg: str = f"Invalid JSON structure in {file_path}: Expected dict or list at top level"
raise CommandError(msg)
self.move_file(file_path, processed_path)
@ -341,71 +356,52 @@ class Command(BaseCommand):
"""
with transaction.atomic():
game: Game = self.game_update_or_create(campaign_data=campaign_data)
organization: Organization | None = self.owner_update_or_create(campaign_data=campaign_data)
organization: Organization | None = self.owner_update_or_create(campaign_data=campaign_data, file_path=file_path)
if organization is None:
self.stdout.write(self.style.WARNING("No organization found for this campaign, skipping drop campaign import."))
return
if organization:
game.owner = organization
game.save(update_fields=["owner"])
drop_campaign: DropCampaign = self.drop_campaign_update_or_get(
campaign_data=campaign_data,
game=game,
organization=organization,
)
drop_campaign: DropCampaign = self.drop_campaign_update_or_get(campaign_data=campaign_data, game=game)
for drop_data in campaign_data.get("timeBasedDrops", []):
time_based_drop: TimeBasedDrop = self.create_time_based_drop(drop_campaign=drop_campaign, drop_data=drop_data)
self._process_time_based_drop(drop_data, drop_campaign, file_path)
benefit_edges: list[dict[str, Any]] = drop_data.get("benefitEdges", [])
if not benefit_edges:
self.stdout.write(self.style.WARNING(f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.id})"))
self.move_file(file_path, Path("no_benefit_edges") / file_path.name)
continue
for benefit_edge in benefit_edges:
benefit_defaults: dict[str, Any] = {}
benefit_data: dict[str, Any] = benefit_edge["benefit"]
benefit_name: str = str(benefit_data.get("name")).strip()
if benefit_name and benefit_name != "None":
benefit_defaults["name"] = benefit_name
img_asset: str = str(benefit_data.get("imageAssetURL")).strip()
if img_asset and img_asset != "None":
benefit_defaults["image_asset_url"] = img_asset
created_at: str = str(benefit_data.get("createdAt")).strip()
if created_at and created_at != "None":
benefit_defaults["created_at"] = created_at
ent_limit: int | None = benefit_data.get("entitlementLimit")
if ent_limit is not None:
benefit_defaults["entitlement_limit"] = ent_limit
ios_avail: bool | None = benefit_data.get("isIosAvailable")
if ios_avail is not None:
benefit_defaults["is_ios_available"] = ios_avail
dist_type: str | None = benefit_data.get("distributionType")
if dist_type is not None:
benefit_defaults["distribution_type"] = dist_type
benefit_defaults["game"] = game
benefit_defaults["owner_organization"] = organization
benefit, _ = DropBenefit.objects.update_or_create(
id=benefit_data["id"],
defaults=benefit_defaults,
)
DropBenefitEdge.objects.update_or_create(
drop=time_based_drop,
benefit=benefit,
defaults={
"entitlement_limit": benefit_edge.get("entitlementLimit", 1),
},
)
self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign {drop_campaign.name} (ID: {drop_campaign.id})"))
def _process_time_based_drop(self, drop_data: dict[str, Any], drop_campaign: DropCampaign, file_path: Path) -> None:
time_based_drop: TimeBasedDrop = self.create_time_based_drop(drop_campaign=drop_campaign, drop_data=drop_data)
benefit_edges: list[dict[str, Any]] = drop_data.get("benefitEdges", [])
if not benefit_edges:
self.stdout.write(self.style.WARNING(f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.id})"))
self.move_file(file_path, Path("no_benefit_edges") / file_path.name)
return
for benefit_edge in benefit_edges:
benefit_data: dict[str, Any] = benefit_edge["benefit"]
benefit_defaults = {
"name": benefit_data.get("name"),
"image_asset_url": benefit_data.get("imageAssetURL"),
"created_at": parse_date(benefit_data.get("createdAt")),
"entitlement_limit": benefit_data.get("entitlementLimit"),
"is_ios_available": benefit_data.get("isIosAvailable"),
"distribution_type": benefit_data.get("distributionType"),
}
# Filter out None values to avoid overwriting with them
benefit_defaults = {k: v for k, v in benefit_defaults.items() if v is not None}
benefit, _ = DropBenefit.objects.update_or_create(
id=benefit_data["id"],
defaults=benefit_defaults,
)
DropBenefitEdge.objects.update_or_create(
drop=time_based_drop,
benefit=benefit,
defaults={"entitlement_limit": benefit_edge.get("entitlementLimit", 1)},
)
def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop:
"""Creates or updates a TimeBasedDrop instance based on the provided drop data.
@ -423,49 +419,18 @@ class Command(BaseCommand):
TimeBasedDrop: The created or updated TimeBasedDrop instance.
"""
defaults: dict[str, Any] = {}
time_based_drop_defaults: dict[str, Any] = {
"campaign": drop_campaign,
"name": drop_data.get("name"),
"required_minutes_watched": drop_data.get("requiredMinutesWatched"),
"required_subs": drop_data.get("requiredSubs"),
"start_at": parse_date(drop_data.get("startAt")),
"end_at": parse_date(drop_data.get("endAt")),
}
# Filter out None values to avoid overwriting with them
time_based_drop_defaults = {k: v for k, v in time_based_drop_defaults.items() if v is not None}
name: str = drop_data.get("name", "")
if name:
defaults["name"] = name.strip()
# "requiredMinutesWatched": 240
required_minutes_watched: int = drop_data.get("requiredMinutesWatched", 0)
if required_minutes_watched:
defaults["required_minutes_watched"] = int(required_minutes_watched)
# "requiredSubs": 1,
required_subs: int = drop_data.get("requiredSubs", 0)
if required_subs:
defaults["required_subs"] = int(required_subs)
# "startAt": "2025-08-08T07:00:00Z",
# Model field is DateTimeField
start_at: str | None = drop_data.get("startAt")
if start_at:
# Convert to timezone-aware datetime
parsed_start_at: datetime | None = parse_datetime(start_at)
if parsed_start_at and timezone.is_naive(parsed_start_at):
parsed_start_at = timezone.make_aware(parsed_start_at)
if parsed_start_at:
defaults["start_at"] = parsed_start_at
# "endAt": "2025-02-04T10:59:59.999Z",
# Model field is DateTimeField
end_at: str | None = drop_data.get("endAt")
if end_at:
# Convert to timezone-aware datetime
parsed_end_at: datetime | None = parse_datetime(end_at)
if parsed_end_at and timezone.is_naive(parsed_end_at):
parsed_end_at = timezone.make_aware(parsed_end_at)
if parsed_end_at:
defaults["end_at"] = parsed_end_at
defaults["campaign"] = drop_campaign
time_based_drop, created = TimeBasedDrop.objects.update_or_create(id=drop_data["id"], defaults=defaults)
time_based_drop, created = TimeBasedDrop.objects.update_or_create(id=drop_data["id"], defaults=time_based_drop_defaults)
if created:
self.stdout.write(self.style.SUCCESS(f"Successfully imported time-based drop {time_based_drop.name} (ID: {time_based_drop.id})"))
@ -475,7 +440,6 @@ class Command(BaseCommand):
self,
campaign_data: dict[str, Any],
game: Game,
organization: Organization | None,
) -> DropCampaign:
"""Update or create a drop campaign.
@ -487,51 +451,33 @@ class Command(BaseCommand):
Returns:
Returns the DropCampaign object.
"""
defaults: dict[str, Any] = {}
name = campaign_data.get("name")
if name is not None:
defaults["name"] = name
desc = campaign_data.get("description")
if desc is not None:
defaults["description"] = desc.replace("\\n", "\n")
details = campaign_data.get("detailsURL")
if details is not None:
defaults["details_url"] = details
acct_link = campaign_data.get("accountLinkURL")
if acct_link is not None:
defaults["account_link_url"] = acct_link
img = campaign_data.get("imageURL")
if img is not None:
defaults["image_url"] = img
start = campaign_data.get("startAt")
if start is not None:
defaults["start_at"] = start
end = campaign_data.get("endAt")
if end is not None:
defaults["end_at"] = end
is_conn = campaign_data.get("self", {}).get("isAccountConnected")
if is_conn is not None:
defaults["is_account_connected"] = is_conn
defaults["game"] = game
if organization:
defaults["owner"] = organization
drop_campaign_defaults: dict[str, Any] = {
"game": game,
"name": campaign_data.get("name"),
"description": campaign_data.get("description"),
"details_url": campaign_data.get("detailsURL"),
"account_link_url": campaign_data.get("accountLinkURL"),
"image_url": campaign_data.get("imageURL"),
"start_at": parse_date(campaign_data.get("startAt") or campaign_data.get("startsAt")),
"end_at": parse_date(campaign_data.get("endAt") or campaign_data.get("endsAt")),
"is_account_connected": campaign_data.get("self", {}).get("isAccountConnected"),
}
# Filter out None values to avoid overwriting with them
drop_campaign_defaults = {k: v for k, v in drop_campaign_defaults.items() if v is not None}
drop_campaign, created = DropCampaign.objects.update_or_create(
id=campaign_data["id"],
defaults=defaults,
defaults=drop_campaign_defaults,
)
if created:
self.stdout.write(self.style.SUCCESS(f"Created new drop campaign: {drop_campaign.name} (ID: {drop_campaign.id})"))
return drop_campaign
def owner_update_or_create(self, campaign_data: dict[str, Any], file_path: Path) -> Organization | None:
def owner_update_or_create(self, campaign_data: dict[str, Any]) -> Organization | None:
"""Update or create an organization.
Args:
campaign_data: The drop campaign data to import.
file_path: Optional path to the file being processed, used for error handling.
Returns:
Returns the Organization object.
@ -540,37 +486,20 @@ class Command(BaseCommand):
if not org_data:
self.stdout.write(self.style.WARNING("No owner data found in campaign data. Attempting to find organization by game."))
# Try to find an organization by the game if possible
game_id: str | None = campaign_data.get("game", {}).get("id")
if game_id:
game: Game | None = Game.objects.filter(id=game_id).first()
if game:
if game.organizations.exists():
org: Organization | None = game.organizations.first()
if org:
self.stdout.write(self.style.SUCCESS(f"Found organization '{org.name}' for game '{game.display_name}'"))
return org
else:
self.stdout.write(self.style.WARNING(f"No game found with id '{game_id}' when looking up organization."))
organization: Organization | None = None
if org_data:
org_defaults: dict[str, Any] = {"name": org_data.get("name")}
# Filter out None values to avoid overwriting with them
org_defaults = {k: v for k, v in org_defaults.items() if v is not None}
# If not found, move the file for manual review
self.stdout.write(self.style.WARNING("No organization found for this campaign, moving file for review."))
todo_dir: Path = Path("check_these_please")
todo_dir.mkdir(parents=True, exist_ok=True)
self.move_file(
file_path,
todo_dir / file_path.name,
organization, created = Organization.objects.update_or_create(
id=org_data["id"],
defaults=org_defaults,
)
return None
organization, created = Organization.objects.update_or_create(
id=org_data["id"],
defaults={"name": org_data["name"]},
)
if created:
self.stdout.write(self.style.SUCCESS(f"Created new organization: {organization.name} (ID: {organization.id})"))
return organization
if created:
self.stdout.write(self.style.SUCCESS(f"Created new organization: {organization.name} (ID: {organization.id})"))
return organization
return None
def game_update_or_create(self, campaign_data: dict[str, Any]) -> Game:
"""Update or create a game.
@ -628,21 +557,3 @@ class Command(BaseCommand):
if changed_fields:
obj.save(update_fields=changed_fields)
return obj, created
def clean_file(self, path: Path) -> None:
"""Strip trailing broken JSON after the last 'extensions' block."""
text: str = path.read_text(encoding="utf-8")
# Handle extensions block at end of a JSON array
cleaned: str = re.sub(
r'(?s),?\s*"extensions"\s*:\s*\{.*?\}\s*\}\s*\]\s*$',
"}]",
text,
)
if cleaned == text:
# Fallback for standalone extensions block
cleaned = re.sub(
r'(?s),?\s*"extensions"\s*:\s*\{.*?\}\s*$',
"}",
text,
)
path.write_text(cleaned, encoding="utf-8")

View file

@ -0,0 +1,272 @@
# Generated by Django 5.2.5 on 2025-09-01 17:01
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('twitch', '0005_alter_timebaseddrop_end_at_and_more'),
]
operations = [
migrations.AlterModelOptions(
name='dropbenefit',
options={'ordering': ['-created_at']},
),
migrations.AlterModelOptions(
name='dropcampaign',
options={'ordering': ['-start_at']},
),
migrations.AlterModelOptions(
name='game',
options={'ordering': ['display_name']},
),
migrations.AlterModelOptions(
name='organization',
options={'ordering': ['name']},
),
migrations.AlterModelOptions(
name='timebaseddrop',
options={'ordering': ['start_at']},
),
migrations.RemoveIndex(
model_name='dropbenefit',
name='twitch_drop_game_id_a9209e_idx',
),
migrations.RemoveIndex(
model_name='dropbenefit',
name='twitch_drop_owner_o_45b4cc_idx',
),
migrations.RemoveIndex(
model_name='dropcampaign',
name='twitch_drop_game_id_868e70_idx',
),
migrations.RemoveIndex(
model_name='dropcampaign',
name='twitch_drop_owner_i_37241d_idx',
),
migrations.RemoveIndex(
model_name='game',
name='twitch_game_box_art_498a89_idx',
),
migrations.RemoveIndex(
model_name='timebaseddrop',
name='twitch_time_campaig_bbe349_idx',
),
migrations.AlterUniqueTogether(
name='dropbenefitedge',
unique_together=set(),
),
migrations.RemoveField(
model_name='dropbenefit',
name='game',
),
migrations.RemoveField(
model_name='dropbenefit',
name='owner_organization',
),
migrations.RemoveField(
model_name='dropcampaign',
name='owner',
),
migrations.AddField(
model_name='game',
name='owner',
field=models.ForeignKey(blank=True, help_text='The organization that owns this game.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='games', to='twitch.organization', verbose_name='Organization'),
),
migrations.AlterField(
model_name='dropbenefit',
name='created_at',
field=models.DateTimeField(db_index=True, help_text='Timestamp when the benefit was created. This is from Twitch API and not auto-generated.', null=True),
),
migrations.AlterField(
model_name='dropbenefit',
name='distribution_type',
field=models.CharField(blank=True, db_index=True, default='', help_text='Type of distribution for this benefit.', max_length=50),
),
migrations.AlterField(
model_name='dropbenefit',
name='entitlement_limit',
field=models.PositiveIntegerField(default=1, help_text='Maximum number of times this benefit can be earned.'),
),
migrations.AlterField(
model_name='dropbenefit',
name='id',
field=models.CharField(help_text='Unique Twitch identifier for the benefit.', max_length=64, primary_key=True, serialize=False),
),
migrations.AlterField(
model_name='dropbenefit',
name='image_asset_url',
field=models.URLField(blank=True, default='', help_text="URL to the benefit's image asset.", max_length=500),
),
migrations.AlterField(
model_name='dropbenefit',
name='is_ios_available',
field=models.BooleanField(default=False, help_text='Whether the benefit is available on iOS.'),
),
migrations.AlterField(
model_name='dropbenefit',
name='name',
field=models.CharField(blank=True, db_index=True, default='N/A', help_text='Name of the drop benefit.', max_length=255),
),
migrations.AlterField(
model_name='dropbenefitedge',
name='benefit',
field=models.ForeignKey(help_text='The benefit in this relationship.', on_delete=django.db.models.deletion.CASCADE, to='twitch.dropbenefit'),
),
migrations.AlterField(
model_name='dropbenefitedge',
name='drop',
field=models.ForeignKey(help_text='The time-based drop in this relationship.', on_delete=django.db.models.deletion.CASCADE, to='twitch.timebaseddrop'),
),
migrations.AlterField(
model_name='dropbenefitedge',
name='entitlement_limit',
field=models.PositiveIntegerField(default=1, help_text='Max times this benefit can be claimed for this drop.'),
),
migrations.AlterField(
model_name='dropcampaign',
name='account_link_url',
field=models.URLField(blank=True, default='', help_text='URL to link a Twitch account for the campaign.', max_length=500),
),
migrations.AlterField(
model_name='dropcampaign',
name='created_at',
field=models.DateTimeField(auto_now_add=True, db_index=True, help_text='Timestamp when this campaign record was created.'),
),
migrations.AlterField(
model_name='dropcampaign',
name='description',
field=models.TextField(blank=True, help_text='Detailed description of the campaign.'),
),
migrations.AlterField(
model_name='dropcampaign',
name='details_url',
field=models.URLField(blank=True, default='', help_text='URL with campaign details.', max_length=500),
),
migrations.AlterField(
model_name='dropcampaign',
name='end_at',
field=models.DateTimeField(blank=True, db_index=True, help_text='Datetime when the campaign ends.', null=True),
),
migrations.AlterField(
model_name='dropcampaign',
name='game',
field=models.ForeignKey(help_text='Game associated with this campaign.', on_delete=django.db.models.deletion.CASCADE, related_name='drop_campaigns', to='twitch.game', verbose_name='Game'),
),
migrations.AlterField(
model_name='dropcampaign',
name='id',
field=models.CharField(help_text='Unique Twitch identifier for the campaign.', max_length=255, primary_key=True, serialize=False),
),
migrations.AlterField(
model_name='dropcampaign',
name='image_url',
field=models.URLField(blank=True, default='', help_text='URL to an image representing the campaign.', max_length=500),
),
migrations.AlterField(
model_name='dropcampaign',
name='is_account_connected',
field=models.BooleanField(default=False, help_text='Indicates if the user account is linked.'),
),
migrations.AlterField(
model_name='dropcampaign',
name='name',
field=models.CharField(db_index=True, help_text='Name of the drop campaign.', max_length=255),
),
migrations.AlterField(
model_name='dropcampaign',
name='start_at',
field=models.DateTimeField(blank=True, db_index=True, help_text='Datetime when the campaign starts.', null=True),
),
migrations.AlterField(
model_name='dropcampaign',
name='updated_at',
field=models.DateTimeField(auto_now=True, help_text='Timestamp when this campaign record was last updated.'),
),
migrations.AlterField(
model_name='game',
name='box_art',
field=models.URLField(blank=True, default='', max_length=500, verbose_name='Box art URL'),
),
migrations.AlterField(
model_name='game',
name='display_name',
field=models.CharField(blank=True, db_index=True, default='', max_length=255, verbose_name='Display name'),
),
migrations.AlterField(
model_name='game',
name='id',
field=models.CharField(max_length=64, primary_key=True, serialize=False, verbose_name='Game ID'),
),
migrations.AlterField(
model_name='game',
name='name',
field=models.CharField(blank=True, db_index=True, default='', max_length=255, verbose_name='Name'),
),
migrations.AlterField(
model_name='game',
name='slug',
field=models.CharField(blank=True, db_index=True, default='', help_text='Short unique identifier for the game.', max_length=200, verbose_name='Slug'),
),
migrations.AlterField(
model_name='organization',
name='id',
field=models.CharField(help_text='The unique Twitch identifier for the organization.', max_length=255, primary_key=True, serialize=False, verbose_name='Organization ID'),
),
migrations.AlterField(
model_name='organization',
name='name',
field=models.CharField(db_index=True, help_text='Display name of the organization.', max_length=255, unique=True, verbose_name='Name'),
),
migrations.AlterField(
model_name='timebaseddrop',
name='benefits',
field=models.ManyToManyField(help_text='Benefits unlocked by this drop.', related_name='drops', through='twitch.DropBenefitEdge', to='twitch.dropbenefit'),
),
migrations.AlterField(
model_name='timebaseddrop',
name='campaign',
field=models.ForeignKey(help_text='The campaign this drop belongs to.', on_delete=django.db.models.deletion.CASCADE, related_name='time_based_drops', to='twitch.dropcampaign'),
),
migrations.AlterField(
model_name='timebaseddrop',
name='end_at',
field=models.DateTimeField(blank=True, db_index=True, help_text='Datetime when this drop expires.', null=True),
),
migrations.AlterField(
model_name='timebaseddrop',
name='id',
field=models.CharField(help_text='Unique Twitch identifier for the time-based drop.', max_length=64, primary_key=True, serialize=False),
),
migrations.AlterField(
model_name='timebaseddrop',
name='name',
field=models.CharField(db_index=True, help_text='Name of the time-based drop.', max_length=255),
),
migrations.AlterField(
model_name='timebaseddrop',
name='required_minutes_watched',
field=models.PositiveIntegerField(blank=True, db_index=True, help_text='Minutes required to watch before earning this drop.', null=True),
),
migrations.AlterField(
model_name='timebaseddrop',
name='required_subs',
field=models.PositiveIntegerField(default=0, help_text='Number of subscriptions required to unlock this drop.'),
),
migrations.AlterField(
model_name='timebaseddrop',
name='start_at',
field=models.DateTimeField(blank=True, db_index=True, help_text='Datetime when this drop becomes available.', null=True),
),
migrations.AddConstraint(
model_name='dropbenefitedge',
constraint=models.UniqueConstraint(fields=('drop', 'benefit'), name='unique_drop_benefit'),
),
migrations.AddConstraint(
model_name='game',
constraint=models.UniqueConstraint(fields=('slug',), name='unique_game_slug'),
),
]

View file

@ -0,0 +1,17 @@
# Generated by Django 5.2.5 on 2025-09-01 17:06
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('twitch', '0006_alter_dropbenefit_options_alter_dropcampaign_options_and_more'),
]
operations = [
migrations.RemoveConstraint(
model_name='game',
name='unique_game_slug',
),
]

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import logging
import re
from typing import TYPE_CHECKING, ClassVar
from urllib.parse import urlsplit, urlunsplit
from django.db import models
from django.utils import timezone
@ -15,21 +16,83 @@ if TYPE_CHECKING:
logger: logging.Logger = logging.getLogger("ttvdrops")
class Organization(models.Model):
"""Represents an organization on Twitch that can own drop campaigns."""
id = models.CharField(
max_length=255,
primary_key=True,
verbose_name="Organization ID",
help_text="The unique Twitch identifier for the organization.",
)
name = models.CharField(
max_length=255,
db_index=True,
unique=True,
verbose_name="Name",
help_text="Display name of the organization.",
)
class Meta:
ordering = ["name"]
indexes: ClassVar[list] = [
models.Index(fields=["name"]),
]
def __str__(self) -> str:
"""Return a string representation of the organization."""
return self.name or self.id
class Game(models.Model):
"""Represents a game on Twitch."""
id = models.TextField(primary_key=True)
slug = models.TextField(blank=True, default="", db_index=True)
name = models.TextField(blank=True, default="", db_index=True)
display_name = models.TextField(blank=True, default="", db_index=True)
box_art = models.URLField(max_length=500, blank=True, default="")
id = models.CharField(max_length=64, primary_key=True, verbose_name="Game ID")
slug = models.CharField(
max_length=200,
blank=True,
default="",
db_index=True,
verbose_name="Slug",
help_text="Short unique identifier for the game.",
)
name = models.CharField(
max_length=255,
blank=True,
default="",
db_index=True,
verbose_name="Name",
)
display_name = models.CharField(
max_length=255,
blank=True,
default="",
db_index=True,
verbose_name="Display name",
)
box_art = models.URLField(
max_length=500,
blank=True,
default="",
verbose_name="Box art URL",
)
owner = models.ForeignKey(
Organization,
on_delete=models.SET_NULL,
related_name="games",
null=True,
blank=True,
verbose_name="Organization",
help_text="The organization that owns this game.",
)
class Meta:
ordering = ["display_name"]
indexes: ClassVar[list] = [
models.Index(fields=["slug"]),
models.Index(fields=["display_name"]),
models.Index(fields=["name"]),
models.Index(fields=["box_art"]),
]
def __str__(self) -> str:
@ -41,78 +104,106 @@ class Game(models.Model):
self.name,
)
return f"{self.display_name} ({self.name})"
return self.name or self.slug or self.id
return self.display_name or self.name or self.slug or self.id
@property
def organizations(self) -> models.QuerySet[Organization]:
"""Return all organizations that have drop campaigns for this game."""
return Organization.objects.filter(drop_campaigns__game=self).distinct()
"""Return all organizations that own games with campaigns for this game."""
return Organization.objects.filter(games__drop_campaigns__game=self).distinct()
@property
def box_art_base_url(self) -> str:
"""Return the base box art URL without size suffix.
Twitch box art URLs often include size suffixes like '-120x160.jpg'.
This property returns the base URL without the size suffix.
Examples:
'https://static-cdn.jtvnw.net/ttv-boxart/512710-120x160.jpg'
-> 'https://static-cdn.jtvnw.net/ttv-boxart/512710.jpg'
"""
"""Return the base box art URL without Twitch size suffixes."""
if not self.box_art:
return ""
# Remove size suffix pattern like '-120x160' from the filename
return re.sub(r"-\d+x\d+(\.jpg|\.png|\.jpeg|\.gif|\.webp)$", r"\1", self.box_art)
class Organization(models.Model):
"""Represents an organization on Twitch that can own drop campaigns."""
id = models.TextField(primary_key=True)
name = models.TextField(db_index=True)
class Meta:
indexes: ClassVar[list] = [
models.Index(fields=["name"]),
]
def __str__(self) -> str:
"""Return a string representation of the organization."""
return self.name
parts = urlsplit(self.box_art)
path = re.sub(
r"(-\d+x\d+)(\.(?:jpg|jpeg|png|gif|webp))$",
r"\2",
parts.path,
flags=re.IGNORECASE,
)
return urlunsplit((parts.scheme, parts.netloc, path, "", ""))
class DropCampaign(models.Model):
"""Represents a Twitch drop campaign."""
id = models.TextField(primary_key=True)
name = models.TextField(db_index=True)
description = models.TextField(blank=True)
details_url = models.URLField(max_length=500, blank=True, default="")
account_link_url = models.URLField(max_length=500, blank=True, default="")
image_url = models.URLField(max_length=500, blank=True, default="")
start_at = models.DateTimeField(db_index=True, null=True)
end_at = models.DateTimeField(db_index=True, null=True)
is_account_connected = models.BooleanField(default=False)
id = models.CharField(
max_length=255,
primary_key=True,
help_text="Unique Twitch identifier for the campaign.",
)
name = models.CharField(
max_length=255,
db_index=True,
help_text="Name of the drop campaign.",
)
description = models.TextField(
blank=True,
help_text="Detailed description of the campaign.",
)
details_url = models.URLField(
max_length=500,
blank=True,
default="",
help_text="URL with campaign details.",
)
account_link_url = models.URLField(
max_length=500,
blank=True,
default="",
help_text="URL to link a Twitch account for the campaign.",
)
image_url = models.URLField(
max_length=500,
blank=True,
default="",
help_text="URL to an image representing the campaign.",
)
start_at = models.DateTimeField(
db_index=True,
null=True,
blank=True,
help_text="Datetime when the campaign starts.",
)
end_at = models.DateTimeField(
db_index=True,
null=True,
blank=True,
help_text="Datetime when the campaign ends.",
)
is_account_connected = models.BooleanField(
default=False,
help_text="Indicates if the user account is linked.",
)
# Foreign keys
game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="drop_campaigns", db_index=True)
owner = models.ForeignKey(Organization, on_delete=models.CASCADE, related_name="drop_campaigns", db_index=True)
game = models.ForeignKey(
Game,
on_delete=models.CASCADE,
related_name="drop_campaigns",
verbose_name="Game",
help_text="Game associated with this campaign.",
)
# Tracking fields
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
created_at = models.DateTimeField(
auto_now_add=True,
db_index=True,
help_text="Timestamp when this campaign record was created.",
)
updated_at = models.DateTimeField(
auto_now=True,
help_text="Timestamp when this campaign record was last updated.",
)
class Meta:
ordering = ["-start_at"]
indexes: ClassVar[list] = [
models.Index(fields=["name"]),
models.Index(fields=["start_at", "end_at"]),
models.Index(fields=["game"]),
models.Index(fields=["owner"]),
]
def __str__(self) -> str:
"""Return a string representation of the drop campaign."""
return self.name
@property
@ -135,29 +226,20 @@ class DropCampaign(models.Model):
if not self.game or not self.game.display_name:
return self.name
# Try different variations of the game name
game_variations = [self.game.display_name]
# Add & to "and" conversion
if "&" in self.game.display_name:
game_variations.append(self.game.display_name.replace("&", "and"))
# Add "and" to & conversion
if "and" in self.game.display_name:
game_variations.append(self.game.display_name.replace("and", "&"))
# Check each variation
for game_name in game_variations:
if not self.name.startswith(game_name):
continue
# Check for different separators after the game name
for separator in [" - ", " | ", " "]:
prefix_to_check = game_name + separator
# Check if it's followed by a separator like " - "
if self.name[len(game_name) :].startswith(" - "):
return self.name[len(game_name) + 3 :].strip()
# Or just remove the game name if it's followed by a space
if len(self.name) > len(game_name) and self.name[len(game_name)] == " ":
return self.name[len(game_name) + 1 :].strip()
name: str = self.name
if name.startswith(prefix_to_check):
return name.removeprefix(prefix_to_check).strip()
return self.name
@ -165,25 +247,53 @@ class DropCampaign(models.Model):
class DropBenefit(models.Model):
"""Represents a benefit that can be earned from a drop."""
id = models.TextField(primary_key=True)
name = models.TextField(db_index=True, blank=True, default="N/A")
image_asset_url = models.URLField(max_length=500, blank=True, default="")
created_at = models.DateTimeField(db_index=True, null=True)
entitlement_limit = models.PositiveIntegerField(default=1)
is_ios_available = models.BooleanField(default=False)
distribution_type = models.TextField(db_index=True, blank=True, default="")
id = models.CharField(
max_length=64,
primary_key=True,
help_text="Unique Twitch identifier for the benefit.",
)
name = models.CharField(
max_length=255,
db_index=True,
blank=True,
default="N/A",
help_text="Name of the drop benefit.",
)
image_asset_url = models.URLField(
max_length=500,
blank=True,
default="",
help_text="URL to the benefit's image asset.",
)
created_at = models.DateTimeField(
null=True,
db_index=True,
help_text="Timestamp when the benefit was created. This is from Twitch API and not auto-generated.",
)
entitlement_limit = models.PositiveIntegerField(
default=1,
help_text="Maximum number of times this benefit can be earned.",
)
# Foreign keys
game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="drop_benefits", db_index=True)
owner_organization = models.ForeignKey(Organization, on_delete=models.CASCADE, related_name="drop_benefits", db_index=True)
# TODO(TheLovinator): Check if this should be default True or False # noqa: TD003
is_ios_available = models.BooleanField(
default=False,
help_text="Whether the benefit is available on iOS.",
)
distribution_type = models.CharField(
max_length=50,
db_index=True,
blank=True,
default="",
help_text="Type of distribution for this benefit.",
)
class Meta:
ordering = ["-created_at"]
indexes: ClassVar[list] = [
models.Index(fields=["name"]),
models.Index(fields=["created_at"]),
models.Index(fields=["distribution_type"]),
models.Index(fields=["game"]),
models.Index(fields=["owner_organization"]),
]
def __str__(self) -> str:
@ -194,22 +304,58 @@ class DropBenefit(models.Model):
class TimeBasedDrop(models.Model):
"""Represents a time-based drop in a drop campaign."""
id = models.TextField(primary_key=True)
name = models.TextField(db_index=True)
required_minutes_watched = models.PositiveIntegerField(db_index=True, null=True)
required_subs = models.PositiveIntegerField(default=0)
start_at = models.DateTimeField(db_index=True, null=True)
end_at = models.DateTimeField(db_index=True, null=True)
id = models.CharField(
max_length=64,
primary_key=True,
help_text="Unique Twitch identifier for the time-based drop.",
)
name = models.CharField(
max_length=255,
db_index=True,
help_text="Name of the time-based drop.",
)
required_minutes_watched = models.PositiveIntegerField(
db_index=True,
null=True,
blank=True,
help_text="Minutes required to watch before earning this drop.",
)
required_subs = models.PositiveIntegerField(
default=0,
help_text="Number of subscriptions required to unlock this drop.",
)
start_at = models.DateTimeField(
db_index=True,
null=True,
blank=True,
help_text="Datetime when this drop becomes available.",
)
end_at = models.DateTimeField(
db_index=True,
null=True,
blank=True,
help_text="Datetime when this drop expires.",
)
# Foreign keys
campaign = models.ForeignKey(DropCampaign, on_delete=models.CASCADE, related_name="time_based_drops", db_index=True)
benefits = models.ManyToManyField(DropBenefit, through="DropBenefitEdge", related_name="drops") # type: ignore[var-annotated]
campaign = models.ForeignKey(
DropCampaign,
on_delete=models.CASCADE,
related_name="time_based_drops",
help_text="The campaign this drop belongs to.",
)
benefits = models.ManyToManyField(
DropBenefit,
through="DropBenefitEdge",
related_name="drops",
help_text="Benefits unlocked by this drop.",
)
class Meta:
ordering = ["start_at"]
indexes: ClassVar[list] = [
models.Index(fields=["name"]),
models.Index(fields=["start_at", "end_at"]),
models.Index(fields=["campaign"]),
models.Index(fields=["required_minutes_watched"]),
]
@ -221,12 +367,25 @@ class TimeBasedDrop(models.Model):
class DropBenefitEdge(models.Model):
"""Represents the relationship between a TimeBasedDrop and a DropBenefit."""
drop = models.ForeignKey(TimeBasedDrop, on_delete=models.CASCADE, db_index=True)
benefit = models.ForeignKey(DropBenefit, on_delete=models.CASCADE, db_index=True)
entitlement_limit = models.PositiveIntegerField(default=1)
drop = models.ForeignKey(
TimeBasedDrop,
on_delete=models.CASCADE,
help_text="The time-based drop in this relationship.",
)
benefit = models.ForeignKey(
DropBenefit,
on_delete=models.CASCADE,
help_text="The benefit in this relationship.",
)
entitlement_limit = models.PositiveIntegerField(
default=1,
help_text="Max times this benefit can be claimed for this drop.",
)
class Meta:
unique_together = ("drop", "benefit")
constraints = [
models.UniqueConstraint(fields=("drop", "benefit"), name="unique_drop_benefit"),
]
indexes: ClassVar[list] = [
models.Index(fields=["drop", "benefit"]),
]

View file

@ -2,7 +2,7 @@ from __future__ import annotations
import datetime
import logging
from dataclasses import dataclass
from collections import OrderedDict, defaultdict
from typing import TYPE_CHECKING, Any, cast
from django.contrib import messages
@ -58,7 +58,8 @@ class OrgDetailView(DetailView):
else:
subscription = NotificationSubscription.objects.filter(user=user, organization=organization).first()
games: QuerySet[Game, Game] = Game.objects.filter(drop_campaigns__owner=organization).distinct()
games: QuerySet[Game, Game] = organization.games.all() # pyright: ignore[reportAttributeAccessIssue]
context.update({
"subscription": subscription,
"games": games,
@ -87,7 +88,7 @@ class DropCampaignListView(ListView):
if game_filter:
queryset = queryset.filter(game__id=game_filter)
return queryset.select_related("game", "owner").order_by("-start_at")
return queryset.select_related("game__owner").order_by("-start_at")
def get_context_data(self, **kwargs) -> dict[str, Any]:
"""Add additional context data.
@ -99,10 +100,10 @@ class DropCampaignListView(ListView):
dict: Context data.
"""
kwargs = cast("dict[str, Any]", kwargs)
context: dict[str, datetime.datetime | str | int | QuerySet[Game, Game] | None] = super().get_context_data(**kwargs)
context: dict[str, Any] = super().get_context_data(**kwargs)
context["games"] = Game.objects.all().order_by("display_name")
context["status_options"] = ["active", "upcoming", "expired"]
context["now"] = timezone.now()
context["selected_game"] = str(self.request.GET.get(key="game", default=""))
context["selected_per_page"] = self.paginate_by
@ -130,7 +131,7 @@ class DropCampaignDetailView(DetailView):
if queryset is None:
queryset = self.get_queryset()
queryset = queryset.select_related("game", "owner")
queryset = queryset.select_related("game__owner")
return super().get_object(queryset=queryset)
@ -162,12 +163,12 @@ class GamesGridView(ListView):
context_object_name = "games"
def get_queryset(self) -> QuerySet[Game]:
"""Get queryset of games, annotated with campaign counts to avoid N+1 queries.
"""Get queryset of all games, annotated with campaign counts.
Returns:
QuerySet[Game]: Queryset of games with annotations.
QuerySet: Annotated games queryset.
"""
now = timezone.now()
now: datetime.datetime = timezone.now()
return (
super()
.get_queryset()
@ -186,64 +187,40 @@ class GamesGridView(ListView):
)
def get_context_data(self, **kwargs) -> dict[str, Any]:
"""Add additional context data with games grouped by organization.
"""Add additional context data with games grouped by their owning organization in a highly optimized manner.
Args:
**kwargs: Additional keyword arguments.
**kwargs: Additional arguments.
Returns:
dict: Context data with games grouped by organization.
"""
@dataclass(frozen=True)
class OrganizationData:
id: str
name: str
context: dict[str, Any] = super().get_context_data(**kwargs)
games_by_org: dict[OrganizationData, list[dict[str, Game | dict[str, int]]]] = {}
now: datetime.datetime = timezone.now()
organizations_with_games: QuerySet[Organization, Organization] = Organization.objects.filter(drop_campaigns__isnull=False).distinct().order_by("name")
game_org_relations: QuerySet[DropCampaign, dict[str, Any]] = DropCampaign.objects.values("game_id", "owner_id", "owner__name").annotate(
campaign_count=Count("id", distinct=True),
active_count=Count("id", filter=Q(start_at__lte=now, end_at__gte=now), distinct=True),
games_with_campaigns: QuerySet[Game, Game] = (
Game.objects.filter(drop_campaigns__isnull=False)
.select_related("owner")
.annotate(
campaign_count=Count("drop_campaigns", distinct=True),
active_count=Count(
"drop_campaigns",
filter=Q(
drop_campaigns__start_at__lte=now,
drop_campaigns__end_at__gte=now,
),
distinct=True,
),
)
.order_by("owner__name", "display_name")
)
all_games: dict[str, Game] = {game.id: game for game in Game.objects.all()}
org_names: dict[str, str] = {org.id: org.name for org in organizations_with_games}
games_by_org: defaultdict[Organization, list[dict[str, Game]]] = defaultdict(list)
for game in games_with_campaigns:
if game.owner:
games_by_org[game.owner].append({"game": game})
game_org_map: dict[str, dict[str, Any]] = {}
for relation in game_org_relations:
org_id: str = relation["owner_id"]
game_id: str = relation["game_id"]
if org_id not in game_org_map:
game_org_map[org_id] = {}
if game_id not in game_org_map[org_id]:
game: Game | None = all_games.get(game_id)
if game:
game_org_map[org_id][game_id] = {
"game": game,
"campaign_count": relation["campaign_count"],
"active_count": relation["active_count"],
}
for org_id, games in game_org_map.items():
if org_id in org_names:
org_obj = OrganizationData(id=org_id, name=org_names[org_id])
games_by_org[org_obj] = list(games.values())
games_with_counts: list[dict[str, Game | dict[str, int]]] = []
for org_games in games_by_org.values():
games_with_counts.extend(org_games)
context["games_with_counts"] = games_with_counts
context["games_by_org"] = games_by_org
context["games_by_org"] = OrderedDict(sorted(games_by_org.items(), key=lambda item: item[0].name))
return context
@ -275,7 +252,7 @@ class GameDetailView(DetailView):
subscription = NotificationSubscription.objects.filter(user=user, game=game).first()
now: datetime.datetime = timezone.now()
all_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(game=game).select_related("owner").order_by("-end_at")
all_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(game=game).select_related("game__owner").order_by("-end_at")
active_campaigns: list[DropCampaign] = [
campaign
@ -295,7 +272,7 @@ class GameDetailView(DetailView):
"upcoming_campaigns": upcoming_campaigns,
"expired_campaigns": expired_campaigns,
"subscription": subscription,
"owner": active_campaigns[0].owner if active_campaigns else None,
"owner": game.owner,
"now": now,
})
@ -312,9 +289,9 @@ def dashboard(request: HttpRequest) -> HttpResponse:
HttpResponse: The rendered dashboard template.
"""
now: datetime.datetime = timezone.now()
active_campaigns: QuerySet[DropCampaign, DropCampaign] = (
active_campaigns: QuerySet[DropCampaign] = (
DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now)
.select_related("game", "owner")
.select_related("game__owner")
.prefetch_related(
Prefetch(
"time_based_drops",
@ -326,8 +303,10 @@ def dashboard(request: HttpRequest) -> HttpResponse:
campaigns_by_org_game: dict[str, Any] = {}
for campaign in active_campaigns:
org_id: str = campaign.owner.id
org_name: str = campaign.owner.name
owner: Organization | None = campaign.game.owner
org_id: str = owner.id if owner else "unknown"
org_name: str = owner.name if owner else "Unknown"
game_id: str = campaign.game.id
game_name: str = campaign.game.display_name
@ -370,8 +349,6 @@ def debug_view(request: HttpRequest) -> HttpResponse:
Returns:
HttpResponse: Rendered debug template or redirect if unauthorized.
"""
# Was previously staff-only; now any authenticated user can view.
now = timezone.now()
# Games with no organizations (no campaigns linking to an org)
@ -380,12 +357,12 @@ def debug_view(request: HttpRequest) -> HttpResponse:
# Campaigns with missing or obviously broken images (empty or very short or not http)
broken_image_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(
Q(image_url__isnull=True) | Q(image_url__exact="") | ~Q(image_url__startswith="http")
).select_related("game", "owner")
).select_related("game")
# Benefits with missing images
broken_benefit_images: QuerySet[DropBenefit, DropBenefit] = DropBenefit.objects.filter(
Q(image_asset_url__isnull=True) | Q(image_asset_url__exact="") | ~Q(image_asset_url__startswith="http")
).select_related("game", "owner_organization")
).prefetch_related(Prefetch("drops", queryset=TimeBasedDrop.objects.select_related("campaign__game")))
# Time-based drops without any benefits
drops_without_benefits: QuerySet[TimeBasedDrop, TimeBasedDrop] = TimeBasedDrop.objects.filter(benefits__isnull=True).select_related("campaign")
@ -393,7 +370,7 @@ def debug_view(request: HttpRequest) -> HttpResponse:
# Campaigns with invalid dates (start after end or missing either)
invalid_date_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(
Q(start_at__gt=models.F("end_at")) | Q(start_at__isnull=True) | Q(end_at__isnull=True)
).select_related("game", "owner")
).select_related("game")
# Duplicate campaign names per game
duplicate_name_campaigns = DropCampaign.objects.values("game_id", "name").annotate(name_count=Count("id")).filter(name_count__gt=1).order_by("-name_count")