Compare commits

..

No commits in common. "1790bac2e08c370db41ab26425ff34d9a70873d6" and "47d4f5341f3e3b73d25ac078923153f92d1d60f0" have entirely different histories.

9 changed files with 384 additions and 1934 deletions

View file

@ -39,7 +39,6 @@ class TwitchConfig(AppConfig):
# Register post_save signal handlers that dispatch image download tasks # Register post_save signal handlers that dispatch image download tasks
# when new Twitch records are created. # when new Twitch records are created.
from django.db.models.signals import m2m_changed # noqa: I001, PLC0415
from django.db.models.signals import post_save # noqa: PLC0415 from django.db.models.signals import post_save # noqa: PLC0415
from twitch.models import DropBenefit # noqa: PLC0415 from twitch.models import DropBenefit # noqa: PLC0415
@ -47,7 +46,6 @@ class TwitchConfig(AppConfig):
from twitch.models import Game # noqa: PLC0415 from twitch.models import Game # noqa: PLC0415
from twitch.models import RewardCampaign # noqa: PLC0415 from twitch.models import RewardCampaign # noqa: PLC0415
from twitch.signals import on_drop_benefit_saved # noqa: PLC0415 from twitch.signals import on_drop_benefit_saved # noqa: PLC0415
from twitch.signals import on_drop_campaign_allow_channels_changed # noqa: PLC0415
from twitch.signals import on_drop_campaign_saved # noqa: PLC0415 from twitch.signals import on_drop_campaign_saved # noqa: PLC0415
from twitch.signals import on_game_saved # noqa: PLC0415 from twitch.signals import on_game_saved # noqa: PLC0415
from twitch.signals import on_reward_campaign_saved # noqa: PLC0415 from twitch.signals import on_reward_campaign_saved # noqa: PLC0415
@ -56,8 +54,3 @@ class TwitchConfig(AppConfig):
post_save.connect(on_drop_campaign_saved, sender=DropCampaign) post_save.connect(on_drop_campaign_saved, sender=DropCampaign)
post_save.connect(on_drop_benefit_saved, sender=DropBenefit) post_save.connect(on_drop_benefit_saved, sender=DropBenefit)
post_save.connect(on_reward_campaign_saved, sender=RewardCampaign) post_save.connect(on_reward_campaign_saved, sender=RewardCampaign)
m2m_changed.connect(
on_drop_campaign_allow_channels_changed,
sender=DropCampaign.allow_channels.through,
dispatch_uid="twitch_drop_campaign_allow_channels_counter_cache",
)

View file

@ -1,22 +0,0 @@
# Generated by Django 6.0.4 on 2026-04-11 22:41
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
"Add an index on the RewardCampaign model for the ends_at and starts_at fields, to optimize queries that filter by these fields."
dependencies = [
("twitch", "0019_dropcampaign_campaign_list_indexes"),
]
operations = [
migrations.AddIndex(
model_name="rewardcampaign",
index=models.Index(
fields=["ends_at", "-starts_at"],
name="tw_reward_ends_starts_idx",
),
),
]

View file

@ -1,83 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from django.db import migrations
from django.db import models
if TYPE_CHECKING:
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from django.db.migrations.state import StateApps
from twitch.models import Channel
from twitch.models import DropCampaign
def backfill_allowed_campaign_count(
apps: StateApps,
schema_editor: BaseDatabaseSchemaEditor,
) -> None:
"""Populate Channel.allowed_campaign_count from the M2M through table."""
del schema_editor
Channel: type[Channel] = apps.get_model("twitch", "Channel")
DropCampaign: type[DropCampaign] = apps.get_model("twitch", "DropCampaign")
through_model: type[Channel] = DropCampaign.allow_channels.through
counts_by_channel = {
row["channel_id"]: row["campaign_count"]
for row in (
through_model.objects.values("channel_id").annotate(
campaign_count=models.Count("dropcampaign_id"),
)
)
}
channels: list[Channel] = list(
Channel.objects.all().only("id", "allowed_campaign_count"),
)
for channel in channels:
channel.allowed_campaign_count = counts_by_channel.get(channel.pk, 0)
if channels:
Channel.objects.bulk_update(
channels,
["allowed_campaign_count"],
batch_size=1000,
)
def noop_reverse(
apps: StateApps,
schema_editor: BaseDatabaseSchemaEditor,
) -> None:
"""No-op reverse migration for cached counters."""
del apps
del schema_editor
class Migration(migrations.Migration):
"""Add cached channel campaign counts and backfill existing rows."""
dependencies = [
("twitch", "0020_rewardcampaign_tw_reward_ends_starts_idx"),
]
operations = [
migrations.AddField(
model_name="channel",
name="allowed_campaign_count",
field=models.PositiveIntegerField(
default=0,
help_text="Cached number of drop campaigns that allow this channel.",
),
),
migrations.AddIndex(
model_name="channel",
index=models.Index(
fields=["-allowed_campaign_count", "name"],
name="tw_chan_cc_name_idx",
),
),
migrations.RunPython(backfill_allowed_campaign_count, noop_reverse),
]

View file

@ -1,22 +0,0 @@
# Generated by Django 6.0.4 on 2026-04-12 03:06
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
"Add an index on the DropCampaign model for the game and end_at fields, with end_at in descending order. This is to optimize queries that filter by game and order by end_at descending."
dependencies = [
("twitch", "0021_channel_allowed_campaign_count_cache"),
]
operations = [
migrations.AddIndex(
model_name="dropcampaign",
index=models.Index(
fields=["game", "-end_at"],
name="tw_drop_game_end_desc_idx",
),
),
]

View file

@ -1,19 +1,13 @@
import datetime
import logging import logging
from collections import OrderedDict from collections import OrderedDict
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any from typing import Any
from typing import Self
import auto_prefetch import auto_prefetch
from django.conf import settings from django.conf import settings
from django.contrib.postgres.indexes import GinIndex from django.contrib.postgres.indexes import GinIndex
from django.db import models from django.db import models
from django.db.models import Exists
from django.db.models import F
from django.db.models import Prefetch from django.db.models import Prefetch
from django.db.models import Q
from django.db.models.functions import Coalesce
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
from django.utils.html import format_html from django.utils.html import format_html
@ -21,6 +15,8 @@ from django.utils.html import format_html
from twitch.utils import normalize_twitch_box_art_url from twitch.utils import normalize_twitch_box_art_url
if TYPE_CHECKING: if TYPE_CHECKING:
import datetime
from django.db.models import QuerySet from django.db.models import QuerySet
logger: logging.Logger = logging.getLogger("ttvdrops") logger: logging.Logger = logging.getLogger("ttvdrops")
@ -70,32 +66,6 @@ class Organization(auto_prefetch.Model):
"""Return a string representation of the organization.""" """Return a string representation of the organization."""
return self.name or self.twitch_id return self.name or self.twitch_id
@classmethod
def for_list_view(cls) -> models.QuerySet[Organization]:
"""Return organizations with only fields needed by the org list page."""
return cls.objects.only("twitch_id", "name").order_by("name")
@classmethod
def for_detail_view(cls) -> models.QuerySet[Organization]:
"""Return organizations with only fields and relations needed by detail page."""
return cls.objects.only(
"twitch_id",
"name",
"added_at",
"updated_at",
).prefetch_related(
models.Prefetch(
"games",
queryset=Game.objects.only(
"twitch_id",
"name",
"display_name",
"slug",
).order_by("display_name"),
to_attr="games_for_detail",
),
)
def feed_description(self: Organization) -> str: def feed_description(self: Organization) -> str:
"""Return a description of the organization for RSS feeds.""" """Return a description of the organization for RSS feeds."""
name: str = self.name or "Unknown Organization" name: str = self.name or "Unknown Organization"
@ -265,131 +235,6 @@ class Game(auto_prefetch.Model):
"""Alias for box_art_best_url to provide a common interface with benefits.""" """Alias for box_art_best_url to provide a common interface with benefits."""
return self.box_art_best_url return self.box_art_best_url
@property
def dashboard_box_art_url(self) -> str:
"""Return dashboard-safe box art URL without touching deferred image fields."""
return normalize_twitch_box_art_url(self.box_art or "")
@classmethod
def for_detail_view(cls) -> models.QuerySet[Game]:
"""Return games with only fields/relations needed by game detail view."""
return cls.objects.only(
"twitch_id",
"slug",
"name",
"display_name",
"box_art",
"box_art_file",
"box_art_width",
"box_art_height",
"added_at",
"updated_at",
).prefetch_related(
Prefetch(
"owners",
queryset=Organization.objects.only("twitch_id", "name").order_by(
"name",
),
to_attr="owners_for_detail",
),
)
@classmethod
def with_campaign_counts(
cls,
now: datetime.datetime,
*,
with_campaigns_only: bool = False,
) -> models.QuerySet[Game]:
"""Return games annotated with total/active campaign counts.
Args:
now: Current timestamp used to evaluate active campaigns.
with_campaigns_only: If True, include only games with at least one campaign.
Returns:
QuerySet optimized for games list/grid rendering.
"""
campaigns_for_game = DropCampaign.objects.filter(
game_id=models.OuterRef("pk"),
)
campaign_count_subquery = (
campaigns_for_game
.order_by()
.values("game_id")
.annotate(total=models.Count("id"))
.values("total")[:1]
)
active_count_subquery = (
campaigns_for_game
.filter(start_at__lte=now, end_at__gte=now)
.order_by()
.values("game_id")
.annotate(total=models.Count("id"))
.values("total")[:1]
)
queryset: models.QuerySet[Game] = (
cls.objects
.only(
"twitch_id",
"display_name",
"name",
"slug",
"box_art",
"box_art_file",
"box_art_width",
"box_art_height",
)
.prefetch_related(
Prefetch(
"owners",
queryset=Organization.objects.only("twitch_id", "name").order_by(
"name",
),
),
)
.annotate(
campaign_count=Coalesce(
models.Subquery(
campaign_count_subquery,
output_field=models.IntegerField(),
),
models.Value(0),
),
active_count=Coalesce(
models.Subquery(
active_count_subquery,
output_field=models.IntegerField(),
),
models.Value(0),
),
)
.order_by("display_name")
)
if with_campaigns_only:
queryset = queryset.filter(Exists(campaigns_for_game))
return queryset
@staticmethod
def grouped_by_owner_for_grid(
games: models.QuerySet[Game],
) -> OrderedDict[Organization, list[dict[str, Game]]]:
"""Group games by owner organization for games grid/list pages.
Args:
games: QuerySet of games with prefetched owners.
Returns:
Ordered mapping of organizations to game dictionaries.
"""
grouped: OrderedDict[Organization, list[dict[str, Game]]] = OrderedDict()
for game in games:
for owner in game.owners.all():
grouped.setdefault(owner, []).append({"game": game})
return OrderedDict(sorted(grouped.items(), key=lambda item: item[0].name))
# MARK: TwitchGame # MARK: TwitchGame
class TwitchGameData(auto_prefetch.Model): class TwitchGameData(auto_prefetch.Model):
@ -497,11 +342,6 @@ class Channel(auto_prefetch.Model):
auto_now=True, auto_now=True,
) )
allowed_campaign_count = models.PositiveIntegerField(
help_text="Cached number of drop campaigns that allow this channel.",
default=0,
)
class Meta(auto_prefetch.Model.Meta): class Meta(auto_prefetch.Model.Meta):
ordering = ["display_name"] ordering = ["display_name"]
indexes = [ indexes = [
@ -510,68 +350,12 @@ class Channel(auto_prefetch.Model):
models.Index(fields=["twitch_id"]), models.Index(fields=["twitch_id"]),
models.Index(fields=["added_at"]), models.Index(fields=["added_at"]),
models.Index(fields=["updated_at"]), models.Index(fields=["updated_at"]),
models.Index(
fields=["-allowed_campaign_count", "name"],
name="tw_chan_cc_name_idx",
),
] ]
def __str__(self) -> str: def __str__(self) -> str:
"""Return a string representation of the channel.""" """Return a string representation of the channel."""
return self.display_name or self.name or self.twitch_id return self.display_name or self.name or self.twitch_id
@classmethod
def for_list_view(
cls,
search_query: str | None = None,
) -> models.QuerySet[Channel]:
"""Return channels for list view with campaign counts.
Args:
search_query: Optional free-text search for username/display name.
Returns:
QuerySet optimized for channel list rendering.
"""
queryset: models.QuerySet[Self, Self] = cls.objects.only(
"twitch_id",
"name",
"display_name",
"allowed_campaign_count",
)
normalized_query: str = (search_query or "").strip()
if normalized_query:
queryset = queryset.filter(
Q(name__icontains=normalized_query)
| Q(display_name__icontains=normalized_query),
)
return queryset.annotate(
campaign_count=F("allowed_campaign_count"),
).order_by("-campaign_count", "name")
@classmethod
def for_detail_view(cls) -> models.QuerySet[Channel]:
"""Return channels with only fields needed by the channel detail view."""
return cls.objects.only(
"twitch_id",
"name",
"display_name",
"added_at",
"updated_at",
)
@property
def preferred_name(self) -> str:
"""Return display name fallback used by channel-facing pages."""
return self.display_name or self.name or self.twitch_id
def detail_description(self, total_campaigns: int) -> str:
"""Return a short channel-detail description with pluralization."""
suffix: str = "s" if total_campaigns != 1 else ""
return f"{self.preferred_name} participates in {total_campaigns} drop campaign{suffix}"
# MARK: DropCampaign # MARK: DropCampaign
class DropCampaign(auto_prefetch.Model): class DropCampaign(auto_prefetch.Model):
@ -736,11 +520,6 @@ class DropCampaign(auto_prefetch.Model):
fields=["is_fully_imported", "start_at", "end_at"], fields=["is_fully_imported", "start_at", "end_at"],
name="tw_drop_imported_start_end_idx", name="tw_drop_imported_start_end_idx",
), ),
# For game detail campaigns listing by game with end date ordering.
models.Index(
fields=["game", "-end_at"],
name="tw_drop_game_end_desc_idx",
),
] ]
def __str__(self) -> str: def __str__(self) -> str:
@ -787,306 +566,6 @@ class DropCampaign(auto_prefetch.Model):
queryset = queryset.filter(end_at__lt=now) queryset = queryset.filter(end_at__lt=now)
return queryset return queryset
@classmethod
def for_game_detail(
cls,
game: Game,
) -> models.QuerySet[DropCampaign]:
"""Return campaigns with only game-detail-needed relations/fields loaded."""
return (
cls.objects
.filter(game=game)
.select_related("game")
.only(
"twitch_id",
"name",
"start_at",
"end_at",
"game",
"game__display_name",
)
.prefetch_related(
Prefetch(
"time_based_drops",
queryset=TimeBasedDrop.objects.only(
"twitch_id",
"campaign_id",
).prefetch_related(
Prefetch(
"benefits",
queryset=DropBenefit.objects.only(
"twitch_id",
"name",
"image_asset_url",
"image_file",
"image_width",
"image_height",
).order_by("name"),
),
),
),
)
.order_by("-end_at")
)
@classmethod
def for_detail_view(cls, twitch_id: str) -> DropCampaign:
"""Return a campaign with only detail-view-required relations/fields loaded.
Args:
twitch_id: Campaign Twitch ID.
Returns:
Campaign object with game, owners, channels, drops, and benefits preloaded.
"""
return (
cls.objects
.select_related("game")
.only(
"twitch_id",
"name",
"description",
"details_url",
"account_link_url",
"image_url",
"image_file",
"image_width",
"image_height",
"start_at",
"end_at",
"added_at",
"updated_at",
"game__twitch_id",
"game__name",
"game__display_name",
"game__slug",
)
.prefetch_related(
Prefetch(
"game__owners",
queryset=Organization.objects.only("twitch_id", "name").order_by(
"name",
),
to_attr="owners_for_detail",
),
Prefetch(
"allow_channels",
queryset=Channel.objects.only(
"twitch_id",
"name",
"display_name",
).order_by("display_name"),
to_attr="channels_ordered",
),
Prefetch(
"time_based_drops",
queryset=TimeBasedDrop.objects
.only(
"twitch_id",
"name",
"required_minutes_watched",
"required_subs",
"start_at",
"end_at",
"campaign_id",
)
.prefetch_related(
Prefetch(
"benefits",
queryset=DropBenefit.objects.only(
"twitch_id",
"name",
"distribution_type",
"image_asset_url",
"image_file",
"image_width",
"image_height",
),
),
)
.order_by("required_minutes_watched"),
),
)
.get(twitch_id=twitch_id)
)
@classmethod
def for_channel_detail(cls, channel: Channel) -> models.QuerySet[DropCampaign]:
"""Return campaigns with only channel-detail-required relations/fields.
Args:
channel: Channel used for allow-list filtering.
Returns:
QuerySet ordered by newest start date.
"""
return (
cls.objects
.filter(allow_channels=channel)
.select_related("game")
.only(
"twitch_id",
"name",
"start_at",
"end_at",
"game",
"game__twitch_id",
"game__name",
"game__display_name",
)
.prefetch_related(
Prefetch(
"time_based_drops",
queryset=TimeBasedDrop.objects.only(
"twitch_id",
"campaign_id",
).prefetch_related(
Prefetch(
"benefits",
queryset=DropBenefit.objects.only(
"twitch_id",
"name",
"image_asset_url",
"image_file",
"image_width",
"image_height",
).order_by("name"),
),
),
),
)
.order_by("-start_at")
)
@staticmethod
def split_for_channel_detail(
campaigns: list[DropCampaign],
now: datetime.datetime,
) -> tuple[list[DropCampaign], list[DropCampaign], list[DropCampaign]]:
"""Split channel campaigns into active, upcoming, and expired buckets.
Args:
campaigns: List of campaigns to split.
now: Current datetime for comparison.
Returns:
Tuple containing lists of active, upcoming, and expired campaigns.
"""
sentinel: datetime.datetime = datetime.datetime.max.replace(
tzinfo=datetime.UTC,
)
active_campaigns: list[DropCampaign] = sorted(
[
campaign
for campaign in campaigns
if campaign.start_at is not None
and campaign.start_at <= now
and campaign.end_at is not None
and campaign.end_at >= now
],
key=lambda campaign: campaign.end_at or sentinel,
)
upcoming_campaigns: list[DropCampaign] = sorted(
[
campaign
for campaign in campaigns
if campaign.start_at is not None and campaign.start_at > now
],
key=lambda campaign: campaign.start_at or sentinel,
)
expired_campaigns: list[DropCampaign] = [
campaign
for campaign in campaigns
if campaign.end_at is not None and campaign.end_at < now
]
return active_campaigns, upcoming_campaigns, expired_campaigns
@staticmethod
def _countdown_text_for_drop(
drop: TimeBasedDrop,
now: datetime.datetime,
) -> str:
"""Return a display countdown for a detail-view drop row."""
if drop.end_at and drop.end_at > now:
time_diff: datetime.timedelta = drop.end_at - now
days: int = time_diff.days
hours, remainder = divmod(time_diff.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if days > 0:
return f"{days}d {hours}h {minutes}m"
if hours > 0:
return f"{hours}h {minutes}m"
if minutes > 0:
return f"{minutes}m {seconds}s"
return f"{seconds}s"
if drop.start_at and drop.start_at > now:
return "Not started"
return "Expired"
def awarded_badges_by_drop_twitch_id(
self,
) -> dict[str, ChatBadge]:
"""Return the first awarded badge per drop keyed by drop Twitch ID."""
drops: list[TimeBasedDrop] = list(self.time_based_drops.all()) # pyright: ignore[reportAttributeAccessIssue]
badge_titles: set[str] = {
benefit.name
for drop in drops
for benefit in drop.benefits.all()
if benefit.distribution_type == "BADGE" and benefit.name
}
if not badge_titles:
return {}
badges_by_title: dict[str, ChatBadge] = {
badge.title: badge
for badge in (
ChatBadge.objects
.select_related("badge_set")
.only(
"title",
"description",
"image_url_2x",
"badge_set__set_id",
)
.filter(title__in=badge_titles)
)
}
awarded_badges: dict[str, ChatBadge] = {}
for drop in drops:
for benefit in drop.benefits.all():
if benefit.distribution_type != "BADGE":
continue
badge: ChatBadge | None = badges_by_title.get(benefit.name)
if badge:
awarded_badges[drop.twitch_id] = badge
break
return awarded_badges
def enhanced_drops_for_detail(
self,
now: datetime.datetime,
) -> list[dict[str, Any]]:
"""Return campaign drops with detail-view presentation metadata."""
drops: list[TimeBasedDrop] = list(self.time_based_drops.all()) # pyright: ignore[reportAttributeAccessIssue]
awarded_badges: dict[str, ChatBadge] = self.awarded_badges_by_drop_twitch_id()
return [
{
"drop": drop,
"local_start": drop.start_at,
"local_end": drop.end_at,
"timezone_name": "UTC",
"countdown_text": self._countdown_text_for_drop(drop, now),
"awarded_badge": awarded_badges.get(drop.twitch_id),
}
for drop in drops
]
@classmethod @classmethod
def active_for_dashboard( def active_for_dashboard(
cls, cls,
@ -1107,22 +586,26 @@ class DropCampaign(auto_prefetch.Model):
"twitch_id", "twitch_id",
"name", "name",
"image_url", "image_url",
"image_file",
"image_width",
"image_height",
"start_at", "start_at",
"end_at", "end_at",
"allow_is_enabled", "allow_is_enabled",
"game", "game",
"game__twitch_id", "game__twitch_id",
"game__display_name", "game__display_name",
"game__name",
"game__slug", "game__slug",
"game__box_art", "game__box_art",
"game__box_art_file",
"game__box_art_width",
"game__box_art_height",
) )
.select_related("game") .select_related("game")
.prefetch_related( .prefetch_related(
models.Prefetch( models.Prefetch(
"game__owners", "game__owners",
queryset=Organization.objects.only("twitch_id", "name"), queryset=Organization.objects.only("twitch_id", "name"),
to_attr="owners_for_dashboard",
), ),
models.Prefetch( models.Prefetch(
"allow_channels", "allow_channels",
@ -1157,14 +640,14 @@ class DropCampaign(auto_prefetch.Model):
for campaign in campaigns: for campaign in campaigns:
game: Game = campaign.game game: Game = campaign.game
game_id: str = game.twitch_id game_id: str = game.twitch_id
game_display_name: str = game.get_game_name game_display_name: str = game.display_name
game_bucket: dict[str, Any] = campaigns_by_game.setdefault( game_bucket: dict[str, Any] = campaigns_by_game.setdefault(
game_id, game_id,
{ {
"name": game_display_name, "name": game_display_name,
"box_art": game.dashboard_box_art_url, "box_art": game.box_art_best_url,
"owners": list(getattr(game, "owners_for_dashboard", [])), "owners": list(game.owners.all()),
"campaigns": [], "campaigns": [],
}, },
) )
@ -1172,7 +655,7 @@ class DropCampaign(auto_prefetch.Model):
game_bucket["campaigns"].append({ game_bucket["campaigns"].append({
"campaign": campaign, "campaign": campaign,
"clean_name": campaign.clean_name, "clean_name": campaign.clean_name,
"image_url": campaign.dashboard_image_url, "image_url": campaign.listing_image_url,
"allowed_channels": getattr(campaign, "channels_ordered", []), "allowed_channels": getattr(campaign, "channels_ordered", []),
"game_display_name": game_display_name, "game_display_name": game_display_name,
"game_twitch_directory_url": game.twitch_directory_url, "game_twitch_directory_url": game.twitch_directory_url,
@ -1195,24 +678,6 @@ class DropCampaign(auto_prefetch.Model):
""" """
return cls.grouped_by_game(cls.active_for_dashboard(now)) return cls.grouped_by_game(cls.active_for_dashboard(now))
@classmethod
def dashboard_context(
cls,
now: datetime.datetime,
) -> dict[str, Any]:
"""Return dashboard data assembled by model-layer query helpers.
Args:
now: Current timestamp used for active-window filtering.
Returns:
Dict with grouped drop campaigns and active reward campaigns.
"""
return {
"campaigns_by_game": cls.campaigns_by_game_for_dashboard(now),
"active_reward_campaigns": RewardCampaign.active_for_dashboard(now),
}
@property @property
def is_active(self) -> bool: def is_active(self) -> bool:
"""Check if the campaign is currently active.""" """Check if the campaign is currently active."""
@ -1294,11 +759,6 @@ class DropCampaign(auto_prefetch.Model):
logger.debug("Failed to resolve DropCampaign.image_file url: %s", exc) logger.debug("Failed to resolve DropCampaign.image_file url: %s", exc)
return self.image_url or "" return self.image_url or ""
@property
def dashboard_image_url(self) -> str:
"""Return dashboard-safe campaign image URL without touching deferred image fields."""
return self.image_url or ""
@property @property
def duration_iso(self) -> str: def duration_iso(self) -> str:
"""Return the campaign duration in ISO 8601 format (e.g., 'P3DT4H30M'). """Return the campaign duration in ISO 8601 format (e.g., 'P3DT4H30M').
@ -1442,60 +902,12 @@ class DropBenefit(auto_prefetch.Model):
"""Return a string representation of the drop benefit.""" """Return a string representation of the drop benefit."""
return self.name return self.name
@classmethod
def emotes_for_gallery(cls) -> list[dict[str, str | DropCampaign]]:
"""Return emote gallery entries with only fields needed by the template.
The emote gallery needs benefit image URL and campaign name/twitch_id.
"""
emote_benefits: QuerySet[DropBenefit, DropBenefit] = (
cls.objects
.filter(distribution_type="EMOTE")
.only("twitch_id", "image_asset_url", "image_file")
.prefetch_related(
Prefetch(
"drops",
queryset=(
TimeBasedDrop.objects.select_related("campaign").only(
"campaign_id",
"campaign__twitch_id",
"campaign__name",
)
),
to_attr="_emote_drops_for_gallery",
),
)
)
emotes: list[dict[str, str | DropCampaign]] = []
for benefit in emote_benefits:
drop: TimeBasedDrop | None = next(
(
drop
for drop in getattr(benefit, "_emote_drops_for_gallery", [])
if drop.campaign_id
),
None,
)
if not drop:
continue
emotes.append({
"image_url": benefit.image_best_url,
"campaign": drop.campaign,
})
return emotes
@property @property
def image_best_url(self) -> str: def image_best_url(self) -> str:
"""Return the best URL for the benefit image (local first).""" """Return the best URL for the benefit image (local first)."""
try: try:
if self.image_file: if self.image_file and getattr(self.image_file, "url", None):
file_name: str = getattr(self.image_file, "name", "") return self.image_file.url
if file_name and self.image_file.storage.exists(file_name):
file_url: str | None = getattr(self.image_file, "url", None)
if file_url:
return file_url
except (AttributeError, OSError, ValueError) as exc: except (AttributeError, OSError, ValueError) as exc:
logger.debug("Failed to resolve DropBenefit.image_file url: %s", exc) logger.debug("Failed to resolve DropBenefit.image_file url: %s", exc)
return self.image_asset_url or "" return self.image_asset_url or ""
@ -1778,10 +1190,6 @@ class RewardCampaign(auto_prefetch.Model):
fields=["starts_at", "ends_at"], fields=["starts_at", "ends_at"],
name="tw_reward_starts_ends_idx", name="tw_reward_starts_ends_idx",
), ),
models.Index(
fields=["ends_at", "-starts_at"],
name="tw_reward_ends_starts_idx",
),
models.Index(fields=["status", "-starts_at"]), models.Index(fields=["status", "-starts_at"]),
] ]

View file

@ -3,8 +3,6 @@ from __future__ import annotations
import logging import logging
from typing import Any from typing import Any
from django.db.models import Count
logger = logging.getLogger("ttvdrops.signals") logger = logging.getLogger("ttvdrops.signals")
@ -65,65 +63,3 @@ def on_reward_campaign_saved(
from twitch.tasks import download_reward_campaign_image # noqa: PLC0415 from twitch.tasks import download_reward_campaign_image # noqa: PLC0415
_dispatch(download_reward_campaign_image, instance.pk) _dispatch(download_reward_campaign_image, instance.pk)
def _refresh_allowed_campaign_counts(channel_ids: set[int]) -> None:
"""Recompute and persist cached campaign counters for the given channels."""
if not channel_ids:
return
from twitch.models import Channel # noqa: PLC0415
from twitch.models import DropCampaign # noqa: PLC0415
through_model: type[Channel] = DropCampaign.allow_channels.through
counts_by_channel: dict[int, int] = {
row["channel_id"]: row["campaign_count"]
for row in (
through_model.objects
.filter(channel_id__in=channel_ids)
.values("channel_id")
.annotate(campaign_count=Count("dropcampaign_id"))
)
}
channels = list(
Channel.objects.filter(pk__in=channel_ids).only("pk", "allowed_campaign_count"),
)
for channel in channels:
channel.allowed_campaign_count = counts_by_channel.get(channel.pk, 0)
if channels:
Channel.objects.bulk_update(channels, ["allowed_campaign_count"])
def on_drop_campaign_allow_channels_changed( # noqa: PLR0913, PLR0917
sender: Any, # noqa: ANN401
instance: Any, # noqa: ANN401
action: str,
reverse: bool, # noqa: FBT001
model: Any, # noqa: ANN401
pk_set: set[int] | None,
**kwargs: Any, # noqa: ANN401
) -> None:
"""Keep Channel.allowed_campaign_count in sync for allow_channels M2M changes."""
if action == "pre_clear" and not reverse:
# post_clear does not expose removed channel IDs; snapshot before clearing.
instance._pre_clear_channel_ids = set( # pyright: ignore[reportAttributeAccessIssue] # noqa: SLF001
instance.allow_channels.values_list("pk", flat=True), # pyright: ignore[reportAttributeAccessIssue]
)
return
if action not in {"post_add", "post_remove", "post_clear"}:
return
channel_ids: set[int] = set()
if reverse:
channel_pk: int | None = getattr(instance, "pk", None)
if isinstance(channel_pk, int):
channel_ids.add(channel_pk)
elif action == "post_clear":
channel_ids = set(getattr(instance, "_pre_clear_channel_ids", set()))
else:
channel_ids = set(pk_set or set())
_refresh_allowed_campaign_counts(channel_ids)

View file

@ -1,86 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from django.db import connection
from django.db.migrations.executor import MigrationExecutor
from django.db.migrations.state import StateApps
if TYPE_CHECKING:
from django.db.migrations.state import StateApps
from twitch.models import Channel
from twitch.models import DropCampaign
from twitch.models import Game
@pytest.mark.django_db(transaction=True)
def test_0021_backfills_allowed_campaign_count() -> None: # noqa: PLR0914
"""Migration 0021 should backfill cached allowed campaign counts."""
migrate_from: list[tuple[str, str]] = [
("twitch", "0020_rewardcampaign_tw_reward_ends_starts_idx"),
]
migrate_to: list[tuple[str, str]] = [
("twitch", "0021_channel_allowed_campaign_count_cache"),
]
executor = MigrationExecutor(connection)
executor.migrate(migrate_from)
old_apps: StateApps = executor.loader.project_state(migrate_from).apps
Game: type[Game] = old_apps.get_model("twitch", "Game")
Channel: type[Channel] = old_apps.get_model("twitch", "Channel")
DropCampaign: type[DropCampaign] = old_apps.get_model("twitch", "DropCampaign")
game = Game.objects.create(
twitch_id="migration_backfill_game",
name="Migration Backfill Game",
display_name="Migration Backfill Game",
)
channel1 = Channel.objects.create(
twitch_id="migration_backfill_channel_1",
name="migrationbackfillchannel1",
display_name="Migration Backfill Channel 1",
)
channel2 = Channel.objects.create(
twitch_id="migration_backfill_channel_2",
name="migrationbackfillchannel2",
display_name="Migration Backfill Channel 2",
)
_channel3 = Channel.objects.create(
twitch_id="migration_backfill_channel_3",
name="migrationbackfillchannel3",
display_name="Migration Backfill Channel 3",
)
campaign1 = DropCampaign.objects.create(
twitch_id="migration_backfill_campaign_1",
name="Migration Backfill Campaign 1",
game=game,
operation_names=["DropCampaignDetails"],
)
campaign2 = DropCampaign.objects.create(
twitch_id="migration_backfill_campaign_2",
name="Migration Backfill Campaign 2",
game=game,
operation_names=["DropCampaignDetails"],
)
campaign1.allow_channels.add(channel1, channel2)
campaign2.allow_channels.add(channel1)
executor = MigrationExecutor(connection)
executor.migrate(migrate_to)
new_apps: StateApps = executor.loader.project_state(migrate_to).apps
new_channel: type[Channel] = new_apps.get_model("twitch", "Channel")
counts_by_twitch_id: dict[str, int] = {
channel.twitch_id: channel.allowed_campaign_count
for channel in new_channel.objects.order_by("twitch_id")
}
assert counts_by_twitch_id == {
"migration_backfill_channel_1": 2,
"migration_backfill_channel_2": 1,
"migration_backfill_channel_3": 0,
}

View file

@ -36,8 +36,6 @@ from twitch.views import _build_seo_context
from twitch.views import _truncate_description from twitch.views import _truncate_description
if TYPE_CHECKING: if TYPE_CHECKING:
from collections import OrderedDict
from django.core.handlers.wsgi import WSGIRequest from django.core.handlers.wsgi import WSGIRequest
from django.db.models import QuerySet from django.db.models import QuerySet
from django.test import Client from django.test import Client
@ -494,314 +492,6 @@ class TestChannelListView:
assert len(channels) == 1 assert len(channels) == 1
assert channels[0].twitch_id == channel.twitch_id assert channels[0].twitch_id == channel.twitch_id
def test_channel_list_queryset_only_selects_rendered_fields(self) -> None:
"""Channel list queryset should defer non-rendered fields."""
channel: Channel = Channel.objects.create(
twitch_id="channel_minimal_fields",
name="channelminimalfields",
display_name="Channel Minimal Fields",
)
queryset: QuerySet[Channel] = Channel.for_list_view()
fetched_channel: Channel | None = queryset.filter(
twitch_id=channel.twitch_id,
).first()
assert fetched_channel is not None
assert hasattr(fetched_channel, "campaign_count")
deferred_fields: set[str] = fetched_channel.get_deferred_fields()
assert "added_at" in deferred_fields
assert "updated_at" in deferred_fields
assert "name" not in deferred_fields
assert "display_name" not in deferred_fields
assert "twitch_id" not in deferred_fields
def test_channel_list_queryset_uses_counter_cache_without_join(self) -> None:
"""Channel list SQL should use cached count and avoid campaign join/grouping."""
sql: str = str(Channel.for_list_view().query).upper()
assert "TWITCH_DROPCAMPAIGN_ALLOW_CHANNELS" not in sql
assert "GROUP BY" not in sql
assert "ALLOWED_CAMPAIGN_COUNT" in sql
def test_channel_detail_queryset_only_selects_rendered_fields(self) -> None:
"""Channel detail queryset should defer fields not used by the template/SEO."""
channel: Channel = Channel.objects.create(
twitch_id="channel_detail_fields",
name="channeldetailfields",
display_name="Channel Detail Fields",
)
fetched_channel: Channel | None = (
Channel
.for_detail_view()
.filter(
twitch_id=channel.twitch_id,
)
.first()
)
assert fetched_channel is not None
deferred_fields: set[str] = fetched_channel.get_deferred_fields()
assert "allowed_campaign_count" in deferred_fields
assert "name" not in deferred_fields
assert "display_name" not in deferred_fields
assert "twitch_id" not in deferred_fields
assert "added_at" not in deferred_fields
assert "updated_at" not in deferred_fields
def test_channel_detail_campaign_queryset_only_selects_rendered_fields(
self,
) -> None:
"""Channel detail campaign queryset should avoid loading unused campaign fields."""
now: datetime.datetime = timezone.now()
game: Game = Game.objects.create(
twitch_id="channel_detail_game_fields",
name="Channel Detail Game Fields",
display_name="Channel Detail Game Fields",
)
channel: Channel = Channel.objects.create(
twitch_id="channel_detail_campaign_fields",
name="channeldetailcampaignfields",
display_name="Channel Detail Campaign Fields",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="channel_detail_campaign",
name="Channel Detail Campaign",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
campaign.allow_channels.add(channel)
fetched_campaign: DropCampaign | None = DropCampaign.for_channel_detail(
channel,
).first()
assert fetched_campaign is not None
deferred_fields: set[str] = fetched_campaign.get_deferred_fields()
assert "description" in deferred_fields
assert "details_url" in deferred_fields
assert "account_link_url" in deferred_fields
assert "name" not in deferred_fields
assert "start_at" not in deferred_fields
assert "end_at" not in deferred_fields
def test_channel_detail_prefetch_avoids_dropbenefit_refresh_n_plus_one(
self,
) -> None:
"""Channel detail prefetch should not refresh each DropBenefit row for image dimensions."""
now: datetime.datetime = timezone.now()
game: Game = Game.objects.create(
twitch_id="channel_detail_n_plus_one_game",
name="Channel Detail N+1 Game",
display_name="Channel Detail N+1 Game",
)
channel: Channel = Channel.objects.create(
twitch_id="channel_detail_n_plus_one_channel",
name="channeldetailnplusone",
display_name="Channel Detail N+1",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="channel_detail_n_plus_one_campaign",
name="Channel Detail N+1 Campaign",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
campaign.allow_channels.add(channel)
drop: TimeBasedDrop = TimeBasedDrop.objects.create(
twitch_id="channel_detail_n_plus_one_drop",
name="Channel Detail N+1 Drop",
campaign=campaign,
)
png_1x1: bytes = (
b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01"
b"\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89"
b"\x00\x00\x00\x0bIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01"
b"\r\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82"
)
benefits: list[DropBenefit] = []
for i in range(3):
benefit: DropBenefit = DropBenefit.objects.create(
twitch_id=f"channel_detail_n_plus_one_benefit_{i}",
name=f"Benefit {i}",
image_asset_url=f"https://example.com/benefit_{i}.png",
)
assert benefit.image_file is not None
benefit.image_file.save(
f"channel_detail_n_plus_one_benefit_{i}.png",
ContentFile(png_1x1),
save=True,
)
benefits.append(benefit)
drop.benefits.add(*benefits)
with CaptureQueriesContext(connection) as queries:
campaigns: list[DropCampaign] = list(
DropCampaign.for_channel_detail(channel),
)
assert campaigns
_ = [
benefit.name
for campaign_row in campaigns
for drop_row in campaign_row.time_based_drops.all() # pyright: ignore[reportAttributeAccessIssue]
for benefit in drop_row.benefits.all()
]
refresh_queries: list[str] = [
query_info["sql"]
for query_info in queries.captured_queries
if query_info["sql"].lstrip().upper().startswith("SELECT")
and 'from "twitch_dropbenefit"' in query_info["sql"].lower()
and 'where "twitch_dropbenefit"."id" =' in query_info["sql"].lower()
]
assert not refresh_queries, (
"Channel detail queryset triggered per-benefit refresh SELECTs. "
f"Queries: {refresh_queries}"
)
def test_channel_detail_uses_asset_url_when_local_benefit_file_is_missing(
self,
client: Client,
) -> None:
"""Channel detail should avoid broken local image URLs when cached files are missing."""
now: datetime.datetime = timezone.now()
game: Game = Game.objects.create(
twitch_id="channel_detail_missing_local_file_game",
name="Channel Detail Missing Local File Game",
display_name="Channel Detail Missing Local File Game",
)
channel: Channel = Channel.objects.create(
twitch_id="channel_detail_missing_local_file_channel",
name="missinglocalfilechannel",
display_name="Missing Local File Channel",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="channel_detail_missing_local_file_campaign",
name="Channel Detail Missing Local File Campaign",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
campaign.allow_channels.add(channel)
drop: TimeBasedDrop = TimeBasedDrop.objects.create(
twitch_id="channel_detail_missing_local_file_drop",
name="Channel Detail Missing Local File Drop",
campaign=campaign,
)
remote_asset_url: str = "https://example.com/benefit-missing-local-file.png"
benefit: DropBenefit = DropBenefit.objects.create(
twitch_id="channel_detail_missing_local_file_benefit",
name="Benefit Missing Local File",
image_asset_url=remote_asset_url,
)
DropBenefit.objects.filter(pk=benefit.pk).update(
image_file="benefits/images/does-not-exist.png",
)
drop.benefits.add(benefit)
response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:channel_detail", args=[channel.twitch_id]),
)
assert response.status_code == 200
html: str = response.content.decode("utf-8")
assert remote_asset_url in html
assert "benefits/images/does-not-exist.png" not in html
def test_channel_allowed_campaign_count_updates_on_add_remove_clear(self) -> None:
"""Counter cache should stay in sync when campaign-channel links change."""
game: Game = Game.objects.create(
twitch_id="counter_cache_game",
name="Counter Cache Game",
display_name="Counter Cache Game",
)
channel: Channel = Channel.objects.create(
twitch_id="counter_cache_channel",
name="countercachechannel",
display_name="Counter Cache Channel",
)
campaign1: DropCampaign = DropCampaign.objects.create(
twitch_id="counter_cache_campaign_1",
name="Counter Cache Campaign 1",
game=game,
operation_names=["DropCampaignDetails"],
)
campaign2: DropCampaign = DropCampaign.objects.create(
twitch_id="counter_cache_campaign_2",
name="Counter Cache Campaign 2",
game=game,
operation_names=["DropCampaignDetails"],
)
campaign1.allow_channels.add(channel)
channel.refresh_from_db()
assert channel.allowed_campaign_count == 1
campaign2.allow_channels.add(channel)
channel.refresh_from_db()
assert channel.allowed_campaign_count == 2
campaign1.allow_channels.remove(channel)
channel.refresh_from_db()
assert channel.allowed_campaign_count == 1
campaign2.allow_channels.clear()
channel.refresh_from_db()
assert channel.allowed_campaign_count == 0
def test_channel_allowed_campaign_count_updates_on_set(self) -> None:
"""Counter cache should stay in sync when allow_channels.set(...) is used."""
game: Game = Game.objects.create(
twitch_id="counter_cache_set_game",
name="Counter Cache Set Game",
display_name="Counter Cache Set Game",
)
channel1: Channel = Channel.objects.create(
twitch_id="counter_cache_set_channel_1",
name="countercachesetchannel1",
display_name="Counter Cache Set Channel 1",
)
channel2: Channel = Channel.objects.create(
twitch_id="counter_cache_set_channel_2",
name="countercachesetchannel2",
display_name="Counter Cache Set Channel 2",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="counter_cache_set_campaign",
name="Counter Cache Set Campaign",
game=game,
operation_names=["DropCampaignDetails"],
)
campaign.allow_channels.set([channel1, channel2])
channel1.refresh_from_db()
channel2.refresh_from_db()
assert channel1.allowed_campaign_count == 1
assert channel2.allowed_campaign_count == 1
campaign.allow_channels.set([channel2])
channel1.refresh_from_db()
channel2.refresh_from_db()
assert channel1.allowed_campaign_count == 0
assert channel2.allowed_campaign_count == 1
@pytest.mark.django_db @pytest.mark.django_db
def test_dashboard_view(self, client: Client) -> None: def test_dashboard_view(self, client: Client) -> None:
"""Test dashboard view returns 200 and has grouped campaign data in context.""" """Test dashboard view returns 200 and has grouped campaign data in context."""
@ -853,7 +543,7 @@ class TestChannelListView:
assert len(context["campaigns_by_game"][game.twitch_id]["campaigns"]) == 1 assert len(context["campaigns_by_game"][game.twitch_id]["campaigns"]) == 1
@pytest.mark.django_db @pytest.mark.django_db
def test_dashboard_queries_use_indexes(self, client: Client) -> None: def test_dashboard_queries_use_indexes(self) -> None:
"""Dashboard source queries should use indexes for active-window filtering.""" """Dashboard source queries should use indexes for active-window filtering."""
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
@ -937,45 +627,20 @@ class TestChannelListView:
RewardCampaign.active_for_dashboard(now) RewardCampaign.active_for_dashboard(now)
) )
response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dashboard"))
assert response.status_code == 200
campaigns_plan: str = active_campaigns_qs.explain() campaigns_plan: str = active_campaigns_qs.explain()
reward_plan: str = active_reward_campaigns_qs.explain() reward_plan: str = active_reward_campaigns_qs.explain()
if connection.vendor == "sqlite": if connection.vendor == "sqlite":
campaigns_uses_index: bool = "USING INDEX" in campaigns_plan.upper() campaigns_uses_index: bool = "USING INDEX" in campaigns_plan.upper()
rewards_uses_index: bool = "USING INDEX" in reward_plan.upper() rewards_uses_index: bool = "USING INDEX" in reward_plan.upper()
campaigns_uses_expected_index: bool = (
"tw_drop_start_end_idx" in campaigns_plan
or "tw_drop_start_end_game_idx" in campaigns_plan
or "tw_drop_start_desc_idx" in campaigns_plan
)
rewards_uses_expected_index: bool = (
"tw_reward_starts_ends_idx" in reward_plan
or "tw_reward_ends_starts_idx" in reward_plan
or "tw_reward_starts_desc_idx" in reward_plan
)
elif connection.vendor == "postgresql": elif connection.vendor == "postgresql":
campaigns_uses_index = ( campaigns_uses_index = (
"INDEX SCAN" in campaigns_plan.upper() "INDEX SCAN" in campaigns_plan.upper()
or "BITMAP INDEX SCAN" in campaigns_plan.upper() or "BITMAP INDEX SCAN" in campaigns_plan.upper()
or "INDEX ONLY SCAN" in campaigns_plan.upper()
) )
rewards_uses_index = ( rewards_uses_index = (
"INDEX SCAN" in reward_plan.upper() "INDEX SCAN" in reward_plan.upper()
or "BITMAP INDEX SCAN" in reward_plan.upper() or "BITMAP INDEX SCAN" in reward_plan.upper()
or "INDEX ONLY SCAN" in reward_plan.upper()
)
campaigns_uses_expected_index = (
"tw_drop_start_end_idx" in campaigns_plan
or "tw_drop_start_end_game_idx" in campaigns_plan
or "tw_drop_start_desc_idx" in campaigns_plan
)
rewards_uses_expected_index = (
"tw_reward_starts_ends_idx" in reward_plan
or "tw_reward_ends_starts_idx" in reward_plan
or "tw_reward_starts_desc_idx" in reward_plan
) )
else: else:
pytest.skip( pytest.skip(
@ -984,78 +649,6 @@ class TestChannelListView:
assert campaigns_uses_index, campaigns_plan assert campaigns_uses_index, campaigns_plan
assert rewards_uses_index, reward_plan assert rewards_uses_index, reward_plan
assert campaigns_uses_expected_index, campaigns_plan
assert rewards_uses_expected_index, reward_plan
@pytest.mark.django_db
def test_dashboard_context_uses_prefetched_data_without_n_plus_one(self) -> None:
"""Dashboard context should not trigger extra queries when rendering-used attrs are accessed."""
now: datetime.datetime = timezone.now()
org: Organization = Organization.objects.create(
twitch_id="org_dashboard_prefetch",
name="Org Dashboard Prefetch",
)
game: Game = Game.objects.create(
twitch_id="game_dashboard_prefetch",
name="Game Dashboard Prefetch",
display_name="Game Dashboard Prefetch",
)
game.owners.add(org)
channel: Channel = Channel.objects.create(
twitch_id="channel_dashboard_prefetch",
name="channeldashboardprefetch",
display_name="Channel Dashboard Prefetch",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="campaign_dashboard_prefetch",
name="Campaign Dashboard Prefetch",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
campaign.allow_channels.add(channel)
RewardCampaign.objects.create(
twitch_id="reward_dashboard_prefetch",
name="Reward Dashboard Prefetch",
game=game,
starts_at=now - timedelta(hours=1),
ends_at=now + timedelta(hours=1),
)
dashboard_data: dict[str, Any] = DropCampaign.dashboard_context(now)
campaigns_by_game: OrderedDict[str, dict[str, Any]] = dashboard_data[
"campaigns_by_game"
]
reward_campaigns: list[RewardCampaign] = list(
dashboard_data["active_reward_campaigns"],
)
with CaptureQueriesContext(connection) as capture:
game_bucket: dict[str, Any] = campaigns_by_game[game.twitch_id]
_ = game_bucket["name"]
_ = game_bucket["box_art"]
_ = [owner.name for owner in game_bucket["owners"]]
campaign_entry: dict[str, Any] = game_bucket["campaigns"][0]
campaign_obj: DropCampaign = campaign_entry["campaign"]
_ = campaign_obj.clean_name
_ = campaign_obj.duration_iso
_ = campaign_obj.start_at
_ = campaign_obj.end_at
_ = campaign_entry["image_url"]
_ = campaign_entry["game_twitch_directory_url"]
_ = [c.display_name for c in campaign_entry["allowed_channels"]]
_ = [r.is_active for r in reward_campaigns]
_ = [r.game.display_name if r.game else None for r in reward_campaigns]
assert len(capture) == 0
@pytest.mark.django_db @pytest.mark.django_db
def test_dashboard_query_plans_reference_expected_index_names(self) -> None: def test_dashboard_query_plans_reference_expected_index_names(self) -> None:
@ -1117,7 +710,6 @@ class TestChannelListView:
expected_reward_indexes: set[str] = { expected_reward_indexes: set[str] = {
"tw_reward_starts_desc_idx", "tw_reward_starts_desc_idx",
"tw_reward_starts_ends_idx", "tw_reward_starts_ends_idx",
"tw_reward_ends_starts_idx",
} }
drop_index_names: set[str] = _index_names(DropCampaign._meta.db_table) drop_index_names: set[str] = _index_names(DropCampaign._meta.db_table)
@ -1149,36 +741,6 @@ class TestChannelListView:
f"Expected one of {sorted(expected_reward_indexes)}. Plan={reward_plan}" f"Expected one of {sorted(expected_reward_indexes)}. Plan={reward_plan}"
) )
@pytest.mark.django_db
def test_dashboard_active_window_composite_indexes_exist(self) -> None:
"""Dashboard active-window filters should have supporting composite indexes."""
with connection.cursor() as cursor:
drop_constraints = connection.introspection.get_constraints(
cursor,
DropCampaign._meta.db_table,
)
reward_constraints = connection.introspection.get_constraints(
cursor,
RewardCampaign._meta.db_table,
)
def _index_columns(constraints: dict[str, Any]) -> list[tuple[str, ...]]:
columns: list[tuple[str, ...]] = []
for meta in constraints.values():
if not meta.get("index"):
continue
index_columns: list[str] = meta.get("columns") or []
columns.append(tuple(index_columns))
return columns
drop_index_columns: list[tuple[str, ...]] = _index_columns(drop_constraints)
reward_index_columns: list[tuple[str, ...]] = _index_columns(
reward_constraints,
)
assert ("start_at", "end_at") in drop_index_columns
assert ("starts_at", "ends_at") in reward_index_columns
@pytest.mark.django_db @pytest.mark.django_db
def test_dashboard_query_count_stays_flat_with_more_data( def test_dashboard_query_count_stays_flat_with_more_data(
self, self,
@ -1287,90 +849,6 @@ class TestChannelListView:
f"baseline={baseline_select_count}, scaled={scaled_select_count}" f"baseline={baseline_select_count}, scaled={scaled_select_count}"
) )
@pytest.mark.django_db
def test_dashboard_field_access_after_prefetch_has_no_extra_selects(self) -> None:
"""Dashboard-accessed fields should not trigger deferred model SELECT queries."""
now: datetime.datetime = timezone.now()
org: Organization = Organization.objects.create(
twitch_id="org_dashboard_field_access",
name="Org Dashboard Field Access",
)
game: Game = Game.objects.create(
twitch_id="game_dashboard_field_access",
name="Game Dashboard Field Access",
display_name="Game Dashboard Field Access",
slug="game-dashboard-field-access",
box_art="https://example.com/game-box-art.jpg",
)
game.owners.add(org)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="campaign_dashboard_field_access",
name="Campaign Dashboard Field Access",
game=game,
operation_names=["DropCampaignDetails"],
image_url="https://example.com/campaign.jpg",
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
channel: Channel = Channel.objects.create(
twitch_id="channel_dashboard_field_access",
name="channeldashboardfieldaccess",
display_name="Channel Dashboard Field Access",
)
campaign.allow_channels.add(channel)
RewardCampaign.objects.create(
twitch_id="reward_dashboard_field_access",
name="Reward Dashboard Field Access",
brand="Brand",
summary="Reward summary",
is_sitewide=False,
game=game,
starts_at=now - timedelta(hours=1),
ends_at=now + timedelta(hours=1),
)
dashboard_rewards_qs: QuerySet[RewardCampaign] = (
RewardCampaign.active_for_dashboard(now)
)
dashboard_campaigns_qs: QuerySet[DropCampaign] = (
DropCampaign.active_for_dashboard(now)
)
rewards_list: list[RewardCampaign] = list(dashboard_rewards_qs)
list(dashboard_campaigns_qs)
with CaptureQueriesContext(connection) as queries:
# Use pre-evaluated queryset to avoid capturing initial SELECT queries
grouped = DropCampaign.grouped_by_game(dashboard_campaigns_qs)
for reward in rewards_list:
_ = reward.twitch_id
_ = reward.name
_ = reward.brand
_ = reward.summary
_ = reward.starts_at
_ = reward.ends_at
_ = reward.is_sitewide
_ = reward.is_active
if reward.game:
_ = reward.game.twitch_id
_ = reward.game.display_name
assert game.twitch_id in grouped
deferred_selects: list[str] = [
query_info["sql"]
for query_info in queries.captured_queries
if query_info["sql"].lstrip().upper().startswith("SELECT")
]
assert not deferred_selects, (
"Dashboard model field access triggered unexpected deferred SELECT queries. "
f"Queries: {deferred_selects}"
)
@pytest.mark.django_db @pytest.mark.django_db
def test_dashboard_grouping_reuses_selected_game_relation(self) -> None: def test_dashboard_grouping_reuses_selected_game_relation(self) -> None:
"""Dashboard grouping should not issue extra standalone Game queries.""" """Dashboard grouping should not issue extra standalone Game queries."""
@ -2097,75 +1575,6 @@ class TestChannelListView:
html = response.content.decode("utf-8") html = response.content.decode("utf-8")
assert "This badge was earned by subscribing." in html assert "This badge was earned by subscribing." in html
@pytest.mark.django_db
def test_drop_campaign_detail_badge_queries_stay_flat(self, client: Client) -> None:
"""Campaign detail should avoid N+1 ChatBadge lookups across many badge drops."""
now: datetime.datetime = timezone.now()
game: Game = Game.objects.create(
twitch_id="g-badge-flat",
name="Game",
display_name="Game",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="c-badge-flat",
name="Campaign",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
badge_set = ChatBadgeSet.objects.create(set_id="badge-flat")
def _create_badge_drop(i: int) -> None:
drop = TimeBasedDrop.objects.create(
twitch_id=f"flat-drop-{i}",
name=f"Drop {i}",
campaign=campaign,
required_minutes_watched=i,
required_subs=0,
start_at=now - timedelta(hours=2),
end_at=now + timedelta(hours=2),
)
title = f"Badge {i}"
benefit = DropBenefit.objects.create(
twitch_id=f"flat-benefit-{i}",
name=title,
distribution_type="BADGE",
)
drop.benefits.add(benefit)
ChatBadge.objects.create(
badge_set=badge_set,
badge_id=str(i),
image_url_1x=f"https://example.com/{i}/1x.png",
image_url_2x=f"https://example.com/{i}/2x.png",
image_url_4x=f"https://example.com/{i}/4x.png",
title=title,
description=f"Badge description {i}",
)
def _select_count() -> int:
url: str = reverse("twitch:campaign_detail", args=[campaign.twitch_id])
with CaptureQueriesContext(connection) as capture:
response: _MonkeyPatchedWSGIResponse = client.get(url)
assert response.status_code == 200
return sum(
1
for query in capture.captured_queries
if query["sql"].lstrip().upper().startswith("SELECT")
)
_create_badge_drop(1)
baseline_selects: int = _select_count()
for i in range(2, 22):
_create_badge_drop(i)
expanded_selects: int = _select_count()
# Query volume should remain effectively constant as badge-drop count grows.
assert expanded_selects <= baseline_selects + 2
@pytest.mark.django_db @pytest.mark.django_db
def test_games_grid_view(self, client: Client) -> None: def test_games_grid_view(self, client: Client) -> None:
"""Test games grid view returns 200 and has games in context.""" """Test games grid view returns 200 and has games in context."""
@ -2173,70 +1582,6 @@ class TestChannelListView:
assert response.status_code == 200 assert response.status_code == 200
assert "games" in response.context assert "games" in response.context
@pytest.mark.django_db
def test_games_grid_view_groups_only_games_with_campaigns(
self,
client: Client,
) -> None:
"""Games grid should group only games that actually have campaigns."""
now: datetime.datetime = timezone.now()
org_with_campaign: Organization = Organization.objects.create(
twitch_id="org-games-grid-with-campaign",
name="Org Games Grid With Campaign",
)
org_without_campaign: Organization = Organization.objects.create(
twitch_id="org-games-grid-without-campaign",
name="Org Games Grid Without Campaign",
)
game_with_campaign: Game = Game.objects.create(
twitch_id="game-games-grid-with-campaign",
name="Game Games Grid With Campaign",
display_name="Game Games Grid With Campaign",
)
game_with_campaign.owners.add(org_with_campaign)
game_without_campaign: Game = Game.objects.create(
twitch_id="game-games-grid-without-campaign",
name="Game Games Grid Without Campaign",
display_name="Game Games Grid Without Campaign",
)
game_without_campaign.owners.add(org_without_campaign)
DropCampaign.objects.create(
twitch_id="campaign-games-grid-with-campaign",
name="Campaign Games Grid With Campaign",
game=game_with_campaign,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:games_grid"))
assert response.status_code == 200
context: ContextList | dict[str, Any] = response.context # type: ignore[assignment]
if isinstance(context, list):
context = context[-1]
games: list[Game] = list(context["games"])
games_by_org: OrderedDict[Organization, list[dict[str, Game]]] = context[
"games_by_org"
]
game_ids: set[str] = {game.twitch_id for game in games}
assert game_with_campaign.twitch_id in game_ids
assert game_without_campaign.twitch_id not in game_ids
grouped_ids: set[str] = {
item["game"].twitch_id
for grouped_games in games_by_org.values()
for item in grouped_games
}
assert game_with_campaign.twitch_id in grouped_ids
assert game_without_campaign.twitch_id not in grouped_ids
@pytest.mark.django_db @pytest.mark.django_db
def test_games_list_view(self, client: Client) -> None: def test_games_list_view(self, client: Client) -> None:
"""Test games list view returns 200 and has games in context.""" """Test games list view returns 200 and has games in context."""
@ -2257,58 +1602,6 @@ class TestChannelListView:
assert response.status_code == 200 assert response.status_code == 200
assert "game" in response.context assert "game" in response.context
@pytest.mark.django_db
def test_game_detail_campaign_query_plan_uses_game_end_index(self) -> None:
"""Game-detail campaign list query should use the game/end_at composite index."""
now: datetime.datetime = timezone.now()
game: Game = Game.objects.create(
twitch_id="game_detail_idx_game",
name="Game Detail Index Game",
display_name="Game Detail Index Game",
)
campaigns: list[DropCampaign] = []
for i in range(200):
campaigns.extend((
DropCampaign(
twitch_id=f"game_detail_idx_old_{i}",
name=f"Old campaign {i}",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(days=90),
end_at=now - timedelta(days=60),
),
DropCampaign(
twitch_id=f"game_detail_idx_future_{i}",
name=f"Future campaign {i}",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now + timedelta(days=60),
end_at=now + timedelta(days=90),
),
))
campaigns.append(
DropCampaign(
twitch_id="game_detail_idx_active",
name="Active campaign",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
),
)
DropCampaign.objects.bulk_create(campaigns)
plan: str = DropCampaign.for_game_detail(game).explain().lower()
if connection.vendor in {"sqlite", "postgresql"}:
assert "tw_drop_game_end_desc_idx" in plan, plan
else:
pytest.skip(
f"Unsupported DB vendor for index-name plan assertion: {connection.vendor}",
)
@pytest.mark.django_db @pytest.mark.django_db
def test_game_detail_image_aspect_ratio(self, client: Client, db: None) -> None: def test_game_detail_image_aspect_ratio(self, client: Client, db: None) -> None:
"""Box art should render with a width attribute only, preserving aspect ratio.""" """Box art should render with a width attribute only, preserving aspect ratio."""
@ -2364,30 +1657,6 @@ class TestChannelListView:
assert response.status_code == 200 assert response.status_code == 200
assert "orgs" in response.context assert "orgs" in response.context
@pytest.mark.django_db
def test_org_list_queryset_only_selects_rendered_fields(self) -> None:
"""Organization list queryset should defer non-rendered fields."""
org: Organization = Organization.objects.create(
twitch_id="org_list_fields",
name="Org List Fields",
)
fetched_org: Organization | None = (
Organization
.for_list_view()
.filter(
twitch_id=org.twitch_id,
)
.first()
)
assert fetched_org is not None
deferred_fields: set[str] = fetched_org.get_deferred_fields()
assert "added_at" in deferred_fields
assert "updated_at" in deferred_fields
assert "twitch_id" not in deferred_fields
assert "name" not in deferred_fields
@pytest.mark.django_db @pytest.mark.django_db
def test_organization_detail_view(self, client: Client, db: None) -> None: def test_organization_detail_view(self, client: Client, db: None) -> None:
"""Test organization detail view returns 200 and has organization in context.""" """Test organization detail view returns 200 and has organization in context."""
@ -2397,73 +1666,6 @@ class TestChannelListView:
assert response.status_code == 200 assert response.status_code == 200
assert "organization" in response.context assert "organization" in response.context
@pytest.mark.django_db
def test_organization_detail_queryset_only_selects_rendered_fields(self) -> None:
"""Organization detail queryset should defer non-rendered organization fields."""
org: Organization = Organization.objects.create(
twitch_id="org_detail_fields",
name="Org Detail Fields",
)
Game.objects.create(
twitch_id="org_detail_game_fields",
name="Org Detail Game Fields",
display_name="Org Detail Game Fields",
).owners.add(org)
fetched_org: Organization | None = (
Organization.for_detail_view().filter(twitch_id=org.twitch_id).first()
)
assert fetched_org is not None
deferred_fields: set[str] = fetched_org.get_deferred_fields()
assert "twitch_id" not in deferred_fields
assert "name" not in deferred_fields
assert "added_at" not in deferred_fields
assert "updated_at" not in deferred_fields
@pytest.mark.django_db
def test_organization_detail_prefetched_games_do_not_trigger_extra_queries(
self,
) -> None:
"""Organization detail should prefetch games used by the template."""
org: Organization = Organization.objects.create(
twitch_id="org_detail_prefetch",
name="Org Detail Prefetch",
)
game: Game = Game.objects.create(
twitch_id="org_detail_prefetch_game",
name="Org Detail Prefetch Game",
display_name="Org Detail Prefetch Game",
slug="org-detail-prefetch-game",
)
game.owners.add(org)
fetched_org: Organization = Organization.for_detail_view().get(
twitch_id=org.twitch_id,
)
games_for_detail: list[Game] = list(
getattr(fetched_org, "games_for_detail", []),
)
assert len(games_for_detail) == 1
with CaptureQueriesContext(connection) as queries:
game_row: Game = games_for_detail[0]
_ = game_row.twitch_id
_ = game_row.display_name
_ = game_row.name
_ = str(game_row)
select_queries: list[str] = [
query_info["sql"]
for query_info in queries.captured_queries
if query_info["sql"].lstrip().upper().startswith("SELECT")
]
assert not select_queries, (
"Organization detail prefetched games triggered unexpected SELECT queries. "
f"Queries: {select_queries}"
)
@pytest.mark.django_db @pytest.mark.django_db
def test_channel_detail_view(self, client: Client, db: None) -> None: def test_channel_detail_view(self, client: Client, db: None) -> None:
"""Test channel detail view returns 200 and has channel in context.""" """Test channel detail view returns 200 and has channel in context."""
@ -4047,200 +3249,6 @@ class TestBadgeSetDetailView:
) )
@pytest.mark.django_db
class TestEmoteGalleryView:
"""Tests for emote gallery model delegation and query safety."""
def test_emote_gallery_view_uses_model_helper(
self,
client: Client,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Emote gallery view should delegate data loading to the model layer."""
game: Game = Game.objects.create(
twitch_id="emote_gallery_delegate_game",
name="Emote Delegate Game",
display_name="Emote Delegate Game",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="emote_gallery_delegate_campaign",
name="Emote Delegate Campaign",
game=game,
operation_names=["DropCampaignDetails"],
)
expected: list[dict[str, str | DropCampaign]] = [
{
"image_url": "https://example.com/emote.png",
"campaign": campaign,
},
]
calls: dict[str, int] = {"count": 0}
def _fake_emotes_for_gallery(
_cls: type[DropBenefit],
) -> list[dict[str, str | DropCampaign]]:
calls["count"] += 1
return expected
monkeypatch.setattr(
DropBenefit,
"emotes_for_gallery",
classmethod(_fake_emotes_for_gallery),
)
response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:emote_gallery"),
)
assert response.status_code == 200
context: ContextList | dict[str, Any] = response.context
if isinstance(context, list):
context = context[-1]
assert calls["count"] == 1
assert context["emotes"] == expected
def test_emotes_for_gallery_uses_prefetched_fields_without_extra_queries(
self,
) -> None:
"""Accessing template-used fields should not issue follow-up SELECT queries."""
now: datetime.datetime = timezone.now()
game: Game = Game.objects.create(
twitch_id="emote_gallery_fields_game",
name="Emote Fields Game",
display_name="Emote Fields Game",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="emote_gallery_fields_campaign",
name="Emote Fields Campaign",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
drop: TimeBasedDrop = TimeBasedDrop.objects.create(
twitch_id="emote_gallery_fields_drop",
name="Emote Fields Drop",
campaign=campaign,
)
benefit: DropBenefit = DropBenefit.objects.create(
twitch_id="emote_gallery_fields_benefit",
name="Emote Fields Benefit",
distribution_type="EMOTE",
image_asset_url="https://example.com/emote_fields.png",
)
drop.benefits.add(benefit)
emotes: list[dict[str, str | DropCampaign]] = DropBenefit.emotes_for_gallery()
assert len(emotes) == 1
with CaptureQueriesContext(connection) as capture:
for emote in emotes:
_ = emote["image_url"]
campaign_obj = emote["campaign"]
assert isinstance(campaign_obj, DropCampaign)
_ = campaign_obj.twitch_id
_ = campaign_obj.name
assert len(capture) == 0
def test_emotes_for_gallery_skips_emotes_without_campaign_link(self) -> None:
"""Gallery should only include EMOTE benefits reachable from a campaign drop."""
game: Game = Game.objects.create(
twitch_id="emote_gallery_skip_game",
name="Emote Skip Game",
display_name="Emote Skip Game",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="emote_gallery_skip_campaign",
name="Emote Skip Campaign",
game=game,
operation_names=["DropCampaignDetails"],
)
drop: TimeBasedDrop = TimeBasedDrop.objects.create(
twitch_id="emote_gallery_skip_drop",
name="Emote Skip Drop",
campaign=campaign,
)
included: DropBenefit = DropBenefit.objects.create(
twitch_id="emote_gallery_included_benefit",
name="Included Emote",
distribution_type="EMOTE",
image_asset_url="https://example.com/included-emote.png",
)
orphaned: DropBenefit = DropBenefit.objects.create(
twitch_id="emote_gallery_orphaned_benefit",
name="Orphaned Emote",
distribution_type="EMOTE",
image_asset_url="https://example.com/orphaned-emote.png",
)
drop.benefits.add(included)
emotes: list[dict[str, str | DropCampaign]] = DropBenefit.emotes_for_gallery()
image_urls: list[str] = [str(item["image_url"]) for item in emotes]
campaign_ids: list[str] = [
campaign_obj.twitch_id
for campaign_obj in (item["campaign"] for item in emotes)
if isinstance(campaign_obj, DropCampaign)
]
assert included.image_asset_url in image_urls
assert orphaned.image_asset_url not in image_urls
assert campaign.twitch_id in campaign_ids
def test_emote_gallery_view_renders_only_campaign_linked_emotes(
self,
client: Client,
) -> None:
"""Emote gallery page should not render EMOTE benefits without campaign-linked drops."""
game: Game = Game.objects.create(
twitch_id="emote_gallery_view_game",
name="Emote View Game",
display_name="Emote View Game",
)
campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="emote_gallery_view_campaign",
name="Emote View Campaign",
game=game,
operation_names=["DropCampaignDetails"],
)
drop: TimeBasedDrop = TimeBasedDrop.objects.create(
twitch_id="emote_gallery_view_drop",
name="Emote View Drop",
campaign=campaign,
)
linked: DropBenefit = DropBenefit.objects.create(
twitch_id="emote_gallery_view_linked",
name="Linked Emote",
distribution_type="EMOTE",
image_asset_url="https://example.com/linked-view-emote.png",
)
orphaned: DropBenefit = DropBenefit.objects.create(
twitch_id="emote_gallery_view_orphaned",
name="Orphaned View Emote",
distribution_type="EMOTE",
image_asset_url="https://example.com/orphaned-view-emote.png",
)
drop.benefits.add(linked)
response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:emote_gallery"),
)
assert response.status_code == 200
html: str = response.content.decode("utf-8")
assert linked.image_asset_url in html
assert orphaned.image_asset_url not in html
assert reverse("twitch:campaign_detail", args=[campaign.twitch_id]) in html
@pytest.mark.django_db @pytest.mark.django_db
class TestDropCampaignListView: class TestDropCampaignListView:
"""Tests for drop_campaign_list_view index usage and fat-model delegation.""" """Tests for drop_campaign_list_view index usage and fat-model delegation."""

View file

@ -1,21 +1,25 @@
from __future__ import annotations from __future__ import annotations
import csv import csv
import datetime
import json import json
import logging import logging
from collections import OrderedDict
from collections import defaultdict
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any from typing import Any
from typing import Literal from typing import Literal
from urllib.parse import urlencode
from django.core.paginator import EmptyPage from django.core.paginator import EmptyPage
from django.core.paginator import Page from django.core.paginator import Page
from django.core.paginator import PageNotAnInteger from django.core.paginator import PageNotAnInteger
from django.core.paginator import Paginator from django.core.paginator import Paginator
from django.db.models import Count
from django.db.models import Prefetch
from django.db.models import Q
from django.db.models.query import QuerySet from django.db.models.query import QuerySet
from django.http import Http404 from django.http import Http404
from django.http import HttpResponse from django.http import HttpResponse
from django.shortcuts import get_object_or_404
from django.shortcuts import render from django.shortcuts import render
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
@ -31,10 +35,9 @@ from twitch.models import DropCampaign
from twitch.models import Game from twitch.models import Game
from twitch.models import Organization from twitch.models import Organization
from twitch.models import RewardCampaign from twitch.models import RewardCampaign
from twitch.models import TimeBasedDrop
if TYPE_CHECKING: if TYPE_CHECKING:
import datetime
from django.db.models import QuerySet from django.db.models import QuerySet
from django.http import HttpRequest from django.http import HttpRequest
@ -264,7 +267,31 @@ def emote_gallery_view(request: HttpRequest) -> HttpResponse:
Returns: Returns:
HttpResponse: The rendered emote gallery page. HttpResponse: The rendered emote gallery page.
""" """
emotes: list[dict[str, str | DropCampaign]] = DropBenefit.emotes_for_gallery() emote_benefits: QuerySet[DropBenefit, DropBenefit] = (
DropBenefit.objects
.filter(distribution_type="EMOTE")
.select_related()
.prefetch_related(
Prefetch(
"drops",
queryset=TimeBasedDrop.objects.select_related("campaign"),
to_attr="_emote_drops",
),
)
)
emotes: list[dict[str, str | DropCampaign]] = []
for benefit in emote_benefits:
# Find the first drop with a campaign for this benefit
drop: TimeBasedDrop | None = next(
(d for d in getattr(benefit, "_emote_drops", []) if d.campaign),
None,
)
if drop and drop.campaign:
emotes.append({
"image_url": benefit.image_best_url,
"campaign": drop.campaign,
})
seo_context: dict[str, Any] = _build_seo_context( seo_context: dict[str, Any] = _build_seo_context(
page_title="Twitch Emotes", page_title="Twitch Emotes",
@ -284,7 +311,7 @@ def org_list_view(request: HttpRequest) -> HttpResponse:
Returns: Returns:
HttpResponse: The rendered organization list page. HttpResponse: The rendered organization list page.
""" """
orgs: QuerySet[Organization] = Organization.for_list_view() orgs: QuerySet[Organization] = Organization.objects.all().order_by("name")
# CollectionPage schema for organizations list # CollectionPage schema for organizations list
collection_schema: dict[str, str] = { collection_schema: dict[str, str] = {
@ -300,7 +327,10 @@ def org_list_view(request: HttpRequest) -> HttpResponse:
page_description="List of Twitch organizations.", page_description="List of Twitch organizations.",
seo_meta={"schema_data": collection_schema}, seo_meta={"schema_data": collection_schema},
) )
context: dict[str, Any] = {"orgs": orgs, **seo_context} context: dict[str, Any] = {
"orgs": orgs,
**seo_context,
}
return render(request, "twitch/org_list.html", context) return render(request, "twitch/org_list.html", context)
@ -316,18 +346,21 @@ def organization_detail_view(request: HttpRequest, twitch_id: str) -> HttpRespon
Returns: Returns:
HttpResponse: The rendered organization detail page. HttpResponse: The rendered organization detail page.
Raises:
Http404: If the organization is not found.
""" """
organization: Organization = get_object_or_404( try:
Organization.for_detail_view(), organization: Organization = Organization.objects.get(twitch_id=twitch_id)
twitch_id=twitch_id, except Organization.DoesNotExist as exc:
) msg = "No organization found matching the query"
raise Http404(msg) from exc
games: list[Game] = list(getattr(organization, "games_for_detail", [])) games: QuerySet[Game] = organization.games.all() # pyright: ignore[reportAttributeAccessIssue]
org_name: str = organization.name or organization.twitch_id org_name: str = organization.name or organization.twitch_id
games_count: int = len(games) games_count: int = games.count()
noun: str = "game" if games_count == 1 else "games" s: Literal["", "s"] = "" if games_count == 1 else "s"
org_description: str = f"{org_name} has {games_count} {noun}." org_description: str = f"{org_name} has {games_count} game{s}."
url: str = build_absolute_uri( url: str = build_absolute_uri(
reverse("twitch:organization_detail", args=[organization.twitch_id]), reverse("twitch:organization_detail", args=[organization.twitch_id]),
@ -482,6 +515,48 @@ def drop_campaign_list_view(request: HttpRequest) -> HttpResponse: # noqa: PLR0
return render(request, "twitch/campaign_list.html", context) return render(request, "twitch/campaign_list.html", context)
def _enhance_drops_with_context(
drops: QuerySet[TimeBasedDrop],
now: datetime.datetime,
) -> list[dict[str, Any]]:
"""Helper to enhance drops with countdown and context.
Args:
drops: QuerySet of TimeBasedDrop objects.
now: Current datetime.
Returns:
List of dicts with drop and additional context for display.
"""
enhanced: list[dict[str, Any]] = []
for drop in drops:
if drop.end_at and drop.end_at > now:
time_diff: datetime.timedelta = drop.end_at - now
days: int = time_diff.days
hours, remainder = divmod(time_diff.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if days > 0:
countdown_text: str = f"{days}d {hours}h {minutes}m"
elif hours > 0:
countdown_text = f"{hours}h {minutes}m"
elif minutes > 0:
countdown_text = f"{minutes}m {seconds}s"
else:
countdown_text = f"{seconds}s"
elif drop.start_at and drop.start_at > now:
countdown_text = "Not started"
else:
countdown_text = "Expired"
enhanced.append({
"drop": drop,
"local_start": drop.start_at,
"local_end": drop.end_at,
"timezone_name": "UTC",
"countdown_text": countdown_text,
})
return enhanced
# MARK: /campaigns/<twitch_id>/ # MARK: /campaigns/<twitch_id>/
def drop_campaign_detail_view(request: HttpRequest, twitch_id: str) -> HttpResponse: # noqa: PLR0914 def drop_campaign_detail_view(request: HttpRequest, twitch_id: str) -> HttpResponse: # noqa: PLR0914
"""Function-based view for a drop campaign detail. """Function-based view for a drop campaign detail.
@ -497,20 +572,45 @@ def drop_campaign_detail_view(request: HttpRequest, twitch_id: str) -> HttpRespo
Http404: If the campaign is not found. Http404: If the campaign is not found.
""" """
try: try:
campaign: DropCampaign = DropCampaign.for_detail_view(twitch_id) campaign: DropCampaign = DropCampaign.objects.prefetch_related(
"game__owners",
Prefetch(
"allow_channels",
queryset=Channel.objects.order_by("display_name"),
to_attr="channels_ordered",
),
Prefetch(
"time_based_drops",
queryset=TimeBasedDrop.objects.prefetch_related("benefits").order_by(
"required_minutes_watched",
),
),
).get(twitch_id=twitch_id)
except DropCampaign.DoesNotExist as exc: except DropCampaign.DoesNotExist as exc:
msg = "No campaign found matching the query" msg = "No campaign found matching the query"
raise Http404(msg) from exc raise Http404(msg) from exc
drops: QuerySet[TimeBasedDrop] = campaign.time_based_drops.all() # pyright: ignore[reportAttributeAccessIssue]
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
owners: list[Organization] = list(getattr(campaign.game, "owners_for_detail", [])) enhanced_drops: list[dict[str, Any]] = _enhance_drops_with_context(drops, now)
enhanced_drops: list[dict[str, Any]] = campaign.enhanced_drops_for_detail(now) # Attach awarded_badge to each drop in enhanced_drops
for enhanced_drop in enhanced_drops:
drop = enhanced_drop["drop"]
awarded_badge = None
for benefit in drop.benefits.all():
if benefit.distribution_type == "BADGE":
awarded_badge: ChatBadge | None = ChatBadge.objects.filter(
title=benefit.name,
).first()
break
enhanced_drop["awarded_badge"] = awarded_badge
context: dict[str, Any] = { context: dict[str, Any] = {
"campaign": campaign, "campaign": campaign,
"now": now, "now": now,
"drops": enhanced_drops, "drops": enhanced_drops,
"owners": owners, "owners": list(campaign.game.owners.all()),
"allowed_channels": getattr(campaign, "channels_ordered", []), "allowed_channels": getattr(campaign, "channels_ordered", []),
} }
@ -550,7 +650,9 @@ def drop_campaign_detail_view(request: HttpRequest, twitch_id: str) -> HttpRespo
campaign_event["startDate"] = campaign.start_at.isoformat() campaign_event["startDate"] = campaign.start_at.isoformat()
if campaign.end_at: if campaign.end_at:
campaign_event["endDate"] = campaign.end_at.isoformat() campaign_event["endDate"] = campaign.end_at.isoformat()
campaign_owner: Organization | None = _pick_owner(owners) if owners else None campaign_owner: Organization | None = (
_pick_owner(list(campaign.game.owners.all())) if campaign.game else None
)
campaign_owner_name: str = ( campaign_owner_name: str = (
(campaign_owner.name or campaign_owner.twitch_id) (campaign_owner.name or campaign_owner.twitch_id)
if campaign_owner if campaign_owner
@ -643,9 +745,23 @@ class GamesGridView(ListView):
Returns: Returns:
QuerySet: Annotated games queryset. QuerySet: Annotated games queryset.
""" """
return Game.with_campaign_counts( now: datetime.datetime = timezone.now()
timezone.now(), return (
with_campaigns_only=True, super()
.get_queryset()
.prefetch_related("owners")
.annotate(
campaign_count=Count("drop_campaigns", distinct=True),
active_count=Count(
"drop_campaigns",
filter=Q(
drop_campaigns__start_at__lte=now,
drop_campaigns__end_at__gte=now,
),
distinct=True,
),
)
.order_by("display_name")
) )
def get_context_data(self, **kwargs) -> dict[str, Any]: def get_context_data(self, **kwargs) -> dict[str, Any]:
@ -660,9 +776,35 @@ class GamesGridView(ListView):
dict: Context data with games grouped by organization. dict: Context data with games grouped by organization.
""" """
context: dict[str, Any] = super().get_context_data(**kwargs) context: dict[str, Any] = super().get_context_data(**kwargs)
games: QuerySet[Game] = context["games"] now: datetime.datetime = timezone.now()
context["games_by_org"] = Game.grouped_by_owner_for_grid(
games, games_with_campaigns: QuerySet[Game] = (
Game.objects
.filter(drop_campaigns__isnull=False)
.prefetch_related("owners")
.annotate(
campaign_count=Count("drop_campaigns", distinct=True),
active_count=Count(
"drop_campaigns",
filter=Q(
drop_campaigns__start_at__lte=now,
drop_campaigns__end_at__gte=now,
),
distinct=True,
),
)
.order_by("display_name")
)
games_by_org: defaultdict[Organization, list[dict[str, Game]]] = defaultdict(
list,
)
for game in games_with_campaigns:
for org in game.owners.all():
games_by_org[org].append({"game": game})
context["games_by_org"] = OrderedDict(
sorted(games_by_org.items(), key=lambda item: item[0].name),
) )
# CollectionPage schema for games list # CollectionPage schema for games list
@ -691,12 +833,32 @@ class GameDetailView(DetailView):
model = Game model = Game
template_name = "twitch/game_detail.html" template_name = "twitch/game_detail.html"
context_object_name = "game" context_object_name = "game"
slug_field = "twitch_id" lookup_field = "twitch_id"
slug_url_kwarg = "twitch_id"
def get_queryset(self) -> QuerySet[Game]: def get_object(self, queryset: QuerySet[Game] | None = None) -> Game:
"""Return game queryset optimized for the game detail page.""" """Get the game object using twitch_id as the primary key lookup.
return Game.for_detail_view()
Args:
queryset: Optional queryset to use.
Returns:
Game: The game object.
Raises:
Http404: If the game is not found.
"""
if queryset is None:
queryset = self.get_queryset()
# Use twitch_id as the lookup field since it's the primary key
twitch_id: str | None = self.kwargs.get("twitch_id")
try:
game: Game = queryset.get(twitch_id=twitch_id)
except Game.DoesNotExist as exc:
msg = "No game found matching the query"
raise Http404(msg) from exc
return game
def get_context_data(self, **kwargs) -> dict[str, Any]: # noqa: PLR0914 def get_context_data(self, **kwargs) -> dict[str, Any]: # noqa: PLR0914
"""Add additional context data. """Add additional context data.
@ -713,13 +875,88 @@ class GameDetailView(DetailView):
game: Game = self.object # pyright: ignore[reportAssignmentType] game: Game = self.object # pyright: ignore[reportAssignmentType]
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
campaigns_list: list[DropCampaign] = list(DropCampaign.for_game_detail(game)) all_campaigns: QuerySet[DropCampaign] = (
active_campaigns, upcoming_campaigns, expired_campaigns = ( DropCampaign.objects
DropCampaign.split_for_channel_detail(campaigns_list, now) .filter(game=game)
.select_related("game")
.prefetch_related(
Prefetch(
"time_based_drops",
queryset=TimeBasedDrop.objects.prefetch_related(
Prefetch(
"benefits",
queryset=DropBenefit.objects.order_by("name"),
),
),
),
)
.order_by("-end_at")
) )
owners: list[Organization] = list(getattr(game, "owners_for_detail", []))
game_name: str = game.get_game_name campaigns_list: list[DropCampaign] = list(all_campaigns)
# For each drop, find awarded badge (distribution_type BADGE)
drop_awarded_badges: dict[str, ChatBadge] = {}
benefit_badge_titles: set[str] = set()
for campaign in campaigns_list:
for drop in campaign.time_based_drops.all(): # pyright: ignore[reportAttributeAccessIssue]
for benefit in drop.benefits.all():
if benefit.distribution_type == "BADGE" and benefit.name:
benefit_badge_titles.add(benefit.name)
# Bulk-load all matching ChatBadge instances to avoid N+1 queries
badges_by_title: dict[str, ChatBadge] = {
badge.title: badge
for badge in ChatBadge.objects.filter(title__in=benefit_badge_titles)
}
for campaign in campaigns_list:
for drop in campaign.time_based_drops.all(): # pyright: ignore[reportAttributeAccessIssue]
for benefit in drop.benefits.all():
if benefit.distribution_type == "BADGE":
badge: ChatBadge | None = badges_by_title.get(benefit.name)
if badge:
drop_awarded_badges[drop.twitch_id] = badge
active_campaigns: list[DropCampaign] = [
campaign
for campaign in campaigns_list
if campaign.start_at is not None
and campaign.start_at <= now
and campaign.end_at is not None
and campaign.end_at >= now
]
active_campaigns.sort(
key=lambda c: (
c.end_at
if c.end_at is not None
else datetime.datetime.max.replace(tzinfo=datetime.UTC)
),
)
upcoming_campaigns: list[DropCampaign] = [
campaign
for campaign in campaigns_list
if campaign.start_at is not None and campaign.start_at > now
]
upcoming_campaigns.sort(
key=lambda c: (
c.start_at
if c.start_at is not None
else datetime.datetime.max.replace(tzinfo=datetime.UTC)
),
)
expired_campaigns: list[DropCampaign] = [
campaign
for campaign in campaigns_list
if campaign.end_at is not None and campaign.end_at < now
]
owners: list[Organization] = list(game.owners.all())
game_name: str = game.display_name or game.name or game.twitch_id
game_description: str = f"Twitch drops for {game_name}." game_description: str = f"Twitch drops for {game_name}."
game_image: str | None = game.box_art_best_url game_image: str | None = game.box_art_best_url
game_image_width: int | None = game.box_art_width if game.box_art_file else None game_image_width: int | None = game.box_art_width if game.box_art_file else None
@ -800,6 +1037,7 @@ class GameDetailView(DetailView):
"expired_campaigns": expired_campaigns, "expired_campaigns": expired_campaigns,
"owner": owners[0] if owners else None, "owner": owners[0] if owners else None,
"owners": owners, "owners": owners,
"drop_awarded_badges": drop_awarded_badges,
"now": now, "now": now,
**seo_context, **seo_context,
}) })
@ -818,7 +1056,14 @@ def dashboard(request: HttpRequest) -> HttpResponse:
HttpResponse: The rendered dashboard template. HttpResponse: The rendered dashboard template.
""" """
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
dashboard_data: dict[str, Any] = DropCampaign.dashboard_context(now) campaigns_by_game: OrderedDict[str, dict[str, Any]] = (
DropCampaign.campaigns_by_game_for_dashboard(now)
)
# Get active reward campaigns (Quest rewards)
active_reward_campaigns: QuerySet[RewardCampaign] = (
RewardCampaign.active_for_dashboard(now)
)
# WebSite schema with SearchAction for sitelinks search box # WebSite schema with SearchAction for sitelinks search box
# TODO(TheLovinator): Should this be on all pages instead of just the dashboard? # noqa: TD003 # TODO(TheLovinator): Should this be on all pages instead of just the dashboard? # noqa: TD003
@ -851,8 +1096,9 @@ def dashboard(request: HttpRequest) -> HttpResponse:
request, request,
"twitch/dashboard.html", "twitch/dashboard.html",
{ {
"campaigns_by_game": campaigns_by_game,
"active_reward_campaigns": active_reward_campaigns,
"now": now, "now": now,
**dashboard_data,
**seo_context, **seo_context,
}, },
) )
@ -1084,8 +1330,19 @@ class ChannelListView(ListView):
Returns: Returns:
QuerySet: Filtered channels. QuerySet: Filtered channels.
""" """
queryset: QuerySet[Channel] = super().get_queryset()
search_query: str | None = self.request.GET.get("search") search_query: str | None = self.request.GET.get("search")
return Channel.for_list_view(search_query)
if search_query:
queryset = queryset.filter(
Q(name__icontains=search_query)
| Q(display_name__icontains=search_query),
)
return queryset.annotate(campaign_count=Count("allowed_campaigns")).order_by(
"-campaign_count",
"name",
)
def get_context_data(self, **kwargs) -> dict[str, Any]: def get_context_data(self, **kwargs) -> dict[str, Any]:
"""Add additional context data. """Add additional context data.
@ -1097,11 +1354,12 @@ class ChannelListView(ListView):
dict: Context data. dict: Context data.
""" """
context: dict[str, Any] = super().get_context_data(**kwargs) context: dict[str, Any] = super().get_context_data(**kwargs)
search_query: str = self.request.GET.get("search", "").strip() search_query: str = self.request.GET.get("search", "")
# Build pagination info # Build pagination info
query_string: str = urlencode({"search": search_query}) if search_query else "" base_url = "/channels/"
base_url: str = f"/channels/?{query_string}" if query_string else "/channels/" if search_query:
base_url += f"?search={search_query}"
page_obj: Page | None = context.get("page_obj") page_obj: Page | None = context.get("page_obj")
pagination_info: list[dict[str, str]] | None = ( pagination_info: list[dict[str, str]] | None = (
@ -1150,10 +1408,20 @@ class ChannelDetailView(DetailView):
Returns: Returns:
Channel: The channel object. Channel: The channel object.
Raises:
Http404: If the channel is not found.
""" """
queryset = queryset or Channel.for_detail_view() if queryset is None:
twitch_id: str = str(self.kwargs.get("twitch_id", "")) queryset = self.get_queryset()
channel: Channel = get_object_or_404(queryset, twitch_id=twitch_id)
twitch_id: str | None = self.kwargs.get("twitch_id")
try:
channel: Channel = queryset.get(twitch_id=twitch_id)
except Channel.DoesNotExist as exc:
msg = "No channel found matching the query"
raise Http404(msg) from exc
return channel return channel
def get_context_data(self, **kwargs) -> dict[str, Any]: # noqa: PLR0914 def get_context_data(self, **kwargs) -> dict[str, Any]: # noqa: PLR0914
@ -1169,16 +1437,66 @@ class ChannelDetailView(DetailView):
channel: Channel = self.object # pyright: ignore[reportAssignmentType] channel: Channel = self.object # pyright: ignore[reportAssignmentType]
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
campaigns_list: list[DropCampaign] = list( all_campaigns: QuerySet[DropCampaign] = (
DropCampaign.for_channel_detail(channel), DropCampaign.objects
.filter(allow_channels=channel)
.select_related("game")
.prefetch_related(
Prefetch(
"time_based_drops",
queryset=TimeBasedDrop.objects.prefetch_related(
Prefetch(
"benefits",
queryset=DropBenefit.objects.order_by("name"),
),
),
),
) )
active_campaigns, upcoming_campaigns, expired_campaigns = ( .order_by("-start_at")
DropCampaign.split_for_channel_detail(campaigns_list, now)
) )
name: str = channel.preferred_name campaigns_list: list[DropCampaign] = list(all_campaigns)
active_campaigns: list[DropCampaign] = [
campaign
for campaign in campaigns_list
if campaign.start_at is not None
and campaign.start_at <= now
and campaign.end_at is not None
and campaign.end_at >= now
]
active_campaigns.sort(
key=lambda c: (
c.end_at
if c.end_at is not None
else datetime.datetime.max.replace(tzinfo=datetime.UTC)
),
)
upcoming_campaigns: list[DropCampaign] = [
campaign
for campaign in campaigns_list
if campaign.start_at is not None and campaign.start_at > now
]
upcoming_campaigns.sort(
key=lambda c: (
c.start_at
if c.start_at is not None
else datetime.datetime.max.replace(tzinfo=datetime.UTC)
),
)
expired_campaigns: list[DropCampaign] = [
campaign
for campaign in campaigns_list
if campaign.end_at is not None and campaign.end_at < now
]
name: str = channel.display_name or channel.name or channel.twitch_id
total_campaigns: int = len(campaigns_list) total_campaigns: int = len(campaigns_list)
description: str = channel.detail_description(total_campaigns) description: str = f"{name} participates in {total_campaigns} drop campaign"
if total_campaigns > 1:
description += "s"
channel_url: str = build_absolute_uri( channel_url: str = build_absolute_uri(
reverse("twitch:channel_detail", args=[channel.twitch_id]), reverse("twitch:channel_detail", args=[channel.twitch_id]),