diff --git a/pyproject.toml b/pyproject.toml index f409b62..ed7b5f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "python-dotenv>=1.1.1", "pygments>=2.19.2", "httpx>=0.28.1", + "pydantic>=2.12.5", ] [dependency-groups] diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/twitch/feeds.py b/twitch/feeds.py index 2e1afd9..f50a1c7 100644 --- a/twitch/feeds.py +++ b/twitch/feeds.py @@ -201,7 +201,7 @@ class DropCampaignFeed(Feed): def item_guid(self, item: DropCampaign) -> str: """Return a unique identifier for each campaign.""" - return item.id + "@ttvdrops.com" + return item.twitch_id + "@ttvdrops.com" def item_author_name(self, item: DropCampaign) -> str: """Return the author name for the campaign, typically the game name.""" diff --git a/twitch/management/commands/better_import_drops.py b/twitch/management/commands/better_import_drops.py new file mode 100644 index 0000000..18a391e --- /dev/null +++ b/twitch/management/commands/better_import_drops.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import os +import sys +from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import as_completed +from pathlib import Path + +from django.core.management.base import BaseCommand +from django.core.management.base import CommandError +from django.core.management.base import CommandParser +from pydantic import ValidationError + +from twitch.models import Channel +from twitch.models import DropBenefit +from twitch.models import DropCampaign +from twitch.models import Game +from twitch.models import Organization +from twitch.schemas import ViewerDropsDashboardPayload + + +def move_failed_validation_file(file_path: Path) -> Path: + """Moves a file that failed validation to a 'broken' subdirectory. 
+ + Args: + file_path: Path to the file that failed validation + + Returns: + Path to the 'broken' directory where the file was moved + """ + broken_dir: Path = file_path.parent / "broken" + broken_dir.mkdir(parents=True, exist_ok=True) + + target_file: Path = broken_dir / file_path.name + file_path.rename(target_file) + + return broken_dir + + +class Command(BaseCommand): + """Import Twitch drop campaign data from a JSON file or directory of JSON files.""" + + help = "Import Twitch drop campaign data from a JSON file or directory" + requires_migrations_checks = True + + game_cache: dict[str, Game] = {} + organization_cache: dict[str, Organization] = {} + drop_campaign_cache: dict[str, DropCampaign] = {} + channel_cache: dict[str, Channel] = {} + benefit_cache: dict[str, DropBenefit] = {} + + def add_arguments(self, parser: CommandParser) -> None: + """Populate the command with arguments.""" + parser.add_argument("path", type=str, help="Path to JSON file or directory") + parser.add_argument("--recursive", action="store_true", help="Recursively search directories for JSON files") + parser.add_argument("--crash-on-error", action="store_true", help="Crash the command on first error instead of continuing") + + def pre_fill_cache(self) -> None: + """Load all existing IDs from DB into memory to avoid N+1 queries.""" + self.stdout.write("Pre-filling caches...") + self.game_cache = {str(g.twitch_id): g for g in Game.objects.all()} + self.stdout.write(f"\tGames: {len(self.game_cache)}") + + self.organization_cache = {str(o.twitch_id): o for o in Organization.objects.all()} + self.stdout.write(f"\tOrganizations: {len(self.organization_cache)}") + + self.drop_campaign_cache = {str(c.twitch_id): c for c in DropCampaign.objects.all()} + self.stdout.write(f"\tDrop Campaigns: {len(self.drop_campaign_cache)}") + + self.channel_cache = {str(ch.twitch_id): ch for ch in Channel.objects.all()} + self.stdout.write(f"\tChannels: {len(self.channel_cache)}") + + self.benefit_cache = 
{str(b.twitch_id): b for b in DropBenefit.objects.all()} + self.stdout.write(f"\tBenefits: {len(self.benefit_cache)}") + + def handle(self, *args, **options) -> None: # noqa: ARG002 + """Main entry point for the command. + + Raises: + CommandError: If the provided path does not exist. + """ + input_path: Path = Path(options["path"]).resolve() + + self.pre_fill_cache() + + try: + if input_path.is_file(): + self.process_file(file_path=input_path, options=options) + elif input_path.is_dir(): + self.process_json_files(input_path=input_path, options=options) + else: + msg: str = f"Path does not exist: {input_path}" + raise CommandError(msg) + except KeyboardInterrupt: + self.stdout.write(self.style.WARNING("\n\nInterrupted by user!")) + self.stdout.write(self.style.WARNING("Shutting down gracefully...")) + sys.exit(130) + + def process_json_files(self, input_path: Path, options: dict) -> None: + """Process multiple JSON files in a directory. + + Args: + input_path: Path to the directory containing JSON files + options: Command options + """ + json_files: list[Path] = self.collect_json_files(options, input_path) + self.stdout.write(f"Found {len(json_files)} JSON files to process") + + completed_count = 0 + with ProcessPoolExecutor() as executor: + futures = {executor.submit(self.process_file_worker, file_path, options): file_path for file_path in json_files} + + for future in as_completed(futures): + file_path: Path = futures[future] + try: + result: dict[str, bool | str] = future.result() + if result["success"]: + self.stdout.write(f"✓ {file_path}") + else: + self.stdout.write(f"✗ {file_path} -> {result['broken_dir']}/{file_path.name}") + + completed_count += 1 + except (OSError, ValueError, KeyError) as e: + self.stdout.write(f"✗ {file_path} (error: {e})") + completed_count += 1 + + self.stdout.write(f"Progress: {completed_count}/{len(json_files)} files processed") + self.stdout.write("") + + def collect_json_files(self, options: dict, input_path: Path) -> list[Path]: 
+ """Collect JSON files from the specified directory. + + Args: + options: Command options + input_path: Path to the directory + + Returns: + List of JSON file paths + """ + json_files: list[Path] = [] + if options["recursive"]: + for root, _dirs, files in os.walk(input_path): + root_path = Path(root) + json_files.extend(root_path / file for file in files if file.endswith(".json")) + else: + json_files = [f for f in input_path.iterdir() if f.is_file() and f.suffix == ".json"] + return json_files + + @staticmethod + def process_file_worker(file_path: Path, options: dict) -> dict[str, bool | str]: + """Worker function for parallel processing of files. + + Args: + file_path: Path to the JSON file to process + options: Command options + + Raises: + ValidationError: If the JSON file fails validation + + Returns: + Dict with success status and optional broken_dir path + """ + try: + ViewerDropsDashboardPayload.model_validate_json(file_path.read_text(encoding="utf-8")) + except ValidationError: + if options["crash_on_error"]: + raise + + broken_dir: Path = move_failed_validation_file(file_path) + return {"success": False, "broken_dir": str(broken_dir)} + else: + return {"success": True} + + def process_file(self, file_path: Path, options: dict) -> None: + """Reads a JSON file and processes the campaign data. 
+ + Args: + file_path: Path to the JSON file + options: Command options + + Raises: + ValidationError: If the JSON file fails validation + """ + self.stdout.write(f"Processing file: {file_path}") + + try: + _: ViewerDropsDashboardPayload = ViewerDropsDashboardPayload.model_validate_json(file_path.read_text(encoding="utf-8")) + self.stdout.write("\tProcessed drop campaigns") + except ValidationError: + if options["crash_on_error"]: + raise + + broken_dir: Path = move_failed_validation_file(file_path) + self.stdout.write(f"\tMoved to {broken_dir} (validation failed)") diff --git a/twitch/management/commands/import_drops.py b/twitch/management/commands/import_drops.py index 53b406a..35eeedb 100644 --- a/twitch/management/commands/import_drops.py +++ b/twitch/management/commands/import_drops.py @@ -187,15 +187,15 @@ class Command(BaseCommand): """Load existing DB objects into in-memory caches to avoid repeated queries.""" # These queries may be heavy if DB is huge — safe because optional via --no-preload with self._cache_locks["game"]: - self._game_cache = {str(g.id): g for g in Game.objects.all()} + self._game_cache = {str(g.twitch_id): g for g in Game.objects.all()} with self._cache_locks["org"]: - self._organization_cache = {str(o.id): o for o in Organization.objects.all()} + self._organization_cache = {str(o.twitch_id): o for o in Organization.objects.all()} with self._cache_locks["campaign"]: - self._drop_campaign_cache = {str(c.id): c for c in DropCampaign.objects.all()} + self._drop_campaign_cache = {str(c.twitch_id): c for c in DropCampaign.objects.all()} with self._cache_locks["channel"]: - self._channel_cache = {str(ch.id): ch for ch in Channel.objects.all()} + self._channel_cache = {str(ch.twitch_id): ch for ch in Channel.objects.all()} with self._cache_locks["benefit"]: - self._benefit_cache = {str(b.id): b for b in DropBenefit.objects.all()} + self._benefit_cache = {str(b.twitch_id): b for b in DropBenefit.objects.all()} def process_drops(self, *, 
continue_on_error: bool, path: Path, processed_path: Path) -> None: """Process drops from a file or directory. @@ -397,8 +397,8 @@ class Command(BaseCommand): return if isinstance(data, list): - for _item in data: - self.import_drop_campaign(_item, file_path=file_path) + for item in data: + self.import_drop_campaign(item, file_path=file_path) elif isinstance(data, dict): self.import_drop_campaign(data, file_path=file_path) else: @@ -534,7 +534,7 @@ class Command(BaseCommand): benefit_edges: list[dict[str, Any]] = drop_data.get("benefitEdges", []) if not benefit_edges: - tqdm.write(self.style.WARNING(f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.id})")) + tqdm.write(self.style.WARNING(f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.twitch_id})")) self.move_file(file_path, Path("no_benefit_edges") / file_path.name) return @@ -570,10 +570,10 @@ class Command(BaseCommand): if created: tqdm.write(f"Added {drop_benefit_edge}") except MultipleObjectsReturned as e: - msg = f"Error: Multiple DropBenefitEdge objects found for drop {time_based_drop.id} and benefit {benefit.id}. Cannot update or create." + msg = f"Error: Multiple DropBenefitEdge objects found for drop {time_based_drop.twitch_id} and benefit {benefit.twitch_id}. Cannot update or create." 
# noqa: E501 raise CommandError(msg) from e except (IntegrityError, DatabaseError, TypeError, ValueError) as e: - msg = f"Database or validation error creating DropBenefitEdge for drop {time_based_drop.id} and benefit {benefit.id}: {e}" + msg = f"Database or validation error creating DropBenefitEdge for drop {time_based_drop.twitch_id} and benefit {benefit.twitch_id}: {e}" raise CommandError(msg) from e def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop: @@ -847,7 +847,7 @@ class Command(BaseCommand): # Set the many-to-many relationship (save only if different) - current_ids = set(drop_campaign.allow_channels.values_list("id", flat=True)) - new_ids = {ch.id for ch in channel_objects} + current_ids = set(drop_campaign.allow_channels.values_list("twitch_id", flat=True)) + new_ids = {ch.twitch_id for ch in channel_objects} if current_ids != new_ids: drop_campaign.allow_channels.set(channel_objects) diff --git a/twitch/migrations/0001_initial.py b/twitch/migrations/0001_initial.py index a02a5b5..1511601 100644 --- a/twitch/migrations/0001_initial.py +++ b/twitch/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.2.7 on 2025-10-13 00:36 +# Generated by Django 5.2.8 on 2025-12-01 20:17 from __future__ import annotations import django.db.models.deletion @@ -7,11 +7,7 @@ from django.db import models class Migration(migrations.Migration): - """Initial migration. - - Args: - migrations (migrations.Migration): The base class for all migrations. 
- """ + """Initial Django migration for the twitch app schema.""" initial = True @@ -22,7 +18,7 @@ class Migration(migrations.Migration): name="Channel", fields=[ ( - "id", + "twitch_id", models.TextField(help_text="The unique Twitch identifier for the channel.", primary_key=True, serialize=False, verbose_name="Channel ID"), ), ("name", models.TextField(help_text="The lowercase username of the channel.", verbose_name="Username")), @@ -37,7 +33,7 @@ class Migration(migrations.Migration): migrations.CreateModel( name="DropBenefit", fields=[ - ("id", models.TextField(help_text="Unique Twitch identifier for the benefit.", primary_key=True, serialize=False)), + ("twitch_id", models.TextField(help_text="Unique Twitch identifier for the benefit.", primary_key=True, serialize=False)), ("name", models.TextField(blank=True, default="N/A", help_text="Name of the drop benefit.")), ("image_asset_url", models.URLField(blank=True, default="", help_text="URL to the benefit's image asset.", max_length=500)), ( @@ -61,7 +57,8 @@ class Migration(migrations.Migration): migrations.CreateModel( name="Game", fields=[ - ("id", models.TextField(primary_key=True, serialize=False, verbose_name="Game ID")), + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("twitch_id", models.TextField(unique=True, verbose_name="Twitch game ID")), ("slug", models.TextField(blank=True, default="", help_text="Short unique identifier for the game.", max_length=200, verbose_name="Slug")), ("name", models.TextField(blank=True, default="", verbose_name="Name")), ("display_name", models.TextField(blank=True, default="", verbose_name="Display name")), @@ -81,7 +78,7 @@ class Migration(migrations.Migration): name="Organization", fields=[ ( - "id", + "twitch_id", models.TextField( help_text="The unique Twitch identifier for the organization.", primary_key=True, @@ -113,7 +110,7 @@ class Migration(migrations.Migration): migrations.CreateModel( name="DropCampaign", 
fields=[ - ("id", models.TextField(help_text="Unique Twitch identifier for the campaign.", primary_key=True, serialize=False)), + ("twitch_id", models.TextField(help_text="Unique Twitch identifier for the campaign.", primary_key=True, serialize=False)), ("name", models.TextField(help_text="Name of the drop campaign.")), ("description", models.TextField(blank=True, help_text="Detailed description of the campaign.")), ("details_url", models.URLField(blank=True, default="", help_text="URL with campaign details.", max_length=500)), @@ -169,7 +166,7 @@ class Migration(migrations.Migration): migrations.CreateModel( name="TimeBasedDrop", fields=[ - ("id", models.TextField(help_text="Unique Twitch identifier for the time-based drop.", primary_key=True, serialize=False)), + ("twitch_id", models.TextField(help_text="Unique Twitch identifier for the time-based drop.", primary_key=True, serialize=False)), ("name", models.TextField(help_text="Name of the time-based drop.")), ( "required_minutes_watched", @@ -215,7 +212,7 @@ class Migration(migrations.Migration): migrations.CreateModel( name="TwitchGameData", fields=[ - ("id", models.TextField(primary_key=True, serialize=False, verbose_name="Twitch Game ID")), + ("twitch_id", models.TextField(primary_key=True, serialize=False, verbose_name="Twitch Game ID")), ("name", models.TextField(blank=True, default="", verbose_name="Name")), ( "box_art_url", diff --git a/twitch/models.py b/twitch/models.py index fd39b95..3c5182e 100644 --- a/twitch/models.py +++ b/twitch/models.py @@ -16,7 +16,7 @@ logger: logging.Logger = logging.getLogger("ttvdrops") class Organization(models.Model): """Represents an organization on Twitch that can own drop campaigns.""" - id = models.TextField( + twitch_id = models.TextField( primary_key=True, verbose_name="Organization ID", help_text="The unique Twitch identifier for the organization.", @@ -41,14 +41,14 @@ class Organization(models.Model): def __str__(self) -> str: """Return a string representation of 
the organization.""" - return self.name or self.id + return self.name or self.twitch_id # MARK: Game class Game(models.Model): """Represents a game on Twitch.""" - id = models.TextField(primary_key=True, verbose_name="Game ID") + twitch_id = models.TextField(verbose_name="Twitch game ID", unique=True) slug = models.TextField( max_length=200, blank=True, @@ -111,7 +111,7 @@ class Game(models.Model): self.name, ) return f"{self.display_name} ({self.name})" - return self.display_name or self.name or self.slug or self.id + return self.display_name or self.name or self.slug or self.twitch_id @property def organizations(self) -> models.QuerySet[Organization]: @@ -127,7 +127,7 @@ class Game(models.Model): return self.name if self.slug: return self.slug - return self.id + return self.twitch_id @property def twitch_directory_url(self) -> str: @@ -151,7 +151,7 @@ class TwitchGameData(models.Model): igdb_id: Optional IGDB id for the game """ - id = models.TextField(primary_key=True, verbose_name="Twitch Game ID") + twitch_id = models.TextField(primary_key=True, verbose_name="Twitch Game ID") game = models.ForeignKey( Game, on_delete=models.SET_NULL, @@ -179,14 +179,14 @@ class TwitchGameData(models.Model): ordering = ["name"] def __str__(self) -> str: - return self.name or self.id + return self.name or self.twitch_id # MARK: Channel class Channel(models.Model): """Represents a Twitch channel that can participate in drop campaigns.""" - id = models.TextField( + twitch_id = models.TextField( primary_key=True, verbose_name="Channel ID", help_text="The unique Twitch identifier for the channel.", @@ -214,14 +214,14 @@ class Channel(models.Model): def __str__(self) -> str: """Return a string representation of the channel.""" - return self.display_name or self.name or self.id + return self.display_name or self.name or self.twitch_id # MARK: DropCampaign class DropCampaign(models.Model): """Represents a Twitch drop campaign.""" - id = models.TextField( + twitch_id = models.TextField( 
primary_key=True, help_text="Unique Twitch identifier for the campaign.", ) @@ -356,7 +356,7 @@ class DropCampaign(models.Model): class DropBenefit(models.Model): """Represents a benefit that can be earned from a drop.""" - id = models.TextField( + twitch_id = models.TextField( primary_key=True, help_text="Unique Twitch identifier for the benefit.", ) @@ -419,7 +419,7 @@ class DropBenefit(models.Model): class TimeBasedDrop(models.Model): """Represents a time-based drop in a drop campaign.""" - id = models.TextField( + twitch_id = models.TextField( primary_key=True, help_text="Unique Twitch identifier for the time-based drop.", ) diff --git a/twitch/schemas.py b/twitch/schemas.py new file mode 100644 index 0000000..4756f51 --- /dev/null +++ b/twitch/schemas.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel +from pydantic import Field + + +class Organization(BaseModel): + """Schema for Twitch Organization objects.""" + + twitch_id: str = Field(alias="id") + name: str + type_name: Literal["Organization"] = Field(alias="__typename") + + model_config = { + "extra": "forbid", + "validate_assignment": True, + "strict": True, + "populate_by_name": True, + } + + +class Game(BaseModel): + """Schema for Twitch Game objects.""" + + twitch_id: str = Field(alias="id") + display_name: str = Field(alias="displayName") + box_art_url: str = Field(alias="boxArtURL") + type_name: Literal["Game"] = Field(alias="__typename") + + model_config = { + "extra": "forbid", + "validate_assignment": True, + "strict": True, + "populate_by_name": True, + } + + +class DropCampaignSelfEdge(BaseModel): + """Schema for the 'self' edge on DropCampaign objects.""" + + is_account_connected: bool = Field(alias="isAccountConnected") + type_name: Literal["DropCampaignSelfEdge"] = Field(alias="__typename") + + model_config = { + "extra": "forbid", + "validate_assignment": True, + "strict": True, + "populate_by_name": True, + } + + +class 
DropCampaign(BaseModel): + """Schema for Twitch DropCampaign objects.""" + + twitch_id: str = Field(alias="id") + name: str + owner: Organization + game: Game + status: Literal["ACTIVE", "EXPIRED"] + start_at: str = Field(alias="startAt") + end_at: str = Field(alias="endAt") + details_url: str = Field(alias="detailsURL") + account_link_url: str = Field(alias="accountLinkURL") + self: DropCampaignSelfEdge + type_name: Literal["DropCampaign"] = Field(alias="__typename") + + model_config = { + "extra": "forbid", + "validate_assignment": True, + "strict": True, + "populate_by_name": True, + } + + +class CurrentUser(BaseModel): + """Schema for Twitch User objects.""" + + twitch_id: str = Field(alias="id") + login: str + drop_campaigns: list[DropCampaign] = Field(alias="dropCampaigns") + type_name: Literal["User"] = Field(alias="__typename") + + model_config = { + "extra": "forbid", + "validate_assignment": True, + "strict": True, + "populate_by_name": True, + } + + +class Data(BaseModel): + """Schema for the data field in Twitch API responses.""" + + current_user: CurrentUser = Field(alias="currentUser") + + model_config = { + "extra": "forbid", + "validate_assignment": True, + "strict": True, + "populate_by_name": True, + } + + +class Extensions(BaseModel): + """Schema for the extensions field in Twitch API responses.""" + + duration_milliseconds: int = Field(alias="durationMilliseconds") + operation_name: Literal["ViewerDropsDashboard"] = Field(alias="operationName") + request_id: str = Field(alias="requestID") + + model_config = { + "extra": "forbid", + "validate_assignment": True, + "strict": True, + "populate_by_name": True, + } + + +class ViewerDropsDashboardPayload(BaseModel): + """Schema for the ViewerDropsDashboard response.""" + + data: Data + extensions: Extensions + + model_config = { + "extra": "forbid", + "validate_assignment": True, + "strict": True, + } diff --git a/twitch/utils.py b/twitch/utils.py new file mode 100644 index 0000000..54ad823 --- 
/dev/null +++ b/twitch/utils.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from functools import lru_cache +from typing import TYPE_CHECKING + +import dateparser +from django.utils import timezone + +if TYPE_CHECKING: + from datetime import datetime + + +@lru_cache(maxsize=40 * 1024) +def parse_date(value: str) -> datetime | None: + """Parse a datetime string into a timezone-aware datetime using dateparser. + + Args: + value: The datetime string to parse. + + Returns: + A timezone-aware datetime object or None if parsing fails. + """ + dateparser_settings: dict[str, bool | int] = { + "RETURN_AS_TIMEZONE_AWARE": True, + "CACHE_SIZE_LIMIT": 0, + } + dt: datetime | None = dateparser.parse(date_string=value, settings=dateparser_settings) # pyright: ignore[reportArgumentType] + if not dt: + return None + + # Ensure aware in Django's current timezone + if timezone.is_naive(dt): + dt = timezone.make_aware(dt, timezone.get_current_timezone()) + return dt diff --git a/twitch/views.py b/twitch/views.py index b6071a4..150c7f3 100644 --- a/twitch/views.py +++ b/twitch/views.py @@ -8,11 +8,13 @@ from collections import defaultdict from typing import TYPE_CHECKING from typing import Any +if TYPE_CHECKING: + from django.db.models.manager import BaseManager + from django.contrib.postgres.search import SearchQuery from django.contrib.postgres.search import SearchRank from django.contrib.postgres.search import SearchVector from django.core.serializers import serialize -from django.db.models import BaseManager from django.db.models import Count from django.db.models import F from django.db.models import Model @@ -546,9 +548,9 @@ def dashboard(request: HttpRequest) -> HttpResponse: for campaign in active_campaigns: owner: Organization | None = campaign.game.owner - org_id: str = owner.id if owner else "unknown" + org_id: str = owner.twitch_id if owner else "unknown" org_name: str = owner.name if owner else "Unknown" - game_id: str = campaign.game.id + game_id: str = 
campaign.game.twitch_id game_name: str = campaign.game.display_name if org_id not in campaigns_by_org_game: