WIP better import drops

This commit is contained in:
Joakim Hellsén 2025-12-01 21:38:28 +01:00
commit 69fa30748a
No known key found for this signature in database
10 changed files with 399 additions and 40 deletions

View file

@ -14,6 +14,7 @@ dependencies = [
"python-dotenv>=1.1.1", "python-dotenv>=1.1.1",
"pygments>=2.19.2", "pygments>=2.19.2",
"httpx>=0.28.1", "httpx>=0.28.1",
"pydantic>=2.12.5",
] ]
[dependency-groups] [dependency-groups]

0
scripts/__init__.py Normal file
View file

View file

@ -201,7 +201,7 @@ class DropCampaignFeed(Feed):
def item_guid(self, item: DropCampaign) -> str: def item_guid(self, item: DropCampaign) -> str:
"""Return a unique identifier for each campaign.""" """Return a unique identifier for each campaign."""
return item.id + "@ttvdrops.com" return item.twitch_id + "@ttvdrops.com"
def item_author_name(self, item: DropCampaign) -> str: def item_author_name(self, item: DropCampaign) -> str:
"""Return the author name for the campaign, typically the game name.""" """Return the author name for the campaign, typically the game name."""

View file

@ -0,0 +1,194 @@
from __future__ import annotations
import os
import sys
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from pathlib import Path
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from django.core.management.base import CommandParser
from pydantic import ValidationError
from twitch.models import Channel
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import Game
from twitch.models import Organization
from twitch.schemas import ViewerDropsDashboardPayload
def move_failed_validation_file(file_path: Path) -> Path:
"""Moves a file that failed validation to a 'broken' subdirectory.
Args:
file_path: Path to the file that failed validation
Returns:
Path to the 'broken' directory where the file was moved
"""
broken_dir: Path = file_path.parent / "broken"
broken_dir.mkdir(parents=True, exist_ok=True)
target_file: Path = broken_dir / file_path.name
file_path.rename(target_file)
return broken_dir
class Command(BaseCommand):
"""Import Twitch drop campaign data from a JSON file or directory of JSON files."""
help = "Import Twitch drop campaign data from a JSON file or directory"
requires_migrations_checks = True
game_cache: dict[str, Game] = {}
organization_cache: dict[str, Organization] = {}
drop_campaign_cache: dict[str, DropCampaign] = {}
channel_cache: dict[str, Channel] = {}
benefit_cache: dict[str, DropBenefit] = {}
def add_arguments(self, parser: CommandParser) -> None:
"""Populate the command with arguments."""
parser.add_argument("path", type=str, help="Path to JSON file or directory")
parser.add_argument("--recursive", action="store_true", help="Recursively search directories for JSON files")
parser.add_argument("--crash-on-error", action="store_true", help="Crash the command on first error instead of continuing")
def pre_fill_cache(self) -> None:
"""Load all existing IDs from DB into memory to avoid N+1 queries."""
self.stdout.write("Pre-filling caches...")
self.game_cache = {str(g.twitch_id): g for g in Game.objects.all()}
self.stdout.write(f"\tGames: {len(self.game_cache)}")
self.organization_cache = {str(o.twitch_id): o for o in Organization.objects.all()}
self.stdout.write(f"\tOrganizations: {len(self.organization_cache)}")
self.drop_campaign_cache = {str(c.twitch_id): c for c in DropCampaign.objects.all()}
self.stdout.write(f"\tDrop Campaigns: {len(self.drop_campaign_cache)}")
self.channel_cache = {str(ch.twitch_id): ch for ch in Channel.objects.all()}
self.stdout.write(f"\tChannels: {len(self.channel_cache)}")
self.benefit_cache = {str(b.twitch_id): b for b in DropBenefit.objects.all()}
self.stdout.write(f"\tBenefits: {len(self.benefit_cache)}")
def handle(self, *args, **options) -> None: # noqa: ARG002
"""Main entry point for the command.
Raises:
CommandError: If the provided path does not exist.
"""
input_path: Path = Path(options["path"]).resolve()
self.pre_fill_cache()
try:
if input_path.is_file():
self.process_file(file_path=input_path, options=options)
elif input_path.is_dir():
self.process_json_files(input_path=input_path, options=options)
else:
msg: str = f"Path does not exist: {input_path}"
raise CommandError(msg)
except KeyboardInterrupt:
self.stdout.write(self.style.WARNING("\n\nInterrupted by user!"))
self.stdout.write(self.style.WARNING("Shutting down gracefully..."))
sys.exit(130)
def process_json_files(self, input_path: Path, options: dict) -> None:
"""Process multiple JSON files in a directory.
Args:
input_path: Path to the directory containing JSON files
options: Command options
"""
json_files: list[Path] = self.collect_json_files(options, input_path)
self.stdout.write(f"Found {len(json_files)} JSON files to process")
completed_count = 0
with ProcessPoolExecutor() as executor:
futures = {executor.submit(self.process_file_worker, file_path, options): file_path for file_path in json_files}
for future in as_completed(futures):
file_path: Path = futures[future]
try:
result: dict[str, bool | str] = future.result()
if result["success"]:
self.stdout.write(f"{file_path}")
else:
self.stdout.write(f"{file_path} -> {result['broken_dir']}/{file_path.name}")
completed_count += 1
except (OSError, ValueError, KeyError) as e:
self.stdout.write(f"{file_path} (error: {e})")
completed_count += 1
self.stdout.write(f"Progress: {completed_count}/{len(json_files)} files processed")
self.stdout.write("")
def collect_json_files(self, options: dict, input_path: Path) -> list[Path]:
"""Collect JSON files from the specified directory.
Args:
options: Command options
input_path: Path to the directory
Returns:
List of JSON file paths
"""
json_files: list[Path] = []
if options["recursive"]:
for root, _dirs, files in os.walk(input_path):
root_path = Path(root)
json_files.extend(root_path / file for file in files if file.endswith(".json"))
else:
json_files = [f for f in input_path.iterdir() if f.is_file() and f.suffix == ".json"]
return json_files
@staticmethod
def process_file_worker(file_path: Path, options: dict) -> dict[str, bool | str]:
"""Worker function for parallel processing of files.
Args:
file_path: Path to the JSON file to process
options: Command options
Raises:
ValidationError: If the JSON file fails validation
Returns:
Dict with success status and optional broken_dir path
"""
try:
ViewerDropsDashboardPayload.model_validate_json(file_path.read_text(encoding="utf-8"))
except ValidationError:
if options["crash_on_error"]:
raise
broken_dir: Path = move_failed_validation_file(file_path)
return {"success": False, "broken_dir": str(broken_dir)}
else:
return {"success": True}
def process_file(self, file_path: Path, options: dict) -> None:
"""Reads a JSON file and processes the campaign data.
Args:
file_path: Path to the JSON file
options: Command options
Raises:
ValidationError: If the JSON file fails validation
"""
self.stdout.write(f"Processing file: {file_path}")
try:
_: ViewerDropsDashboardPayload = ViewerDropsDashboardPayload.model_validate_json(file_path.read_text(encoding="utf-8"))
self.stdout.write("\tProcessed drop campaigns")
except ValidationError:
if options["crash_on_error"]:
raise
broken_dir: Path = move_failed_validation_file(file_path)
self.stdout.write(f"\tMoved to {broken_dir} (validation failed)")

View file

@ -187,15 +187,15 @@ class Command(BaseCommand):
"""Load existing DB objects into in-memory caches to avoid repeated queries.""" """Load existing DB objects into in-memory caches to avoid repeated queries."""
# These queries may be heavy if DB is huge — safe because optional via --no-preload # These queries may be heavy if DB is huge — safe because optional via --no-preload
with self._cache_locks["game"]: with self._cache_locks["game"]:
self._game_cache = {str(g.id): g for g in Game.objects.all()} self._game_cache = {str(g.twitch_id): g for g in Game.objects.all()}
with self._cache_locks["org"]: with self._cache_locks["org"]:
self._organization_cache = {str(o.id): o for o in Organization.objects.all()} self._organization_cache = {str(o.twitch_id): o for o in Organization.objects.all()}
with self._cache_locks["campaign"]: with self._cache_locks["campaign"]:
self._drop_campaign_cache = {str(c.id): c for c in DropCampaign.objects.all()} self._drop_campaign_cache = {str(c.twitch_id): c for c in DropCampaign.objects.all()}
with self._cache_locks["channel"]: with self._cache_locks["channel"]:
self._channel_cache = {str(ch.id): ch for ch in Channel.objects.all()} self._channel_cache = {str(ch.twitch_id): ch for ch in Channel.objects.all()}
with self._cache_locks["benefit"]: with self._cache_locks["benefit"]:
self._benefit_cache = {str(b.id): b for b in DropBenefit.objects.all()} self._benefit_cache = {str(b.twitch_id): b for b in DropBenefit.objects.all()}
def process_drops(self, *, continue_on_error: bool, path: Path, processed_path: Path) -> None: def process_drops(self, *, continue_on_error: bool, path: Path, processed_path: Path) -> None:
"""Process drops from a file or directory. """Process drops from a file or directory.
@ -397,8 +397,8 @@ class Command(BaseCommand):
return return
if isinstance(data, list): if isinstance(data, list):
for _item in data: for item in data:
self.import_drop_campaign(_item, file_path=file_path) self.import_drop_campaign(item, file_path=file_path)
elif isinstance(data, dict): elif isinstance(data, dict):
self.import_drop_campaign(data, file_path=file_path) self.import_drop_campaign(data, file_path=file_path)
else: else:
@ -534,7 +534,7 @@ class Command(BaseCommand):
benefit_edges: list[dict[str, Any]] = drop_data.get("benefitEdges", []) benefit_edges: list[dict[str, Any]] = drop_data.get("benefitEdges", [])
if not benefit_edges: if not benefit_edges:
tqdm.write(self.style.WARNING(f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.id})")) tqdm.write(self.style.WARNING(f"No benefit edges found for drop {time_based_drop.name} (ID: {time_based_drop.twitch_id})"))
self.move_file(file_path, Path("no_benefit_edges") / file_path.name) self.move_file(file_path, Path("no_benefit_edges") / file_path.name)
return return
@ -570,10 +570,10 @@ class Command(BaseCommand):
if created: if created:
tqdm.write(f"Added {drop_benefit_edge}") tqdm.write(f"Added {drop_benefit_edge}")
except MultipleObjectsReturned as e: except MultipleObjectsReturned as e:
msg = f"Error: Multiple DropBenefitEdge objects found for drop {time_based_drop.id} and benefit {benefit.id}. Cannot update or create." msg = f"Error: Multiple DropBenefitEdge objects found for drop {time_based_drop.twitch_id} and benefit {benefit.twitch_id}. Cannot update or create." # noqa: E501
raise CommandError(msg) from e raise CommandError(msg) from e
except (IntegrityError, DatabaseError, TypeError, ValueError) as e: except (IntegrityError, DatabaseError, TypeError, ValueError) as e:
msg = f"Database or validation error creating DropBenefitEdge for drop {time_based_drop.id} and benefit {benefit.id}: {e}" msg = f"Database or validation error creating DropBenefitEdge for drop {time_based_drop.twitch_id} and benefit {benefit.twitch_id}: {e}"
raise CommandError(msg) from e raise CommandError(msg) from e
def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop: def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop:
@ -847,7 +847,7 @@ class Command(BaseCommand):
# Set the many-to-many relationship (save only if different) # Set the many-to-many relationship (save only if different)
current_ids = set(drop_campaign.allow_channels.values_list("id", flat=True)) current_ids = set(drop_campaign.allow_channels.values_list("id", flat=True))
new_ids = {ch.id for ch in channel_objects} new_ids = {ch.twitch_id for ch in channel_objects}
if current_ids != new_ids: if current_ids != new_ids:
drop_campaign.allow_channels.set(channel_objects) drop_campaign.allow_channels.set(channel_objects)

View file

@ -1,4 +1,4 @@
# Generated by Django 5.2.7 on 2025-10-13 00:36 # Generated by Django 5.2.8 on 2025-12-01 20:17
from __future__ import annotations from __future__ import annotations
import django.db.models.deletion import django.db.models.deletion
@ -7,11 +7,7 @@ from django.db import models
class Migration(migrations.Migration): class Migration(migrations.Migration):
"""Initial migration. """Initial Django migration for the twitch app schema."""
Args:
migrations (migrations.Migration): The base class for all migrations.
"""
initial = True initial = True
@ -22,7 +18,7 @@ class Migration(migrations.Migration):
name="Channel", name="Channel",
fields=[ fields=[
( (
"id", "twitch_id",
models.TextField(help_text="The unique Twitch identifier for the channel.", primary_key=True, serialize=False, verbose_name="Channel ID"), models.TextField(help_text="The unique Twitch identifier for the channel.", primary_key=True, serialize=False, verbose_name="Channel ID"),
), ),
("name", models.TextField(help_text="The lowercase username of the channel.", verbose_name="Username")), ("name", models.TextField(help_text="The lowercase username of the channel.", verbose_name="Username")),
@ -37,7 +33,7 @@ class Migration(migrations.Migration):
migrations.CreateModel( migrations.CreateModel(
name="DropBenefit", name="DropBenefit",
fields=[ fields=[
("id", models.TextField(help_text="Unique Twitch identifier for the benefit.", primary_key=True, serialize=False)), ("twitch_id", models.TextField(help_text="Unique Twitch identifier for the benefit.", primary_key=True, serialize=False)),
("name", models.TextField(blank=True, default="N/A", help_text="Name of the drop benefit.")), ("name", models.TextField(blank=True, default="N/A", help_text="Name of the drop benefit.")),
("image_asset_url", models.URLField(blank=True, default="", help_text="URL to the benefit's image asset.", max_length=500)), ("image_asset_url", models.URLField(blank=True, default="", help_text="URL to the benefit's image asset.", max_length=500)),
( (
@ -61,7 +57,8 @@ class Migration(migrations.Migration):
migrations.CreateModel( migrations.CreateModel(
name="Game", name="Game",
fields=[ fields=[
("id", models.TextField(primary_key=True, serialize=False, verbose_name="Game ID")), ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("twitch_id", models.TextField(unique=True, verbose_name="Twitch game ID")),
("slug", models.TextField(blank=True, default="", help_text="Short unique identifier for the game.", max_length=200, verbose_name="Slug")), ("slug", models.TextField(blank=True, default="", help_text="Short unique identifier for the game.", max_length=200, verbose_name="Slug")),
("name", models.TextField(blank=True, default="", verbose_name="Name")), ("name", models.TextField(blank=True, default="", verbose_name="Name")),
("display_name", models.TextField(blank=True, default="", verbose_name="Display name")), ("display_name", models.TextField(blank=True, default="", verbose_name="Display name")),
@ -81,7 +78,7 @@ class Migration(migrations.Migration):
name="Organization", name="Organization",
fields=[ fields=[
( (
"id", "twitch_id",
models.TextField( models.TextField(
help_text="The unique Twitch identifier for the organization.", help_text="The unique Twitch identifier for the organization.",
primary_key=True, primary_key=True,
@ -113,7 +110,7 @@ class Migration(migrations.Migration):
migrations.CreateModel( migrations.CreateModel(
name="DropCampaign", name="DropCampaign",
fields=[ fields=[
("id", models.TextField(help_text="Unique Twitch identifier for the campaign.", primary_key=True, serialize=False)), ("twitch_id", models.TextField(help_text="Unique Twitch identifier for the campaign.", primary_key=True, serialize=False)),
("name", models.TextField(help_text="Name of the drop campaign.")), ("name", models.TextField(help_text="Name of the drop campaign.")),
("description", models.TextField(blank=True, help_text="Detailed description of the campaign.")), ("description", models.TextField(blank=True, help_text="Detailed description of the campaign.")),
("details_url", models.URLField(blank=True, default="", help_text="URL with campaign details.", max_length=500)), ("details_url", models.URLField(blank=True, default="", help_text="URL with campaign details.", max_length=500)),
@ -169,7 +166,7 @@ class Migration(migrations.Migration):
migrations.CreateModel( migrations.CreateModel(
name="TimeBasedDrop", name="TimeBasedDrop",
fields=[ fields=[
("id", models.TextField(help_text="Unique Twitch identifier for the time-based drop.", primary_key=True, serialize=False)), ("twitch_id", models.TextField(help_text="Unique Twitch identifier for the time-based drop.", primary_key=True, serialize=False)),
("name", models.TextField(help_text="Name of the time-based drop.")), ("name", models.TextField(help_text="Name of the time-based drop.")),
( (
"required_minutes_watched", "required_minutes_watched",
@ -215,7 +212,7 @@ class Migration(migrations.Migration):
migrations.CreateModel( migrations.CreateModel(
name="TwitchGameData", name="TwitchGameData",
fields=[ fields=[
("id", models.TextField(primary_key=True, serialize=False, verbose_name="Twitch Game ID")), ("twitch_id", models.TextField(primary_key=True, serialize=False, verbose_name="Twitch Game ID")),
("name", models.TextField(blank=True, default="", verbose_name="Name")), ("name", models.TextField(blank=True, default="", verbose_name="Name")),
( (
"box_art_url", "box_art_url",

View file

@ -16,7 +16,7 @@ logger: logging.Logger = logging.getLogger("ttvdrops")
class Organization(models.Model): class Organization(models.Model):
"""Represents an organization on Twitch that can own drop campaigns.""" """Represents an organization on Twitch that can own drop campaigns."""
id = models.TextField( twitch_id = models.TextField(
primary_key=True, primary_key=True,
verbose_name="Organization ID", verbose_name="Organization ID",
help_text="The unique Twitch identifier for the organization.", help_text="The unique Twitch identifier for the organization.",
@ -41,14 +41,14 @@ class Organization(models.Model):
def __str__(self) -> str: def __str__(self) -> str:
"""Return a string representation of the organization.""" """Return a string representation of the organization."""
return self.name or self.id return self.name or self.twitch_id
# MARK: Game # MARK: Game
class Game(models.Model): class Game(models.Model):
"""Represents a game on Twitch.""" """Represents a game on Twitch."""
id = models.TextField(primary_key=True, verbose_name="Game ID") twitch_id = models.TextField(verbose_name="Twitch game ID", unique=True)
slug = models.TextField( slug = models.TextField(
max_length=200, max_length=200,
blank=True, blank=True,
@ -111,7 +111,7 @@ class Game(models.Model):
self.name, self.name,
) )
return f"{self.display_name} ({self.name})" return f"{self.display_name} ({self.name})"
return self.display_name or self.name or self.slug or self.id return self.display_name or self.name or self.slug or self.twitch_id
@property @property
def organizations(self) -> models.QuerySet[Organization]: def organizations(self) -> models.QuerySet[Organization]:
@ -127,7 +127,7 @@ class Game(models.Model):
return self.name return self.name
if self.slug: if self.slug:
return self.slug return self.slug
return self.id return self.twitch_id
@property @property
def twitch_directory_url(self) -> str: def twitch_directory_url(self) -> str:
@ -151,7 +151,7 @@ class TwitchGameData(models.Model):
igdb_id: Optional IGDB id for the game igdb_id: Optional IGDB id for the game
""" """
id = models.TextField(primary_key=True, verbose_name="Twitch Game ID") twitch_id = models.TextField(primary_key=True, verbose_name="Twitch Game ID")
game = models.ForeignKey( game = models.ForeignKey(
Game, Game,
on_delete=models.SET_NULL, on_delete=models.SET_NULL,
@ -179,14 +179,14 @@ class TwitchGameData(models.Model):
ordering = ["name"] ordering = ["name"]
def __str__(self) -> str: def __str__(self) -> str:
return self.name or self.id return self.name or self.twitch_id
# MARK: Channel # MARK: Channel
class Channel(models.Model): class Channel(models.Model):
"""Represents a Twitch channel that can participate in drop campaigns.""" """Represents a Twitch channel that can participate in drop campaigns."""
id = models.TextField( twitch_id = models.TextField(
primary_key=True, primary_key=True,
verbose_name="Channel ID", verbose_name="Channel ID",
help_text="The unique Twitch identifier for the channel.", help_text="The unique Twitch identifier for the channel.",
@ -214,14 +214,14 @@ class Channel(models.Model):
def __str__(self) -> str: def __str__(self) -> str:
"""Return a string representation of the channel.""" """Return a string representation of the channel."""
return self.display_name or self.name or self.id return self.display_name or self.name or self.twitch_id
# MARK: DropCampaign # MARK: DropCampaign
class DropCampaign(models.Model): class DropCampaign(models.Model):
"""Represents a Twitch drop campaign.""" """Represents a Twitch drop campaign."""
id = models.TextField( twitch_id = models.TextField(
primary_key=True, primary_key=True,
help_text="Unique Twitch identifier for the campaign.", help_text="Unique Twitch identifier for the campaign.",
) )
@ -356,7 +356,7 @@ class DropCampaign(models.Model):
class DropBenefit(models.Model): class DropBenefit(models.Model):
"""Represents a benefit that can be earned from a drop.""" """Represents a benefit that can be earned from a drop."""
id = models.TextField( twitch_id = models.TextField(
primary_key=True, primary_key=True,
help_text="Unique Twitch identifier for the benefit.", help_text="Unique Twitch identifier for the benefit.",
) )
@ -419,7 +419,7 @@ class DropBenefit(models.Model):
class TimeBasedDrop(models.Model): class TimeBasedDrop(models.Model):
"""Represents a time-based drop in a drop campaign.""" """Represents a time-based drop in a drop campaign."""
id = models.TextField( twitch_id = models.TextField(
primary_key=True, primary_key=True,
help_text="Unique Twitch identifier for the time-based drop.", help_text="Unique Twitch identifier for the time-based drop.",
) )

131
twitch/schemas.py Normal file
View file

@ -0,0 +1,131 @@
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel
from pydantic import Field
class Organization(BaseModel):
"""Schema for Twitch Organization objects."""
twitch_id: str = Field(alias="id")
name: str
type_name: Literal["Organization"] = Field(alias="__typename")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
class Game(BaseModel):
"""Schema for Twitch Game objects."""
twitch_id: str = Field(alias="id")
display_name: str = Field(alias="displayName")
box_art_url: str = Field(alias="boxArtURL")
type_name: Literal["Game"] = Field(alias="__typename")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
class DropCampaignSelfEdge(BaseModel):
"""Schema for the 'self' edge on DropCampaign objects."""
is_account_connected: bool = Field(alias="isAccountConnected")
type_name: Literal["DropCampaignSelfEdge"] = Field(alias="__typename")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
class DropCampaign(BaseModel):
"""Schema for Twitch DropCampaign objects."""
twitch_id: str = Field(alias="id")
name: str
owner: Organization
game: Game
status: Literal["ACTIVE", "EXPIRED"]
start_at: str = Field(alias="startAt")
end_at: str = Field(alias="endAt")
details_url: str = Field(alias="detailsURL")
account_link_url: str = Field(alias="accountLinkURL")
self: DropCampaignSelfEdge
type_name: Literal["DropCampaign"] = Field(alias="__typename")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
class CurrentUser(BaseModel):
"""Schema for Twitch User objects."""
twitch_id: str = Field(alias="id")
login: str
drop_campaigns: list[DropCampaign] = Field(alias="dropCampaigns")
type_name: Literal["User"] = Field(alias="__typename")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
class Data(BaseModel):
"""Schema for the data field in Twitch API responses."""
current_user: CurrentUser = Field(alias="currentUser")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
class Extensions(BaseModel):
"""Schema for the extensions field in Twitch API responses."""
duration_milliseconds: int = Field(alias="durationMilliseconds")
operation_name: Literal["ViewerDropsDashboard"] = Field(alias="operationName")
request_id: str = Field(alias="requestID")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
class ViewerDropsDashboardPayload(BaseModel):
"""Schema for the ViewerDropsDashboard response."""
data: Data
extensions: Extensions
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
}

34
twitch/utils.py Normal file
View file

@ -0,0 +1,34 @@
from __future__ import annotations
from functools import lru_cache
from typing import TYPE_CHECKING
import dateparser
from django.utils import timezone
if TYPE_CHECKING:
from datetime import datetime
@lru_cache(maxsize=40 * 1024)
def parse_date(value: str) -> datetime | None:
"""Parse a datetime string into a timezone-aware datetime using dateparser.
Args:
value: The datetime string to parse.
Returns:
A timezone-aware datetime object or None if parsing fails.
"""
dateparser_settings: dict[str, bool | int] = {
"RETURN_AS_TIMEZONE_AWARE": True,
"CACHE_SIZE_LIMIT": 0,
}
dt: datetime | None = dateparser.parse(date_string=value, settings=dateparser_settings) # pyright: ignore[reportArgumentType]
if not dt:
return None
# Ensure aware in Django's current timezone
if timezone.is_naive(dt):
dt = timezone.make_aware(dt, timezone.get_current_timezone())
return dt

View file

@ -8,11 +8,13 @@ from collections import defaultdict
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any from typing import Any
if TYPE_CHECKING:
from django.db.models.manager import BaseManager
from django.contrib.postgres.search import SearchQuery from django.contrib.postgres.search import SearchQuery
from django.contrib.postgres.search import SearchRank from django.contrib.postgres.search import SearchRank
from django.contrib.postgres.search import SearchVector from django.contrib.postgres.search import SearchVector
from django.core.serializers import serialize from django.core.serializers import serialize
from django.db.models import BaseManager
from django.db.models import Count from django.db.models import Count
from django.db.models import F from django.db.models import F
from django.db.models import Model from django.db.models import Model
@ -546,9 +548,9 @@ def dashboard(request: HttpRequest) -> HttpResponse:
for campaign in active_campaigns: for campaign in active_campaigns:
owner: Organization | None = campaign.game.owner owner: Organization | None = campaign.game.owner
org_id: str = owner.id if owner else "unknown" org_id: str = owner.twitch_id if owner else "unknown"
org_name: str = owner.name if owner else "Unknown" org_name: str = owner.name if owner else "Unknown"
game_id: str = campaign.game.id game_id: str = campaign.game.twitch_id
game_name: str = campaign.game.display_name game_name: str = campaign.game.display_name
if org_id not in campaigns_by_org_game: if org_id not in campaigns_by_org_game: