Compare commits

..

No commits in common. "47d4f5341f3e3b73d25ac078923153f92d1d60f0" and "66ea46cf2331470d9e4ac20c134161b55e64e4bc" have entirely different histories.

31 changed files with 313 additions and 2795 deletions

View file

@ -5,7 +5,7 @@ import logging
from celery import shared_task from celery import shared_task
from django.core.management import call_command from django.core.management import call_command
logger: logging.Logger = logging.getLogger("ttvdrops.tasks") logger = logging.getLogger("ttvdrops.tasks")
@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60) @shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60)

View file

@ -136,7 +136,7 @@ class ImportChzzkCampaignRangeCommandTest(TestCase):
stdout = StringIO() stdout = StringIO()
stderr = StringIO() stderr = StringIO()
def side_effect(command: str, *args: str, **kwargs: StringIO) -> None: def side_effect(command: str, *args: str, **kwargs: object) -> None:
if "4" in args: if "4" in args:
msg = "Campaign 4 not found" msg = "Campaign 4 not found"
raise CommandError(msg) raise CommandError(msg)

View file

@ -1,9 +1,7 @@
from datetime import timedelta from datetime import timedelta
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from django.db import connection
from django.test import TestCase from django.test import TestCase
from django.test.utils import CaptureQueriesContext
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
@ -18,75 +16,6 @@ if TYPE_CHECKING:
class ChzzkDashboardViewTests(TestCase): class ChzzkDashboardViewTests(TestCase):
"""Test cases for the dashboard view of the chzzk app.""" """Test cases for the dashboard view of the chzzk app."""
def test_dashboard_view_no_n_plus_one_on_rewards(self) -> None:
"""Test that the dashboard view does not trigger an N+1 query for rewards."""
now = timezone.now()
base_kwargs = {
"category_type": "game",
"category_id": "1",
"category_value": "TestGame",
"service_id": "chzzk",
"state": "ACTIVE",
"start_date": now - timedelta(days=1),
"end_date": now + timedelta(days=1),
"has_ios_based_reward": False,
"drops_campaign_not_started": False,
"source_api": "unit-test",
}
reward_kwargs = {
"reward_type": "ITEM",
"campaign_reward_type": "Standard",
"condition_type": "watch",
"ios_based_reward": False,
"code_remaining_count": 100,
}
campaign1 = ChzzkCampaign.objects.create(
campaign_no=9001,
title="C1",
**base_kwargs,
)
campaign1.rewards.create( # pyright: ignore[reportAttributeAccessIssue]
reward_no=901,
title="R1",
condition_for_minutes=10,
**reward_kwargs,
) # pyright: ignore[reportAttributeAccessIssue]
campaign2 = ChzzkCampaign.objects.create(
campaign_no=9002,
title="C2",
**base_kwargs,
)
campaign2.rewards.create( # pyright: ignore[reportAttributeAccessIssue]
reward_no=902,
title="R2",
condition_for_minutes=20,
**reward_kwargs,
) # pyright: ignore[reportAttributeAccessIssue]
with CaptureQueriesContext(connection) as one_campaign_ctx:
self.client.get(reverse("chzzk:dashboard"))
query_count_two = len(one_campaign_ctx)
campaign3 = ChzzkCampaign.objects.create(
campaign_no=9003,
title="C3",
**base_kwargs,
)
campaign3.rewards.create( # pyright: ignore[reportAttributeAccessIssue]
reward_no=903,
title="R3",
condition_for_minutes=30,
**reward_kwargs,
) # pyright: ignore[reportAttributeAccessIssue]
with CaptureQueriesContext(connection) as three_campaign_ctx:
self.client.get(reverse("chzzk:dashboard"))
query_count_three = len(three_campaign_ctx)
# With prefetch_related, adding more campaigns should not add extra queries per campaign.
assert query_count_two == query_count_three
def test_dashboard_view_excludes_testing_state_campaigns(self) -> None: def test_dashboard_view_excludes_testing_state_campaigns(self) -> None:
"""Test that the dashboard view excludes campaigns in the TESTING state.""" """Test that the dashboard view excludes campaigns in the TESTING state."""
now: datetime = timezone.now() now: datetime = timezone.now()

View file

@ -1,6 +1,7 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from django.db.models import Q from django.db.models import Q
from django.db.models.query import QuerySet
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404
from django.shortcuts import render from django.shortcuts import render
from django.urls import reverse from django.urls import reverse
@ -15,9 +16,9 @@ from twitch.feeds import TTVDropsBaseFeed
if TYPE_CHECKING: if TYPE_CHECKING:
import datetime import datetime
from django.db.models.query import QuerySet
from django.http import HttpResponse from django.http import HttpResponse
from django.http.request import HttpRequest from django.http.request import HttpRequest
from pytest_django.asserts import QuerySet
def dashboard_view(request: HttpRequest) -> HttpResponse: def dashboard_view(request: HttpRequest) -> HttpResponse:
@ -33,7 +34,6 @@ def dashboard_view(request: HttpRequest) -> HttpResponse:
models.ChzzkCampaign.objects models.ChzzkCampaign.objects
.filter(end_date__gte=timezone.now()) .filter(end_date__gte=timezone.now())
.exclude(state="TESTING") .exclude(state="TESTING")
.prefetch_related("rewards")
.order_by("-start_date") .order_by("-start_date")
) )
return render( return render(

View file

@ -224,10 +224,6 @@ DATABASES: dict[str, dict[str, Any]] = configure_databases(
base_dir=BASE_DIR, base_dir=BASE_DIR,
) )
if DEBUG or TESTING:
INSTALLED_APPS.append("zeal")
MIDDLEWARE.append("zeal.middleware.zeal_middleware")
if not TESTING: if not TESTING:
INSTALLED_APPS = [*INSTALLED_APPS, "debug_toolbar", "silk"] INSTALLED_APPS = [*INSTALLED_APPS, "debug_toolbar", "silk"]
MIDDLEWARE = [ MIDDLEWARE = [

View file

@ -1,349 +0,0 @@
from __future__ import annotations
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING
from django.conf import settings
from django.test import TestCase
from django.urls import reverse
from django.utils import timezone
from chzzk.models import ChzzkCampaign
from kick.models import KickCategory
from kick.models import KickChannel
from kick.models import KickDropCampaign
from kick.models import KickOrganization
from kick.models import KickReward
from kick.models import KickUser
from twitch.models import Channel
from twitch.models import ChatBadge
from twitch.models import ChatBadgeSet
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import Game
from twitch.models import Organization
from twitch.models import RewardCampaign
from twitch.models import TimeBasedDrop
if TYPE_CHECKING:
from datetime import datetime
from pathlib import Path
from django.test.client import _MonkeyPatchedWSGIResponse
class SiteEndpointSmokeTest(TestCase):
"""Smoke-test all named site endpoints with realistic fixture data."""
def setUp(self) -> None:
"""Set up representative Twitch, Kick, and CHZZK data for endpoint smoke tests."""
now: datetime = timezone.now()
# Twitch fixtures
self.twitch_org: Organization = Organization.objects.create(
twitch_id="smoke-org-1",
name="Smoke Organization",
)
self.twitch_game: Game = Game.objects.create(
twitch_id="smoke-game-1",
slug="smoke-game",
name="Smoke Game",
display_name="Smoke Game",
box_art="https://example.com/smoke-game.png",
)
self.twitch_game.owners.add(self.twitch_org)
self.twitch_channel: Channel = Channel.objects.create(
twitch_id="smoke-channel-1",
name="smokechannel",
display_name="SmokeChannel",
)
self.twitch_campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="smoke-campaign-1",
name="Smoke Campaign",
description="Smoke campaign description",
game=self.twitch_game,
image_url="https://example.com/smoke-campaign.png",
start_at=now - timedelta(days=1),
end_at=now + timedelta(days=1),
operation_names=["DropCampaignDetails"],
is_fully_imported=True,
)
self.twitch_campaign.allow_channels.add(self.twitch_channel)
self.twitch_drop: TimeBasedDrop = TimeBasedDrop.objects.create(
twitch_id="smoke-drop-1",
name="Smoke Drop",
campaign=self.twitch_campaign,
required_minutes_watched=15,
start_at=now - timedelta(days=1),
end_at=now + timedelta(days=1),
)
self.twitch_benefit: DropBenefit = DropBenefit.objects.create(
twitch_id="smoke-benefit-1",
name="Smoke Benefit",
image_asset_url="https://example.com/smoke-benefit.png",
)
self.twitch_drop.benefits.add(self.twitch_benefit)
self.twitch_reward_campaign: RewardCampaign = RewardCampaign.objects.create(
twitch_id="smoke-reward-campaign-1",
name="Smoke Reward Campaign",
brand="Smoke Brand",
starts_at=now - timedelta(days=1),
ends_at=now + timedelta(days=2),
status="ACTIVE",
summary="Smoke reward summary",
external_url="https://example.com/smoke-reward",
is_sitewide=False,
game=self.twitch_game,
)
self.badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(
set_id="smoke-badge-set",
)
ChatBadge.objects.create(
badge_set=self.badge_set,
badge_id="1",
image_url_1x="https://example.com/badge-1x.png",
image_url_2x="https://example.com/badge-2x.png",
image_url_4x="https://example.com/badge-4x.png",
title="Smoke Badge",
description="Smoke badge description",
)
# Kick fixtures
self.kick_org: KickOrganization = KickOrganization.objects.create(
kick_id="smoke-kick-org-1",
name="Smoke Kick Organization",
)
self.kick_category: KickCategory = KickCategory.objects.create(
kick_id=9101,
name="Smoke Kick Category",
slug="smoke-kick-category",
image_url="https://example.com/smoke-kick-category.png",
)
self.kick_campaign: KickDropCampaign = KickDropCampaign.objects.create(
kick_id="smoke-kick-campaign-1",
name="Smoke Kick Campaign",
status="active",
starts_at=now - timedelta(days=1),
ends_at=now + timedelta(days=1),
organization=self.kick_org,
category=self.kick_category,
rule_id=1,
rule_name="Watch to redeem",
is_fully_imported=True,
)
kick_user: KickUser = KickUser.objects.create(
kick_id=990001,
username="smokekickuser",
)
kick_channel: KickChannel = KickChannel.objects.create(
kick_id=990002,
slug="smokekickchannel",
user=kick_user,
)
self.kick_campaign.channels.add(kick_channel)
KickReward.objects.create(
kick_id="smoke-kick-reward-1",
name="Smoke Kick Reward",
image_url="drops/reward-image/smoke-kick-reward.png",
required_units=20,
campaign=self.kick_campaign,
category=self.kick_category,
organization=self.kick_org,
)
# CHZZK fixtures
self.chzzk_campaign: ChzzkCampaign = ChzzkCampaign.objects.create(
campaign_no=9901,
title="Smoke CHZZK Campaign",
description="Smoke CHZZK description",
category_type="game",
category_id="1",
category_value="SmokeGame",
service_id="chzzk",
state="ACTIVE",
start_date=now - timedelta(days=1),
end_date=now + timedelta(days=1),
has_ios_based_reward=False,
drops_campaign_not_started=False,
source_api="unit-test",
raw_json_v1={"ok": True},
)
self.chzzk_campaign.rewards.create( # pyright: ignore[reportAttributeAccessIssue]
reward_no=991,
title="Smoke CHZZK Reward",
reward_type="ITEM",
campaign_reward_type="Standard",
condition_type="watch",
condition_for_minutes=10,
ios_based_reward=False,
code_remaining_count=100,
)
# Core dataset download fixture
self.dataset_dir: Path = settings.DATA_DIR / "datasets"
self.dataset_dir.mkdir(parents=True, exist_ok=True)
self.dataset_name = "smoke-dataset.zst"
(self.dataset_dir / self.dataset_name).write_bytes(b"smoke")
def tearDown(self) -> None:
"""Clean up any files created for testing."""
dataset_path: Path = self.dataset_dir / self.dataset_name
if dataset_path.exists():
dataset_path.unlink()
def test_all_site_endpoints_return_success(self) -> None:
"""Test that all named site endpoints return a successful response with representative data."""
endpoints: list[tuple[str, dict[str, str | int], int]] = [
# Top-level config endpoints
("sitemap", {}, 200),
("sitemap-static", {}, 200),
("sitemap-twitch-channels", {}, 200),
("sitemap-twitch-drops", {}, 200),
("sitemap-twitch-others", {}, 200),
("sitemap-kick", {}, 200),
("sitemap-youtube", {}, 200),
# Core endpoints
("core:dashboard", {}, 200),
("core:search", {}, 200),
("core:debug", {}, 200),
("core:dataset_backups", {}, 200),
("core:dataset_backup_download", {"relative_path": self.dataset_name}, 200),
("core:docs_rss", {}, 200),
("core:campaign_feed", {}, 200),
("core:game_feed", {}, 200),
("core:game_campaign_feed", {"twitch_id": self.twitch_game.twitch_id}, 200),
("core:organization_feed", {}, 200),
("core:reward_campaign_feed", {}, 200),
("core:campaign_feed_atom", {}, 200),
("core:game_feed_atom", {}, 200),
(
"core:game_campaign_feed_atom",
{"twitch_id": self.twitch_game.twitch_id},
200,
),
("core:organization_feed_atom", {}, 200),
("core:reward_campaign_feed_atom", {}, 200),
("core:campaign_feed_discord", {}, 200),
("core:game_feed_discord", {}, 200),
(
"core:game_campaign_feed_discord",
{"twitch_id": self.twitch_game.twitch_id},
200,
),
("core:organization_feed_discord", {}, 200),
("core:reward_campaign_feed_discord", {}, 200),
# Twitch endpoints
("twitch:dashboard", {}, 200),
("twitch:badge_list", {}, 200),
("twitch:badge_set_detail", {"set_id": self.badge_set.set_id}, 200),
("twitch:campaign_list", {}, 200),
(
"twitch:campaign_detail",
{"twitch_id": self.twitch_campaign.twitch_id},
200,
),
("twitch:channel_list", {}, 200),
(
"twitch:channel_detail",
{"twitch_id": self.twitch_channel.twitch_id},
200,
),
("twitch:emote_gallery", {}, 200),
("twitch:games_grid", {}, 200),
("twitch:games_list", {}, 200),
("twitch:game_detail", {"twitch_id": self.twitch_game.twitch_id}, 200),
("twitch:org_list", {}, 200),
(
"twitch:organization_detail",
{"twitch_id": self.twitch_org.twitch_id},
200,
),
("twitch:reward_campaign_list", {}, 200),
(
"twitch:reward_campaign_detail",
{"twitch_id": self.twitch_reward_campaign.twitch_id},
200,
),
("twitch:export_campaigns_csv", {}, 200),
("twitch:export_campaigns_json", {}, 200),
("twitch:export_games_csv", {}, 200),
("twitch:export_games_json", {}, 200),
("twitch:export_organizations_csv", {}, 200),
("twitch:export_organizations_json", {}, 200),
# Kick endpoints
("kick:dashboard", {}, 200),
("kick:campaign_list", {}, 200),
("kick:campaign_detail", {"kick_id": self.kick_campaign.kick_id}, 200),
("kick:game_list", {}, 200),
("kick:game_detail", {"kick_id": self.kick_category.kick_id}, 200),
("kick:category_list", {}, 200),
("kick:category_detail", {"kick_id": self.kick_category.kick_id}, 200),
("kick:organization_list", {}, 200),
("kick:organization_detail", {"kick_id": self.kick_org.kick_id}, 200),
("kick:campaign_feed", {}, 200),
("kick:game_feed", {}, 200),
("kick:game_campaign_feed", {"kick_id": self.kick_category.kick_id}, 200),
("kick:category_feed", {}, 200),
(
"kick:category_campaign_feed",
{"kick_id": self.kick_category.kick_id},
200,
),
("kick:organization_feed", {}, 200),
("kick:campaign_feed_atom", {}, 200),
("kick:game_feed_atom", {}, 200),
(
"kick:game_campaign_feed_atom",
{"kick_id": self.kick_category.kick_id},
200,
),
("kick:category_feed_atom", {}, 200),
(
"kick:category_campaign_feed_atom",
{"kick_id": self.kick_category.kick_id},
200,
),
("kick:organization_feed_atom", {}, 200),
("kick:campaign_feed_discord", {}, 200),
("kick:game_feed_discord", {}, 200),
(
"kick:game_campaign_feed_discord",
{"kick_id": self.kick_category.kick_id},
200,
),
("kick:category_feed_discord", {}, 200),
(
"kick:category_campaign_feed_discord",
{"kick_id": self.kick_category.kick_id},
200,
),
("kick:organization_feed_discord", {}, 200),
# CHZZK endpoints
("chzzk:dashboard", {}, 200),
("chzzk:campaign_list", {}, 200),
(
"chzzk:campaign_detail",
{"campaign_no": self.chzzk_campaign.campaign_no},
200,
),
("chzzk:campaign_feed", {}, 200),
("chzzk:campaign_feed_atom", {}, 200),
("chzzk:campaign_feed_discord", {}, 200),
# YouTube endpoint
("youtube:index", {}, 200),
]
for route_name, kwargs, expected_status in endpoints:
response: _MonkeyPatchedWSGIResponse = self.client.get(
reverse(route_name, kwargs=kwargs),
)
assert response.status_code == expected_status, (
f"{route_name} returned {response.status_code}, expected {expected_status}"
)
response.close()

View file

@ -1,25 +0,0 @@
from typing import TYPE_CHECKING
import pytest
from zeal import zeal_context
if TYPE_CHECKING:
from collections.abc import Generator
@pytest.fixture(autouse=True)
def use_zeal(request: pytest.FixtureRequest) -> Generator[None]:
"""Enable Zeal N+1 detection context for each pytest test.
Use @pytest.mark.no_zeal for tests that intentionally exercise import paths
where Zeal's strict get() heuristics are too noisy.
Yields:
None: Control back to pytest for test execution.
"""
if request.node.get_closest_marker("no_zeal") is not None:
yield
return
with zeal_context():
yield

View file

@ -69,7 +69,7 @@ class _TTVDropsSite:
domain: str domain: str
def get_current_site(request: HttpRequest | None) -> _TTVDropsSite: def get_current_site(request: object) -> _TTVDropsSite:
"""Return a site-like object with domain derived from BASE_URL.""" """Return a site-like object with domain derived from BASE_URL."""
base_url: str = _get_base_url() base_url: str = _get_base_url()
parts: SplitResult = urlsplit(base_url) parts: SplitResult = urlsplit(base_url)

View file

@ -5,7 +5,7 @@ import logging
from celery import shared_task from celery import shared_task
from django.core.management import call_command from django.core.management import call_command
logger: logging.Logger = logging.getLogger("ttvdrops.tasks") logger = logging.getLogger("ttvdrops.tasks")
@shared_task(bind=True, queue="default", max_retries=3, default_retry_delay=300) @shared_task(bind=True, queue="default", max_retries=3, default_retry_delay=300)

View file

@ -5,7 +5,6 @@ from django.urls import reverse
if TYPE_CHECKING: if TYPE_CHECKING:
from django.test.client import Client from django.test.client import Client
from pytest_django.fixtures import SettingsWrapper
def _extract_locs(xml_bytes: bytes) -> list[str]: def _extract_locs(xml_bytes: bytes) -> list[str]:
@ -16,7 +15,7 @@ def _extract_locs(xml_bytes: bytes) -> list[str]:
def test_sitemap_static_contains_expected_links( def test_sitemap_static_contains_expected_links(
client: Client, client: Client,
settings: SettingsWrapper, settings: object,
) -> None: ) -> None:
"""Ensure the static sitemap contains the main site links across apps. """Ensure the static sitemap contains the main site links across apps.

View file

@ -15,9 +15,11 @@ from django.db.models import Max
from django.db.models import OuterRef from django.db.models import OuterRef
from django.db.models import Prefetch from django.db.models import Prefetch
from django.db.models import Q from django.db.models import Q
from django.db.models import QuerySet
from django.db.models.functions import Trim from django.db.models.functions import Trim
from django.http import FileResponse from django.http import FileResponse
from django.http import Http404 from django.http import Http404
from django.http import HttpRequest
from django.http import HttpResponse from django.http import HttpResponse
from django.shortcuts import render from django.shortcuts import render
from django.template.defaultfilters import filesizeformat from django.template.defaultfilters import filesizeformat

View file

@ -206,8 +206,8 @@ class KickOrganizationFeed(TTVDropsBaseFeed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Capture optional ?limit query parameter. """Capture optional ?limit query parameter.
@ -283,8 +283,8 @@ class KickCategoryFeed(TTVDropsBaseFeed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Capture optional ?limit query parameter. """Capture optional ?limit query parameter.
@ -372,8 +372,8 @@ class KickCampaignFeed(TTVDropsBaseFeed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Capture optional ?limit query parameter. """Capture optional ?limit query parameter.
@ -481,8 +481,8 @@ class KickCategoryCampaignFeed(TTVDropsBaseFeed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Capture optional ?limit query parameter. """Capture optional ?limit query parameter.

View file

@ -1,7 +1,4 @@
from __future__ import annotations
import logging import logging
from datetime import datetime
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import httpx import httpx
@ -17,8 +14,6 @@ from kick.models import KickUser
from kick.schemas import KickDropsResponseSchema from kick.schemas import KickDropsResponseSchema
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Mapping
from django.core.management.base import CommandParser from django.core.management.base import CommandParser
from kick.schemas import KickCategorySchema from kick.schemas import KickCategorySchema
@ -28,26 +23,6 @@ if TYPE_CHECKING:
logger: logging.Logger = logging.getLogger("ttvdrops") logger: logging.Logger = logging.getLogger("ttvdrops")
type KickImportModel = (
KickOrganization
| KickCategory
| KickDropCampaign
| KickUser
| KickChannel
| KickReward
)
type KickFieldValue = (
str
| bool
| int
| datetime
| KickOrganization
| KickCategory
| KickDropCampaign
| KickUser
| None
)
KICK_DROPS_API_URL = "https://web.kick.com/api/v1/drops/campaigns" KICK_DROPS_API_URL = "https://web.kick.com/api/v1/drops/campaigns"
# Kick's public API requires a browser-like User-Agent. # Kick's public API requires a browser-like User-Agent.
@ -73,26 +48,7 @@ class Command(BaseCommand):
help="API endpoint to fetch (default: %(default)s).", help="API endpoint to fetch (default: %(default)s).",
) )
@staticmethod def handle(self, *args: object, **options: object) -> None: # noqa: ARG002
def _save_if_changed(
obj: KickImportModel,
defaults: Mapping[str, KickFieldValue],
) -> None:
"""Persist only changed fields to avoid unnecessary updates."""
changed_fields: list[str] = []
for field, new_value in defaults.items():
if getattr(obj, field, None) != new_value:
setattr(obj, field, new_value)
changed_fields.append(field)
if changed_fields:
obj.save(update_fields=changed_fields)
def handle(
self,
*_args: str,
**options: str | bool | int | None,
) -> None:
"""Main entry point for the command.""" """Main entry point for the command."""
url: str = str(options["url"]) url: str = str(options["url"])
self.stdout.write(f"Fetching Kick drops from {url} ...") self.stdout.write(f"Fetching Kick drops from {url} ...")
@ -143,50 +99,39 @@ class Command(BaseCommand):
self.style.SUCCESS(f"Imported {imported}/{len(campaigns)} campaign(s)."), self.style.SUCCESS(f"Imported {imported}/{len(campaigns)} campaign(s)."),
) )
def _import_campaign(self, data: KickDropCampaignSchema) -> None: # noqa: PLR0914, PLR0915 def _import_campaign(self, data: KickDropCampaignSchema) -> None:
"""Import a single campaign and all its related objects.""" """Import a single campaign and all its related objects."""
# Organization # Organisation
org_data: KickOrganizationSchema = data.organization org_data: KickOrganizationSchema = data.organization
org_defaults: dict[str, str | bool] = { org, created = KickOrganization.objects.update_or_create(
kick_id=org_data.id,
defaults={
"name": org_data.name, "name": org_data.name,
"logo_url": org_data.logo_url, "logo_url": org_data.logo_url,
"url": org_data.url, "url": org_data.url,
"restricted": org_data.restricted, "restricted": org_data.restricted,
} },
org: KickOrganization | None = KickOrganization.objects.filter( )
kick_id=org_data.id,
).first()
created: bool = org is None
if org is None:
org = KickOrganization.objects.create(kick_id=org_data.id, **org_defaults)
else:
self._save_if_changed(org, org_defaults)
if created: if created:
logger.info("Created new organization: %s", org.kick_id) logger.info("Created new organization: %s", org.kick_id)
# Category # Category
cat_data: KickCategorySchema = data.category cat_data: KickCategorySchema = data.category
category_defaults: dict[str, KickFieldValue] = { category, created = KickCategory.objects.update_or_create(
kick_id=cat_data.id,
defaults={
"name": cat_data.name, "name": cat_data.name,
"slug": cat_data.slug, "slug": cat_data.slug,
"image_url": cat_data.image_url, "image_url": cat_data.image_url,
} },
category: KickCategory | None = KickCategory.objects.filter(
kick_id=cat_data.id,
).first()
created = category is None
if category is None:
category = KickCategory.objects.create(
kick_id=cat_data.id,
**category_defaults,
) )
else:
self._save_if_changed(category, category_defaults)
if created: if created:
logger.info("Created new category: %s", category.kick_id) logger.info("Created new category: %s", category.kick_id)
# Campaign # Campaign
campaign_defaults: dict[str, KickFieldValue] = { campaign, created = KickDropCampaign.objects.update_or_create(
kick_id=data.id,
defaults={
"name": data.name, "name": data.name,
"status": data.status, "status": data.status,
"starts_at": data.starts_at, "starts_at": data.starts_at,
@ -200,18 +145,8 @@ class Command(BaseCommand):
"created_at": data.created_at, "created_at": data.created_at,
"api_updated_at": data.updated_at, "api_updated_at": data.updated_at,
"is_fully_imported": True, "is_fully_imported": True,
} },
campaign: KickDropCampaign | None = KickDropCampaign.objects.filter(
kick_id=data.id,
).first()
created = campaign is None
if campaign is None:
campaign = KickDropCampaign.objects.create(
kick_id=data.id,
**campaign_defaults,
) )
else:
self._save_if_changed(campaign, campaign_defaults)
if created: if created:
logger.info("Created new campaign: %s", campaign.kick_id) logger.info("Created new campaign: %s", campaign.kick_id)
@ -219,38 +154,25 @@ class Command(BaseCommand):
channel_objs: list[KickChannel] = [] channel_objs: list[KickChannel] = []
for ch_data in data.channels: for ch_data in data.channels:
user_data: KickUserSchema = ch_data.user user_data: KickUserSchema = ch_data.user
user_defaults: dict[str, KickFieldValue] = { user, created = KickUser.objects.update_or_create(
kick_id=user_data.id,
defaults={
"username": user_data.username, "username": user_data.username,
"profile_picture": user_data.profile_picture, "profile_picture": user_data.profile_picture,
} },
user: KickUser | None = KickUser.objects.filter( )
kick_id=user_data.id,
).first()
created = user is None
if user is None:
user = KickUser.objects.create(kick_id=user_data.id, **user_defaults)
else:
self._save_if_changed(user, user_defaults)
if created: if created:
logger.info("Created new user: %s", user.kick_id) logger.info("Created new user: %s", user.kick_id)
channel_defaults: dict[str, KickFieldValue] = { channel, created = KickChannel.objects.update_or_create(
kick_id=ch_data.id,
defaults={
"slug": ch_data.slug, "slug": ch_data.slug,
"description": ch_data.description, "description": ch_data.description,
"banner_picture_url": ch_data.banner_picture_url, "banner_picture_url": ch_data.banner_picture_url,
"user": user, "user": user,
} },
channel: KickChannel | None = KickChannel.objects.filter(
kick_id=ch_data.id,
).first()
created = channel is None
if channel is None:
channel = KickChannel.objects.create(
kick_id=ch_data.id,
**channel_defaults,
) )
else:
self._save_if_changed(channel, channel_defaults)
if created: if created:
logger.info("Created new channel: %s", channel.kick_id) logger.info("Created new channel: %s", channel.kick_id)
@ -262,46 +184,36 @@ class Command(BaseCommand):
# Resolve reward's category (may differ from campaign category) # Resolve reward's category (may differ from campaign category)
reward_category: KickCategory = category reward_category: KickCategory = category
if reward_data.category_id != cat_data.id: if reward_data.category_id != cat_data.id:
reward_category = KickCategory.objects.filter( reward_category, created = KickCategory.objects.get_or_create(
kick_id=reward_data.category_id, kick_id=reward_data.category_id,
).first() or KickCategory.objects.create( defaults={"name": "", "slug": "", "image_url": ""},
kick_id=reward_data.category_id,
name="",
slug="",
image_url="",
) )
created = not reward_category.name and not reward_category.slug
if created: if created:
logger.info("Created new category: %s", reward_category.kick_id) logger.info("Created new category: %s", reward_category.kick_id)
# Resolve reward's organization (may differ from campaign org) # Resolve reward's organization (may differ from campaign org)
reward_org: KickOrganization = org reward_org: KickOrganization = org
if reward_data.organization_id != org_data.id: if reward_data.organization_id != org_data.id:
reward_org = KickOrganization.objects.filter( reward_org, created = KickOrganization.objects.get_or_create(
kick_id=reward_data.organization_id, kick_id=reward_data.organization_id,
).first() or KickOrganization.objects.create( defaults={
kick_id=reward_data.organization_id, "name": "",
name="", "logo_url": "",
logo_url="", "url": "",
url="", "restricted": False,
restricted=False, },
) )
created = not reward_org.name and not reward_org.url
if created: if created:
logger.info("Created new organization: %s", reward_org.kick_id) logger.info("Created new organization: %s", reward_org.kick_id)
reward_defaults: dict[str, KickFieldValue] = { KickReward.objects.update_or_create(
kick_id=reward_data.id,
defaults={
"name": reward_data.name, "name": reward_data.name,
"image_url": reward_data.image_url, "image_url": reward_data.image_url,
"required_units": reward_data.required_units, "required_units": reward_data.required_units,
"campaign": campaign, "campaign": campaign,
"category": reward_category, "category": reward_category,
"organization": reward_org, "organization": reward_org,
} },
reward: KickReward | None = KickReward.objects.filter( )
kick_id=reward_data.id,
).first()
if reward is None:
KickReward.objects.create(kick_id=reward_data.id, **reward_defaults)
else:
self._save_if_changed(reward, reward_defaults)

View file

@ -292,17 +292,8 @@ class KickDropCampaign(auto_prefetch.Model):
def image_url(self) -> str: def image_url(self) -> str:
"""Return the image URL for the campaign.""" """Return the image URL for the campaign."""
# Image from first drop # Image from first drop
rewards_prefetched: list[KickReward] | None = getattr( if self.rewards.exists(): # pyright: ignore[reportAttributeAccessIssue]
self, first_reward: KickReward | None = self.rewards.first() # pyright: ignore[reportAttributeAccessIssue]
"rewards_ordered",
None,
)
if rewards_prefetched is not None:
first_reward: KickReward | None = (
rewards_prefetched[0] if rewards_prefetched else None
)
else:
first_reward = self.rewards.first() # pyright: ignore[reportAttributeAccessIssue]
if first_reward and first_reward.image_url: if first_reward and first_reward.image_url:
return first_reward.full_image_url return first_reward.full_image_url
@ -361,24 +352,7 @@ class KickDropCampaign(auto_prefetch.Model):
If both a base reward and a "(Con)" variant exist, prefer the base reward name. If both a base reward and a "(Con)" variant exist, prefer the base reward name.
""" """
rewards_by_name: dict[str, KickReward] = {} rewards_by_name: dict[str, KickReward] = {}
prefetched_rewards: list[KickReward] | None = getattr( for reward in self.rewards.all().order_by("required_units", "name", "kick_id"): # pyright: ignore[reportAttributeAccessIssue]
self,
"_prefetched_objects_cache",
{},
).get("rewards")
if prefetched_rewards is not None:
rewards_iterable = sorted(
prefetched_rewards,
key=lambda reward: (reward.required_units, reward.name, reward.kick_id),
)
else:
rewards_iterable = self.rewards.all().order_by( # pyright: ignore[reportAttributeAccessIssue]
"required_units",
"name",
"kick_id",
)
for reward in rewards_iterable:
key: str = self._normalized_reward_name(reward.name) key: str = self._normalized_reward_name(reward.name)
existing: KickReward | None = rewards_by_name.get(key) existing: KickReward | None = rewards_by_name.get(key)
if existing is None: if existing is None:

View file

@ -12,10 +12,8 @@ from unittest.mock import patch
import httpx import httpx
import pytest import pytest
from django.core.management import call_command from django.core.management import call_command
from django.db import connection
from django.test import Client from django.test import Client
from django.test import TestCase from django.test import TestCase
from django.test.utils import CaptureQueriesContext
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
from pydantic import ValidationError from pydantic import ValidationError
@ -416,37 +414,6 @@ class KickDropCampaignMergedRewardsTest(TestCase):
assert len(merged) == 1 assert len(merged) == 1
assert merged[0].name == "9th Anniversary Cake & Confetti" assert merged[0].name == "9th Anniversary Cake & Confetti"
def test_uses_prefetched_rewards_without_extra_queries(self) -> None:
"""When rewards are prefetched, merged_rewards should not hit the database again."""
campaign: KickDropCampaign = self._make_campaign()
KickReward.objects.create(
kick_id="reward-prefetch-a",
name="Alpha Reward",
image_url="drops/reward-image/alpha.png",
required_units=10,
campaign=campaign,
category=campaign.category,
organization=campaign.organization,
)
KickReward.objects.create(
kick_id="reward-prefetch-b",
name="Alpha Reward (Con)",
image_url="drops/reward-image/alpha-con.png",
required_units=10,
campaign=campaign,
category=campaign.category,
organization=campaign.organization,
)
campaign = KickDropCampaign.objects.prefetch_related("rewards").get(
pk=campaign.pk,
)
with self.assertNumQueries(0):
merged: list[KickReward] = campaign.merged_rewards
assert len(merged) == 1
assert merged[0].name == "Alpha Reward"
# MARK: Management command tests # MARK: Management command tests
class ImportKickDropsCommandTest(TestCase): class ImportKickDropsCommandTest(TestCase):
@ -579,115 +546,24 @@ class KickDashboardViewTest(TestCase):
) )
assert campaign.name in response.content.decode() assert campaign.name in response.content.decode()
def test_dashboard_query_count_stays_flat_with_more_campaigns(self) -> None:
"""Dashboard SELECT query count should stay flat as active campaign count grows."""
def _create_active_campaign(index: int) -> KickDropCampaign:
org: KickOrganization = KickOrganization.objects.create(
kick_id=f"org-qc-{index}",
name=f"Org QC {index}",
)
cat: KickCategory = KickCategory.objects.create(
kick_id=10000 + index,
name=f"Cat QC {index}",
slug=f"cat-qc-{index}",
)
campaign: KickDropCampaign = KickDropCampaign.objects.create(
kick_id=f"camp-qc-{index}",
name=f"Campaign QC {index}",
status="active",
starts_at=dt(2020, 1, 1, tzinfo=UTC),
ends_at=dt(2099, 12, 31, tzinfo=UTC),
organization=org,
category=cat,
rule_id=1,
rule_name="Watch to redeem",
is_fully_imported=True,
)
user: KickUser = KickUser.objects.create(
kick_id=3000000 + index,
username=f"qcuser{index}",
)
channel: KickChannel = KickChannel.objects.create(
kick_id=2000000 + index,
slug=f"qc-channel-{index}",
user=user,
)
campaign.channels.add(channel)
KickReward.objects.create(
kick_id=f"reward-qc-{index}-a",
name="Alpha Reward",
image_url="drops/reward-image/alpha.png",
required_units=30,
campaign=campaign,
category=cat,
organization=org,
)
KickReward.objects.create(
kick_id=f"reward-qc-{index}-b",
name="Alpha Reward (Con)",
image_url="drops/reward-image/alpha-con.png",
required_units=30,
campaign=campaign,
category=cat,
organization=org,
)
return campaign
def _capture_dashboard_select_count() -> int:
with CaptureQueriesContext(connection) as queries:
response: _MonkeyPatchedWSGIResponse = self.client.get(
reverse("kick:dashboard"),
)
assert response.status_code == 200
select_queries: list[str] = [
query_info["sql"]
for query_info in queries.captured_queries
if query_info["sql"].lstrip().upper().startswith("SELECT")
]
return len(select_queries)
_create_active_campaign(1)
baseline_select_count: int = _capture_dashboard_select_count()
for i in range(2, 12):
_create_active_campaign(i)
scaled_select_count: int = _capture_dashboard_select_count()
assert scaled_select_count <= baseline_select_count + 2, (
"Kick dashboard SELECT query count grew with campaign volume; "
f"possible N+1 regression. baseline={baseline_select_count}, "
f"scaled={scaled_select_count}"
)
class KickCampaignListViewTest(TestCase): class KickCampaignListViewTest(TestCase):
"""Tests for the kick campaign list view.""" """Tests for the kick campaign list view."""
@classmethod
def setUpTestData(cls) -> None:
"""Set up shared test data for campaign list view tests."""
cls.org: KickOrganization = KickOrganization.objects.create(
kick_id="org-list",
name="List Org",
)
cls.cat: KickCategory = KickCategory.objects.create(
kick_id=300,
name="List Cat",
slug="list-cat",
)
def _make_campaign( def _make_campaign(
self, self,
kick_id: str, kick_id: str,
name: str, name: str,
status: str = "active", status: str = "active",
) -> KickDropCampaign: ) -> KickDropCampaign:
org, _ = KickOrganization.objects.get_or_create(
kick_id="org-list",
defaults={"name": "List Org"},
)
cat, _ = KickCategory.objects.get_or_create(
kick_id=300,
defaults={"name": "List Cat", "slug": "list-cat"},
)
# Set dates so the active/expired filter works correctly # Set dates so the active/expired filter works correctly
if status == "active": if status == "active":
starts_at = dt(2020, 1, 1, tzinfo=UTC) starts_at = dt(2020, 1, 1, tzinfo=UTC)
@ -701,8 +577,8 @@ class KickCampaignListViewTest(TestCase):
status=status, status=status,
starts_at=starts_at, starts_at=starts_at,
ends_at=ends_at, ends_at=ends_at,
organization=self.org, organization=org,
category=self.cat, category=cat,
rule_id=1, rule_id=1,
rule_name="Watch to redeem", rule_name="Watch to redeem",
is_fully_imported=True, is_fully_imported=True,
@ -935,95 +811,6 @@ class KickOrganizationDetailViewTest(TestCase):
) )
assert response.status_code == 404 assert response.status_code == 404
def test_organization_detail_query_count_stays_flat_with_more_campaigns(
self,
) -> None:
"""Organization detail SELECT query count should stay flat as campaign count grows."""
org: KickOrganization = KickOrganization.objects.create(
kick_id="org-orgdet-qc",
name="Orgdet Query Count",
)
def _create_org_campaign(index: int) -> None:
cat: KickCategory = KickCategory.objects.create(
kick_id=17000 + index,
name=f"Orgdet QC Cat {index}",
slug=f"orgdet-qc-cat-{index}",
)
campaign: KickDropCampaign = KickDropCampaign.objects.create(
kick_id=f"camp-orgdet-qc-{index}",
name=f"Orgdet QC Campaign {index}",
status="active",
starts_at=dt(2020, 1, 1, tzinfo=UTC),
ends_at=dt(2099, 12, 31, tzinfo=UTC),
organization=org,
category=cat,
rule_id=1,
rule_name="Watch to redeem",
is_fully_imported=True,
)
user: KickUser = KickUser.objects.create(
kick_id=3700000 + index,
username=f"orgdetqcuser{index}",
)
channel: KickChannel = KickChannel.objects.create(
kick_id=2700000 + index,
slug=f"orgdet-qc-channel-{index}",
user=user,
)
campaign.channels.add(channel)
KickReward.objects.create(
kick_id=f"reward-orgdet-qc-{index}-a",
name="Org Reward",
image_url="drops/reward-image/org.png",
required_units=30,
campaign=campaign,
category=cat,
organization=org,
)
KickReward.objects.create(
kick_id=f"reward-orgdet-qc-{index}-b",
name="Org Reward (Con)",
image_url="drops/reward-image/org-con.png",
required_units=30,
campaign=campaign,
category=cat,
organization=org,
)
def _capture_org_detail_select_count() -> int:
with CaptureQueriesContext(connection) as queries:
response: _MonkeyPatchedWSGIResponse = self.client.get(
reverse(
"kick:organization_detail",
kwargs={"kick_id": org.kick_id},
),
)
assert response.status_code == 200
select_queries: list[str] = [
query_info["sql"]
for query_info in queries.captured_queries
if query_info["sql"].lstrip().upper().startswith("SELECT")
]
return len(select_queries)
_create_org_campaign(1)
baseline_select_count: int = _capture_org_detail_select_count()
for i in range(2, 12):
_create_org_campaign(i)
scaled_select_count: int = _capture_org_detail_select_count()
assert scaled_select_count <= baseline_select_count + 2, (
"Organization detail SELECT query count grew with campaign volume; "
f"possible N+1 regression. baseline={baseline_select_count}, "
f"scaled={scaled_select_count}"
)
class KickFeedsTest(TestCase): class KickFeedsTest(TestCase):
"""Tests for Kick RSS/Atom/Discord feed endpoints.""" """Tests for Kick RSS/Atom/Discord feed endpoints."""
@ -1155,109 +942,6 @@ class KickFeedsTest(TestCase):
assert not str(discord_timestamp(None)) assert not str(discord_timestamp(None))
class KickEndpointCoverageTest(TestCase):
"""Endpoint smoke coverage for all Kick routes in kick.urls."""
def setUp(self) -> None:
"""Create shared fixtures used by detail and feed endpoints."""
self.org: KickOrganization = KickOrganization.objects.create(
kick_id="org-endpoint-1",
name="Endpoint Org",
logo_url="https://example.com/org-endpoint.png",
)
self.category: KickCategory = KickCategory.objects.create(
kick_id=9123,
name="Endpoint Category",
slug="endpoint-category",
image_url="https://example.com/endpoint-category.png",
)
self.campaign: KickDropCampaign = KickDropCampaign.objects.create(
kick_id="camp-endpoint-1",
name="Endpoint Campaign",
status="active",
starts_at=timezone.now() - timedelta(days=1),
ends_at=timezone.now() + timedelta(days=1),
organization=self.org,
category=self.category,
connect_url="https://example.com/connect",
url="https://example.com/campaign",
rule_id=1,
rule_name="Watch to redeem",
is_fully_imported=True,
)
user: KickUser = KickUser.objects.create(
kick_id=5551001,
username="endpointuser",
)
channel: KickChannel = KickChannel.objects.create(
kick_id=5551002,
slug="endpointchannel",
user=user,
)
self.campaign.channels.add(channel)
KickReward.objects.create(
kick_id="reward-endpoint-1",
name="Endpoint Reward",
image_url="drops/reward-image/endpoint.png",
required_units=20,
campaign=self.campaign,
category=self.category,
organization=self.org,
)
def test_all_kick_html_endpoints_return_success(self) -> None:
"""All Kick HTML endpoints should render successfully with populated fixtures."""
html_routes: list[tuple[str, dict[str, str | int]]] = [
("kick:dashboard", {}),
("kick:campaign_list", {}),
("kick:campaign_detail", {"kick_id": self.campaign.kick_id}),
("kick:game_list", {}),
("kick:game_detail", {"kick_id": self.category.kick_id}),
("kick:category_list", {}),
("kick:category_detail", {"kick_id": self.category.kick_id}),
("kick:organization_list", {}),
("kick:organization_detail", {"kick_id": self.org.kick_id}),
]
for route_name, kwargs in html_routes:
response: _MonkeyPatchedWSGIResponse = self.client.get(
reverse(route_name, kwargs=kwargs),
)
assert response.status_code == 200, route_name
def test_all_kick_feed_endpoints_return_success(self) -> None:
"""All Kick RSS/Atom/Discord feed endpoints should return XML responses."""
feed_routes: list[tuple[str, dict[str, int]]] = [
("kick:campaign_feed", {}),
("kick:game_feed", {}),
("kick:game_campaign_feed", {"kick_id": self.category.kick_id}),
("kick:category_feed", {}),
("kick:category_campaign_feed", {"kick_id": self.category.kick_id}),
("kick:organization_feed", {}),
("kick:campaign_feed_atom", {}),
("kick:game_feed_atom", {}),
("kick:game_campaign_feed_atom", {"kick_id": self.category.kick_id}),
("kick:category_feed_atom", {}),
("kick:category_campaign_feed_atom", {"kick_id": self.category.kick_id}),
("kick:organization_feed_atom", {}),
("kick:campaign_feed_discord", {}),
("kick:game_feed_discord", {}),
("kick:game_campaign_feed_discord", {"kick_id": self.category.kick_id}),
("kick:category_feed_discord", {}),
("kick:category_campaign_feed_discord", {"kick_id": self.category.kick_id}),
("kick:organization_feed_discord", {}),
]
for route_name, kwargs in feed_routes:
response: _MonkeyPatchedWSGIResponse = self.client.get(
reverse(route_name, kwargs=kwargs),
)
assert response.status_code == 200, route_name
assert response["Content-Type"] == "application/xml; charset=utf-8"
class KickDropCampaignFullyImportedTest(TestCase): class KickDropCampaignFullyImportedTest(TestCase):
"""Tests for KickDropCampaign.is_fully_imported field and filtering.""" """Tests for KickDropCampaign.is_fully_imported field and filtering."""

View file

@ -532,7 +532,7 @@ def organization_detail_view(request: HttpRequest, kick_id: str) -> HttpResponse
KickDropCampaign.objects KickDropCampaign.objects
.filter(organization=org) .filter(organization=org)
.select_related("category") .select_related("category")
.prefetch_related("rewards", "channels__user") .prefetch_related("rewards")
.order_by("-starts_at"), .order_by("-starts_at"),
) )

View file

@ -31,7 +31,6 @@ dependencies = [
"setproctitle", "setproctitle",
"sitemap-parser", "sitemap-parser",
"tqdm", "tqdm",
"django-zeal>=2.1.0",
] ]
@ -52,7 +51,6 @@ dev = [
DJANGO_SETTINGS_MODULE = "config.settings" DJANGO_SETTINGS_MODULE = "config.settings"
python_files = ["test_*.py", "*_test.py"] python_files = ["test_*.py", "*_test.py"]
addopts = "--tb=short -n auto --cov" addopts = "--tb=short -n auto --cov"
markers = ["no_zeal: run test without zeal_context N+1 checks"]
filterwarnings = [ filterwarnings = [
"ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning", "ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning",
] ]

View file

@ -4,11 +4,11 @@
Chat Badges Chat Badges
{% endblock title %} {% endblock title %}
{% block content %} {% block content %}
<h1>{{ badge_data|length }} Twitch Chat Badges</h1> <h1>{{ badge_sets.count }} Twitch Chat Badges</h1>
<div> <div>
<a href="{% url 'twitch:dashboard' %}">Twitch</a> > Badges <a href="{% url 'twitch:dashboard' %}">Twitch</a> > Badges
</div> </div>
{% if badge_data %} {% if badge_sets %}
{% for data in badge_data %} {% for data in badge_data %}
<h2> <h2>
<a href="{% url 'twitch:badge_set_detail' set_id=data.set.set_id %}">{{ data.set.set_id }}</a> <a href="{% url 'twitch:badge_set_detail' set_id=data.set.set_id %}">{{ data.set.set_id }}</a>

View file

@ -68,8 +68,8 @@
flex-shrink: 0"> flex-shrink: 0">
<div> <div>
<a href="{% url 'twitch:campaign_detail' campaign_data.campaign.twitch_id %}"> <a href="{% url 'twitch:campaign_detail' campaign_data.campaign.twitch_id %}">
{% picture campaign_data.image_url alt="Image for "|add:campaign_data.campaign.name width=120 %} {% picture campaign_data.campaign.image_best_url|default:campaign_data.campaign.image_url alt="Image for "|add:campaign_data.campaign.name width=120 %}
<h4 style="margin: 0.5rem 0; text-align: left;">{{ campaign_data.clean_name }}</h4> <h4 style="margin: 0.5rem 0; text-align: left;">{{ campaign_data.campaign.clean_name }}</h4>
</a> </a>
<!-- End time --> <!-- End time -->
<time datetime="{{ campaign_data.campaign.end_at|date:'c' }}" <time datetime="{{ campaign_data.campaign.end_at|date:'c' }}"
@ -114,11 +114,11 @@
{% endfor %} {% endfor %}
{% else %} {% else %}
<!-- No allowed channels means drops are available in any stream of the game's category --> <!-- No allowed channels means drops are available in any stream of the game's category -->
{% if campaign_data.game_twitch_directory_url %} {% if campaign.game.twitch_directory_url %}
<li> <li>
<a href="{{ campaign_data.game_twitch_directory_url }}" <a href="{{ campaign.game.twitch_directory_url }}"
title="Open Twitch category page for {{ campaign_data.game_display_name }} with Drops filter"> title="Open Twitch category page for {{ campaign_data.campaign.game.display_name }} with Drops filter">
Browse {{ campaign_data.game_display_name }} category Browse {{ campaign_data.campaign.game.display_name }} category
</a> </a>
</li> </li>
{% else %} {% else %}
@ -131,10 +131,10 @@
</li> </li>
{% endif %} {% endif %}
{% else %} {% else %}
{% if campaign_data.game_twitch_directory_url %} {% if campaign_data.campaign.game.twitch_directory_url %}
<li> <li>
<a href="{{ campaign_data.game_twitch_directory_url }}" <a href="{{ campaign_data.campaign.game.twitch_directory_url }}"
title="Find streamers playing {{ campaign_data.game_display_name }} with drops enabled"> title="Find streamers playing {{ campaign_data.campaign.game.display_name }} with drops enabled">
Go to a participating live channel Go to a participating live channel
</a> </a>
</li> </li>

View file

@ -161,7 +161,7 @@ class TTVDropsBaseFeed(Feed):
response.content = content.encode(encoding) response.content = content.encode(encoding)
def get_feed(self, obj: Model | None, request: HttpRequest) -> SyndicationFeed: def get_feed(self, obj: object, request: HttpRequest) -> SyndicationFeed:
"""Use deterministic BASE_URL handling for syndication feed generation. """Use deterministic BASE_URL handling for syndication feed generation.
Returns: Returns:
@ -199,8 +199,8 @@ class TTVDropsBaseFeed(Feed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Return feed response with inline content disposition for browser display.""" """Return feed response with inline content disposition for browser display."""
original_stylesheets: list[str] = self.stylesheets original_stylesheets: list[str] = self.stylesheets
@ -745,8 +745,8 @@ class OrganizationRSSFeed(TTVDropsBaseFeed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Override to capture limit parameter from request. """Override to capture limit parameter from request.
@ -822,8 +822,8 @@ class GameFeed(TTVDropsBaseFeed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Override to capture limit parameter from request. """Override to capture limit parameter from request.
@ -975,8 +975,8 @@ class DropCampaignFeed(TTVDropsBaseFeed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Override to capture limit parameter from request. """Override to capture limit parameter from request.
@ -1114,8 +1114,8 @@ class GameCampaignFeed(TTVDropsBaseFeed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Override to capture limit parameter from request. """Override to capture limit parameter from request.
@ -1293,8 +1293,8 @@ class RewardCampaignFeed(TTVDropsBaseFeed):
def __call__( def __call__(
self, self,
request: HttpRequest, request: HttpRequest,
*args: str | int, *args: object,
**kwargs: str | int, **kwargs: object,
) -> HttpResponse: ) -> HttpResponse:
"""Override to capture limit parameter from request. """Override to capture limit parameter from request.

View file

@ -7,7 +7,6 @@ from compression import zstd
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Protocol
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
@ -20,15 +19,6 @@ if TYPE_CHECKING:
from argparse import ArgumentParser from argparse import ArgumentParser
class SupportsStr(Protocol):
"""Protocol for values that provide a string representation."""
def __str__(self) -> str: ...
type SqlSerializable = bool | int | float | bytes | SupportsStr | None
class Command(BaseCommand): class Command(BaseCommand):
"""Create a compressed SQL dump of the Twitch and Kick dataset tables.""" """Create a compressed SQL dump of the Twitch and Kick dataset tables."""
@ -295,7 +285,7 @@ def _write_postgres_dump(output_path: Path, tables: list[str]) -> None:
raise CommandError(msg) raise CommandError(msg)
def _sql_literal(value: SqlSerializable) -> str: def _sql_literal(value: object) -> str:
"""Convert a Python value to a SQL literal. """Convert a Python value to a SQL literal.
Args: Args:
@ -315,7 +305,7 @@ def _sql_literal(value: SqlSerializable) -> str:
return "'" + str(value).replace("'", "''") + "'" return "'" + str(value).replace("'", "''") + "'"
def _json_default(value: bytes | SupportsStr) -> str: def _json_default(value: object) -> str:
"""Convert non-serializable values to JSON-compatible strings. """Convert non-serializable values to JSON-compatible strings.
Args: Args:

View file

@ -583,31 +583,16 @@ class Command(BaseCommand):
Returns: Returns:
Organization instance. Organization instance.
""" """
cache: dict[str, Organization] = getattr(self, "_org_cache", {}) org_obj, created = Organization.objects.get_or_create(
if not hasattr(self, "_org_cache"):
self._org_cache = cache
cached_org: Organization | None = cache.get(org_data.twitch_id)
if cached_org is not None:
self._save_if_changed(cached_org, {"name": org_data.name})
return cached_org
org_obj: Organization | None = Organization.objects.filter(
twitch_id=org_data.twitch_id, twitch_id=org_data.twitch_id,
).first() defaults={"name": org_data.name},
_created: bool = org_obj is None
if org_obj is None:
org_obj = Organization.objects.create(
twitch_id=org_data.twitch_id,
name=org_data.name,
) )
if not created:
self._save_if_changed(org_obj, {"name": org_data.name})
else:
tqdm.write( tqdm.write(
f"{Fore.GREEN}{Style.RESET_ALL} Created new organization: {org_data.name}", f"{Fore.GREEN}{Style.RESET_ALL} Created new organization: {org_data.name}",
) )
else:
self._save_if_changed(org_obj, {"name": org_data.name})
cache[org_data.twitch_id] = org_obj
return org_obj return org_obj
@ -636,10 +621,6 @@ class Command(BaseCommand):
if campaign_org_obj: if campaign_org_obj:
owner_orgs.add(campaign_org_obj) owner_orgs.add(campaign_org_obj)
cache: dict[str, Game] = getattr(self, "_game_cache", {})
if not hasattr(self, "_game_cache"):
self._game_cache = cache
defaults: dict[str, object] = { defaults: dict[str, object] = {
"display_name": game_data.display_name or (game_data.name or ""), "display_name": game_data.display_name or (game_data.name or ""),
"name": game_data.name or "", "name": game_data.name or "",
@ -647,21 +628,9 @@ class Command(BaseCommand):
"box_art": game_data.box_art_url or "", "box_art": game_data.box_art_url or "",
} }
cached_game: Game | None = cache.get(game_data.twitch_id) game_obj, created = Game.objects.get_or_create(
if cached_game is not None:
if owner_orgs:
cached_game.owners.add(*owner_orgs)
self._save_if_changed(cached_game, defaults)
return cached_game
game_obj: Game | None = Game.objects.filter(
twitch_id=game_data.twitch_id, twitch_id=game_data.twitch_id,
).first() defaults=defaults,
created: bool = game_obj is None
if game_obj is None:
game_obj = Game.objects.create(
twitch_id=game_data.twitch_id,
**defaults,
) )
# Set owners (ManyToMany) # Set owners (ManyToMany)
if created or owner_orgs: if created or owner_orgs:
@ -673,7 +642,6 @@ class Command(BaseCommand):
f"{Fore.GREEN}{Style.RESET_ALL} Created new game: {game_data.display_name}", f"{Fore.GREEN}{Style.RESET_ALL} Created new game: {game_data.display_name}",
) )
self._download_game_box_art(game_obj, game_obj.box_art) self._download_game_box_art(game_obj, game_obj.box_art)
cache[game_data.twitch_id] = game_obj
return game_obj return game_obj
def _download_game_box_art(self, game_obj: Game, box_art_url: str | None) -> None: def _download_game_box_art(self, game_obj: Game, box_art_url: str | None) -> None:
@ -733,7 +701,7 @@ class Command(BaseCommand):
return channel_obj return channel_obj
def process_responses( # noqa: PLR0915 def process_responses(
self, self,
responses: list[dict[str, Any]], responses: list[dict[str, Any]],
file_path: Path, file_path: Path,
@ -824,18 +792,13 @@ class Command(BaseCommand):
"account_link_url": drop_campaign.account_link_url, "account_link_url": drop_campaign.account_link_url,
} }
campaign_obj: DropCampaign | None = DropCampaign.objects.filter( campaign_obj, created = DropCampaign.objects.get_or_create(
twitch_id=drop_campaign.twitch_id, twitch_id=drop_campaign.twitch_id,
).first() defaults=defaults,
created: bool = campaign_obj is None
if campaign_obj is None:
campaign_obj = DropCampaign.objects.create(
twitch_id=drop_campaign.twitch_id,
**defaults,
) )
else: if not created:
self._save_if_changed(campaign_obj, defaults) self._save_if_changed(campaign_obj, defaults)
if created: else:
tqdm.write( tqdm.write(
f"{Fore.GREEN}{Style.RESET_ALL} Created new campaign: {drop_campaign.name}", f"{Fore.GREEN}{Style.RESET_ALL} Created new campaign: {drop_campaign.name}",
) )
@ -919,18 +882,13 @@ class Command(BaseCommand):
if end_at_dt is not None: if end_at_dt is not None:
drop_defaults["end_at"] = end_at_dt drop_defaults["end_at"] = end_at_dt
drop_obj: TimeBasedDrop | None = TimeBasedDrop.objects.filter( drop_obj, created = TimeBasedDrop.objects.get_or_create(
twitch_id=drop_schema.twitch_id, twitch_id=drop_schema.twitch_id,
).first() defaults=drop_defaults,
created: bool = drop_obj is None
if drop_obj is None:
drop_obj = TimeBasedDrop.objects.create(
twitch_id=drop_schema.twitch_id,
**drop_defaults,
) )
else: if not created:
self._save_if_changed(drop_obj, drop_defaults) self._save_if_changed(drop_obj, drop_defaults)
if created: else:
tqdm.write( tqdm.write(
f"{Fore.GREEN}{Style.RESET_ALL} Created TimeBasedDrop: {drop_schema.name}", f"{Fore.GREEN}{Style.RESET_ALL} Created TimeBasedDrop: {drop_schema.name}",
) )
@ -942,10 +900,6 @@ class Command(BaseCommand):
def _get_or_update_benefit(self, benefit_schema: DropBenefitSchema) -> DropBenefit: def _get_or_update_benefit(self, benefit_schema: DropBenefitSchema) -> DropBenefit:
"""Return a DropBenefit, creating or updating as needed.""" """Return a DropBenefit, creating or updating as needed."""
cache: dict[str, DropBenefit] = getattr(self, "_benefit_cache", {})
if not hasattr(self, "_benefit_cache"):
self._benefit_cache = cache
distribution_type: str = (benefit_schema.distribution_type or "").strip() distribution_type: str = (benefit_schema.distribution_type or "").strip()
benefit_defaults: dict[str, str | int | datetime | bool | None] = { benefit_defaults: dict[str, str | int | datetime | bool | None] = {
"name": benefit_schema.name, "name": benefit_schema.name,
@ -960,19 +914,9 @@ class Command(BaseCommand):
if created_at_dt: if created_at_dt:
benefit_defaults["created_at"] = created_at_dt benefit_defaults["created_at"] = created_at_dt
cached_benefit: DropBenefit | None = cache.get(benefit_schema.twitch_id) benefit_obj, created = DropBenefit.objects.get_or_create(
if cached_benefit is not None:
self._save_if_changed(cached_benefit, benefit_defaults)
return cached_benefit
benefit_obj: DropBenefit | None = DropBenefit.objects.filter(
twitch_id=benefit_schema.twitch_id, twitch_id=benefit_schema.twitch_id,
).first() defaults=benefit_defaults,
created: bool = benefit_obj is None
if benefit_obj is None:
benefit_obj = DropBenefit.objects.create(
twitch_id=benefit_schema.twitch_id,
**benefit_defaults,
) )
if not created: if not created:
self._save_if_changed(benefit_obj, benefit_defaults) self._save_if_changed(benefit_obj, benefit_defaults)
@ -981,8 +925,6 @@ class Command(BaseCommand):
f"{Fore.GREEN}{Style.RESET_ALL} Created DropBenefit: {benefit_schema.name}", f"{Fore.GREEN}{Style.RESET_ALL} Created DropBenefit: {benefit_schema.name}",
) )
cache[benefit_schema.twitch_id] = benefit_obj
return benefit_obj return benefit_obj
def _process_benefit_edges( def _process_benefit_edges(
@ -1004,16 +946,10 @@ class Command(BaseCommand):
) )
defaults = {"entitlement_limit": edge_schema.entitlement_limit} defaults = {"entitlement_limit": edge_schema.entitlement_limit}
edge_obj: DropBenefitEdge | None = DropBenefitEdge.objects.filter( edge_obj, created = DropBenefitEdge.objects.get_or_create(
drop=drop_obj, drop=drop_obj,
benefit=benefit_obj, benefit=benefit_obj,
).first() defaults=defaults,
created: bool = edge_obj is None
if edge_obj is None:
edge_obj = DropBenefitEdge.objects.create(
drop=drop_obj,
benefit=benefit_obj,
**defaults,
) )
if not created: if not created:
self._save_if_changed(edge_obj, defaults) self._save_if_changed(edge_obj, defaults)

View file

@ -39,13 +39,9 @@ class Command(BaseCommand):
help="Re-download even if a local box art file already exists.", help="Re-download even if a local box art file already exists.",
) )
def handle( # noqa: PLR0914, PLR0915 def handle(self, *_args: object, **options: object) -> None: # noqa: PLR0914, PLR0915
self,
*_args: str,
**options: str | bool | int | None,
) -> None:
"""Download Twitch box art images for all games.""" """Download Twitch box art images for all games."""
limit_value: str | bool | int | None = options.get("limit") limit_value: object | None = options.get("limit")
limit: int | None = limit_value if isinstance(limit_value, int) else None limit: int | None = limit_value if isinstance(limit_value, int) else None
force: bool = bool(options.get("force")) force: bool = bool(options.get("force"))

View file

@ -50,14 +50,10 @@ class Command(BaseCommand):
help="Re-download even if a local image file already exists.", help="Re-download even if a local image file already exists.",
) )
def handle( def handle(self, *_args: object, **options: object) -> None:
self,
*_args: str,
**options: str | bool | int | None,
) -> None:
"""Download images for campaigns, benefits, and/or rewards.""" """Download images for campaigns, benefits, and/or rewards."""
model_choice: str = str(options.get("model", "all")) model_choice: str = str(options.get("model", "all"))
limit_value: str | bool | int | None = options.get("limit") limit_value: object | None = options.get("limit")
limit: int | None = limit_value if isinstance(limit_value, int) else None limit: int | None = limit_value if isinstance(limit_value, int) else None
force: bool = bool(options.get("force")) force: bool = bool(options.get("force"))

View file

@ -196,12 +196,9 @@ class Command(BaseCommand):
Returns: Returns:
Tuple of (ChatBadgeSet instance, created flag) Tuple of (ChatBadgeSet instance, created flag)
""" """
badge_set_obj: ChatBadgeSet | None = ChatBadgeSet.objects.filter( badge_set_obj, created = ChatBadgeSet.objects.get_or_create(
set_id=badge_set_schema.set_id, set_id=badge_set_schema.set_id,
).first() )
created: bool = badge_set_obj is None
if badge_set_obj is None:
badge_set_obj = ChatBadgeSet.objects.create(set_id=badge_set_schema.set_id)
if created: if created:
self.stdout.write( self.stdout.write(
@ -261,25 +258,11 @@ class Command(BaseCommand):
"click_url": version_schema.click_url, "click_url": version_schema.click_url,
} }
badge_obj: ChatBadge | None = ChatBadge.objects.filter( _badge_obj, created = ChatBadge.objects.update_or_create(
badge_set=badge_set_obj, badge_set=badge_set_obj,
badge_id=version_schema.badge_id, badge_id=version_schema.badge_id,
).first() defaults=defaults,
created: bool = badge_obj is None
if badge_obj is None:
badge_obj = ChatBadge.objects.create(
badge_set=badge_set_obj,
badge_id=version_schema.badge_id,
**defaults,
) )
else:
changed_fields: list[str] = []
for field, value in defaults.items():
if getattr(badge_obj, field) != value:
setattr(badge_obj, field, value)
changed_fields.append(field)
if changed_fields:
badge_obj.save(update_fields=changed_fields)
if created: if created:
msg: str = ( msg: str = (

View file

@ -1,22 +0,0 @@
# Generated by Django 6.0.4 on 2026-04-10 23:02
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
"""Add an index on DropBenefit for distribution_type and name."""
dependencies = [
("twitch", "0016_mark_all_drops_fully_imported"),
]
operations = [
migrations.AddIndex(
model_name="dropbenefit",
index=models.Index(
fields=["distribution_type", "name"],
name="twitch_drop_distrib_70d961_idx",
),
),
]

View file

@ -1,39 +0,0 @@
# Generated by Django 6.0.4 on 2026-04-10 23:18
from django.db import migrations
class Migration(migrations.Migration):
"""Rename some indexes on DropCampaign and RewardCampaign to be more descriptive."""
dependencies = [
("twitch", "0017_dropbenefit_twitch_drop_distrib_70d961_idx"),
]
operations = [
migrations.RenameIndex(
model_name="dropcampaign",
new_name="tw_drop_start_desc_idx",
old_name="twitch_drop_start_a_929f09_idx",
),
migrations.RenameIndex(
model_name="dropcampaign",
new_name="tw_drop_start_end_idx",
old_name="twitch_drop_start_a_6e5fb6_idx",
),
migrations.RenameIndex(
model_name="dropcampaign",
new_name="tw_drop_start_end_game_idx",
old_name="twitch_drop_start_a_b02d4c_idx",
),
migrations.RenameIndex(
model_name="rewardcampaign",
new_name="tw_reward_starts_desc_idx",
old_name="twitch_rewa_starts__4df564_idx",
),
migrations.RenameIndex(
model_name="rewardcampaign",
new_name="tw_reward_starts_ends_idx",
old_name="twitch_rewa_starts__dd909d_idx",
),
]

View file

@ -1,32 +0,0 @@
# Generated by Django 6.0.4 on 2026-04-10 23:25
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
"""Add indexes to optimize queries for the campaign list view."""
dependencies = [
(
"twitch",
"0018_rename_twitch_drop_start_a_929f09_idx_tw_drop_start_desc_idx_and_more",
),
]
operations = [
migrations.AddIndex(
model_name="dropcampaign",
index=models.Index(
fields=["is_fully_imported", "-start_at"],
name="tw_drop_imported_start_idx",
),
),
migrations.AddIndex(
model_name="dropcampaign",
index=models.Index(
fields=["is_fully_imported", "start_at", "end_at"],
name="tw_drop_imported_start_end_idx",
),
),
]

View file

@ -1,13 +1,10 @@
import logging import logging
from collections import OrderedDict
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any
import auto_prefetch import auto_prefetch
from django.conf import settings from django.conf import settings
from django.contrib.postgres.indexes import GinIndex from django.contrib.postgres.indexes import GinIndex
from django.db import models from django.db import models
from django.db.models import Prefetch
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
from django.utils.html import format_html from django.utils.html import format_html
@ -17,7 +14,6 @@ from twitch.utils import normalize_twitch_box_art_url
if TYPE_CHECKING: if TYPE_CHECKING:
import datetime import datetime
from django.db.models import QuerySet
logger: logging.Logger = logging.getLogger("ttvdrops") logger: logging.Logger = logging.getLogger("ttvdrops")
@ -491,7 +487,7 @@ class DropCampaign(auto_prefetch.Model):
class Meta(auto_prefetch.Model.Meta): class Meta(auto_prefetch.Model.Meta):
ordering = ["-start_at"] ordering = ["-start_at"]
indexes = [ indexes = [
models.Index(fields=["-start_at"], name="tw_drop_start_desc_idx"), models.Index(fields=["-start_at"]),
models.Index(fields=["end_at"]), models.Index(fields=["end_at"]),
models.Index(fields=["game"]), models.Index(fields=["game"]),
models.Index(fields=["twitch_id"]), models.Index(fields=["twitch_id"]),
@ -503,181 +499,15 @@ class DropCampaign(auto_prefetch.Model):
models.Index(fields=["updated_at"]), models.Index(fields=["updated_at"]),
# Composite indexes for common queries # Composite indexes for common queries
models.Index(fields=["game", "-start_at"]), models.Index(fields=["game", "-start_at"]),
models.Index(fields=["start_at", "end_at"], name="tw_drop_start_end_idx"), models.Index(fields=["start_at", "end_at"]),
# For dashboard and game_detail active campaign filtering # For dashboard and game_detail active campaign filtering
models.Index( models.Index(fields=["start_at", "end_at", "game"]),
fields=["start_at", "end_at", "game"],
name="tw_drop_start_end_game_idx",
),
models.Index(fields=["end_at", "-start_at"]), models.Index(fields=["end_at", "-start_at"]),
# For campaign list view: is_fully_imported filter + ordering
models.Index(
fields=["is_fully_imported", "-start_at"],
name="tw_drop_imported_start_idx",
),
# For campaign list view: is_fully_imported + active-window filter
models.Index(
fields=["is_fully_imported", "start_at", "end_at"],
name="tw_drop_imported_start_end_idx",
),
] ]
def __str__(self) -> str: def __str__(self) -> str:
return self.name return self.name
@classmethod
def for_campaign_list(
cls,
now: datetime.datetime,
*,
game_twitch_id: str | None = None,
status: str | None = None,
) -> models.QuerySet[DropCampaign]:
"""Return fully-imported campaigns with relations needed by the campaign list view.
Args:
now: Current timestamp used to evaluate status filters.
game_twitch_id: Optional Twitch game ID to filter campaigns by.
status: Optional status filter; one of "active", "upcoming", or "expired".
Returns:
QuerySet of campaigns ordered by newest start date.
"""
queryset = (
cls.objects
.filter(is_fully_imported=True)
.select_related("game")
.prefetch_related(
"game__owners",
models.Prefetch(
"time_based_drops",
queryset=TimeBasedDrop.objects.prefetch_related("benefits"),
),
)
.order_by("-start_at")
)
if game_twitch_id:
queryset = queryset.filter(game__twitch_id=game_twitch_id)
if status == "active":
queryset = queryset.filter(start_at__lte=now, end_at__gte=now)
elif status == "upcoming":
queryset = queryset.filter(start_at__gt=now)
elif status == "expired":
queryset = queryset.filter(end_at__lt=now)
return queryset
@classmethod
def active_for_dashboard(
cls,
now: datetime.datetime,
) -> models.QuerySet[DropCampaign]:
"""Return active campaigns with relations needed by the dashboard.
Args:
now: Current timestamp used to evaluate active campaigns.
Returns:
QuerySet of active campaigns ordered by newest start date.
"""
return (
cls.objects
.filter(start_at__lte=now, end_at__gte=now)
.only(
"twitch_id",
"name",
"image_url",
"image_file",
"image_width",
"image_height",
"start_at",
"end_at",
"allow_is_enabled",
"game",
"game__twitch_id",
"game__display_name",
"game__slug",
"game__box_art",
"game__box_art_file",
"game__box_art_width",
"game__box_art_height",
)
.select_related("game")
.prefetch_related(
models.Prefetch(
"game__owners",
queryset=Organization.objects.only("twitch_id", "name"),
),
models.Prefetch(
"allow_channels",
queryset=Channel.objects.only(
"twitch_id",
"name",
"display_name",
).order_by("display_name"),
to_attr="channels_ordered",
),
)
.order_by("-start_at")
)
@staticmethod
def grouped_by_game(
campaigns: models.QuerySet[DropCampaign],
) -> OrderedDict[str, dict[str, Any]]:
"""Group campaigns by game for dashboard rendering.
The grouping keeps insertion order and avoids duplicate per-game cards when
games have multiple owners.
Args:
campaigns: Campaign queryset from active_for_dashboard().
Returns:
Ordered mapping keyed by game twitch_id.
"""
campaigns_by_game: OrderedDict[str, dict[str, Any]] = OrderedDict()
for campaign in campaigns:
game: Game = campaign.game
game_id: str = game.twitch_id
game_display_name: str = game.display_name
game_bucket: dict[str, Any] = campaigns_by_game.setdefault(
game_id,
{
"name": game_display_name,
"box_art": game.box_art_best_url,
"owners": list(game.owners.all()),
"campaigns": [],
},
)
game_bucket["campaigns"].append({
"campaign": campaign,
"clean_name": campaign.clean_name,
"image_url": campaign.listing_image_url,
"allowed_channels": getattr(campaign, "channels_ordered", []),
"game_display_name": game_display_name,
"game_twitch_directory_url": game.twitch_directory_url,
})
return campaigns_by_game
@classmethod
def campaigns_by_game_for_dashboard(
cls,
now: datetime.datetime,
) -> OrderedDict[str, dict[str, Any]]:
"""Return active campaigns grouped by game for dashboard rendering.
Args:
now: Current timestamp used to evaluate active campaigns.
Returns:
Ordered mapping keyed by game twitch_id.
"""
return cls.grouped_by_game(cls.active_for_dashboard(now))
@property @property
def is_active(self) -> bool: def is_active(self) -> bool:
"""Check if the campaign is currently active.""" """Check if the campaign is currently active."""
@ -696,21 +526,19 @@ class DropCampaign(auto_prefetch.Model):
"Skull & Bones - Closed Beta" -> "Closed Beta" (& is replaced "Skull & Bones - Closed Beta" -> "Closed Beta" (& is replaced
with "and") with "and")
""" """
self_game: Game | None = self.game if not self.game or not self.game.display_name:
if not self_game or not self_game.display_name:
return self.name return self.name
game_variations: list[str] = [self_game.display_name] game_variations = [self.game.display_name]
if "&" in self_game.display_name: if "&" in self.game.display_name:
game_variations.append(self_game.display_name.replace("&", "and")) game_variations.append(self.game.display_name.replace("&", "and"))
if "and" in self_game.display_name: if "and" in self.game.display_name:
game_variations.append(self_game.display_name.replace("and", "&")) game_variations.append(self.game.display_name.replace("and", "&"))
for game_name in game_variations: for game_name in game_variations:
# Check for different separators after the game name # Check for different separators after the game name
for separator in [" - ", " | ", " "]: for separator in [" - ", " | ", " "]:
prefix_to_check: str = game_name + separator prefix_to_check = game_name + separator
name: str = self.name name: str = self.name
if name.startswith(prefix_to_check): if name.startswith(prefix_to_check):
@ -745,20 +573,6 @@ class DropCampaign(auto_prefetch.Model):
return "" return ""
@property
def listing_image_url(self) -> str:
"""Return a campaign image URL optimized for list views.
This intentionally avoids traversing drops/benefits to prevent N+1 queries
in list pages that render many campaigns.
"""
try:
if self.image_file and getattr(self.image_file, "url", None):
return self.image_file.url
except (AttributeError, OSError, ValueError) as exc:
logger.debug("Failed to resolve DropCampaign.image_file url: %s", exc)
return self.image_url or ""
@property @property
def duration_iso(self) -> str: def duration_iso(self) -> str:
"""Return the campaign duration in ISO 8601 format (e.g., 'P3DT4H30M'). """Return the campaign duration in ISO 8601 format (e.g., 'P3DT4H30M').
@ -894,8 +708,6 @@ class DropBenefit(auto_prefetch.Model):
models.Index(fields=["is_ios_available"]), models.Index(fields=["is_ios_available"]),
models.Index(fields=["added_at"]), models.Index(fields=["added_at"]),
models.Index(fields=["updated_at"]), models.Index(fields=["updated_at"]),
# Composite index for badge award lookups (distribution_type="BADGE", name__in=titles)
models.Index(fields=["distribution_type", "name"]),
] ]
def __str__(self) -> str: def __str__(self) -> str:
@ -1175,7 +987,7 @@ class RewardCampaign(auto_prefetch.Model):
class Meta(auto_prefetch.Model.Meta): class Meta(auto_prefetch.Model.Meta):
ordering = ["-starts_at"] ordering = ["-starts_at"]
indexes = [ indexes = [
models.Index(fields=["-starts_at"], name="tw_reward_starts_desc_idx"), models.Index(fields=["-starts_at"]),
models.Index(fields=["ends_at"]), models.Index(fields=["ends_at"]),
models.Index(fields=["twitch_id"]), models.Index(fields=["twitch_id"]),
models.Index(fields=["name"]), models.Index(fields=["name"]),
@ -1186,10 +998,7 @@ class RewardCampaign(auto_prefetch.Model):
models.Index(fields=["added_at"]), models.Index(fields=["added_at"]),
models.Index(fields=["updated_at"]), models.Index(fields=["updated_at"]),
# Composite indexes for common queries # Composite indexes for common queries
models.Index( models.Index(fields=["starts_at", "ends_at"]),
fields=["starts_at", "ends_at"],
name="tw_reward_starts_ends_idx",
),
models.Index(fields=["status", "-starts_at"]), models.Index(fields=["status", "-starts_at"]),
] ]
@ -1197,32 +1006,6 @@ class RewardCampaign(auto_prefetch.Model):
"""Return a string representation of the reward campaign.""" """Return a string representation of the reward campaign."""
return f"{self.brand}: {self.name}" if self.brand else self.name return f"{self.brand}: {self.name}" if self.brand else self.name
@classmethod
def active_for_dashboard(
cls,
now: datetime.datetime,
) -> models.QuerySet[RewardCampaign]:
"""Return active reward campaigns with only dashboard-needed fields."""
return (
cls.objects
.filter(starts_at__lte=now, ends_at__gte=now)
.only(
"twitch_id",
"name",
"brand",
"summary",
"external_url",
"starts_at",
"ends_at",
"is_sitewide",
"game",
"game__twitch_id",
"game__display_name",
)
.select_related("game")
.order_by("-starts_at")
)
@property @property
def is_active(self) -> bool: def is_active(self) -> bool:
"""Check if the reward campaign is currently active.""" """Check if the reward campaign is currently active."""
@ -1278,20 +1061,6 @@ class ChatBadgeSet(auto_prefetch.Model):
"""Return a string representation of the badge set.""" """Return a string representation of the badge set."""
return self.set_id return self.set_id
@classmethod
def for_list_view(cls) -> QuerySet[ChatBadgeSet]:
"""Return all badge sets with badges prefetched, ordered by set_id."""
return cls.objects.prefetch_related(
Prefetch("badges", queryset=ChatBadge.objects.order_by("badge_id")),
).order_by("set_id")
@classmethod
def for_detail_view(cls, set_id: str) -> ChatBadgeSet:
"""Return a single badge set with badges prefetched."""
return cls.objects.prefetch_related(
Prefetch("badges", queryset=ChatBadge.objects.order_by("badge_id")),
).get(set_id=set_id)
# MARK: ChatBadge # MARK: ChatBadge
class ChatBadge(auto_prefetch.Model): class ChatBadge(auto_prefetch.Model):
@ -1386,43 +1155,3 @@ class ChatBadge(auto_prefetch.Model):
def __str__(self) -> str: def __str__(self) -> str:
"""Return a string representation of the badge.""" """Return a string representation of the badge."""
return f"{self.badge_set.set_id}/{self.badge_id}: {self.title}" return f"{self.badge_set.set_id}/{self.badge_id}: {self.title}"
@staticmethod
def award_campaigns_by_title(titles: list[str]) -> dict[str, list[DropCampaign]]:
"""Batch-fetch DropCampaigns that award badges matching the given titles.
Avoids N+1 queries: one query traverses DropBenefit TimeBasedDrop DropCampaign
to get (benefit_name, campaign_pk) pairs, then one more query fetches the campaigns.
Returns:
Mapping of badge title to a list of DropCampaigns awarding it.
Titles with no matching campaigns are omitted.
"""
if not titles:
return {}
# Single JOIN query: (benefit_name, campaign_pk) via the M2M chain
# DropBenefit -> DropBenefitEdge -> TimeBasedDrop -> DropCampaign (FK column)
pairs: list[tuple[str, int | None]] = list(
DropBenefit.objects
.filter(distribution_type="BADGE", name__in=titles)
.values_list("name", "drops__campaign_id")
.distinct(),
)
title_to_campaign_pks: dict[str, set[int]] = {}
for name, campaign_pk in pairs:
if campaign_pk is not None:
title_to_campaign_pks.setdefault(name, set()).add(campaign_pk)
if not title_to_campaign_pks:
return {}
all_campaign_pks = {pk for pks in title_to_campaign_pks.values() for pk in pks}
campaigns_by_pk: dict[int, DropCampaign] = {
c.pk: c for c in DropCampaign.objects.filter(pk__in=all_campaign_pks)
}
return {
title: [campaigns_by_pk[pk] for pk in sorted(pks) if pk in campaigns_by_pk]
for title, pks in title_to_campaign_pks.items()
}

View file

@ -7,13 +7,10 @@ from typing import Any
from typing import Literal from typing import Literal
import pytest import pytest
from django.core.files.base import ContentFile
from django.core.handlers.wsgi import WSGIRequest from django.core.handlers.wsgi import WSGIRequest
from django.core.paginator import Paginator from django.core.paginator import Paginator
from django.db import connection
from django.db.models import Max from django.db.models import Max
from django.test import RequestFactory from django.test import RequestFactory
from django.test.utils import CaptureQueriesContext
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
@ -37,17 +34,15 @@ from twitch.views import _truncate_description
if TYPE_CHECKING: if TYPE_CHECKING:
from django.core.handlers.wsgi import WSGIRequest from django.core.handlers.wsgi import WSGIRequest
from django.db.models import QuerySet
from django.test import Client from django.test import Client
from django.test.client import _MonkeyPatchedWSGIResponse from django.test.client import _MonkeyPatchedWSGIResponse
from django.test.utils import ContextList from django.test.utils import ContextList
from pytest_django.fixtures import SettingsWrapper
from twitch.views import Page from twitch.views import Page
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def apply_base_url_override(settings: SettingsWrapper) -> None: def apply_base_url_override(settings: object) -> None:
"""Ensure BASE_URL is globally overridden for all tests.""" """Ensure BASE_URL is globally overridden for all tests."""
settings.BASE_URL = "https://ttvdrops.lovinator.space" # pyright: ignore[reportAttributeAccessIssue] settings.BASE_URL = "https://ttvdrops.lovinator.space" # pyright: ignore[reportAttributeAccessIssue]
@ -494,10 +489,10 @@ class TestChannelListView:
@pytest.mark.django_db @pytest.mark.django_db
def test_dashboard_view(self, client: Client) -> None: def test_dashboard_view(self, client: Client) -> None:
"""Test dashboard view returns 200 and has grouped campaign data in context.""" """Test dashboard view returns 200 and has active_campaigns in context."""
response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dashboard")) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dashboard"))
assert response.status_code == 200 assert response.status_code == 200
assert "campaigns_by_game" in response.context assert "active_campaigns" in response.context
@pytest.mark.django_db @pytest.mark.django_db
def test_dashboard_dedupes_campaigns_for_multi_owner_game( def test_dashboard_dedupes_campaigns_for_multi_owner_game(
@ -542,649 +537,6 @@ class TestChannelListView:
assert game.twitch_id in context["campaigns_by_game"] assert game.twitch_id in context["campaigns_by_game"]
assert len(context["campaigns_by_game"][game.twitch_id]["campaigns"]) == 1 assert len(context["campaigns_by_game"][game.twitch_id]["campaigns"]) == 1
@pytest.mark.django_db
def test_dashboard_queries_use_indexes(self) -> None:
"""Dashboard source queries should use indexes for active-window filtering."""
now: datetime.datetime = timezone.now()
org: Organization = Organization.objects.create(
twitch_id="org_index_test",
name="Org Index Test",
)
game: Game = Game.objects.create(
twitch_id="game_index_test",
name="Game Index Test",
display_name="Game Index Test",
)
game.owners.add(org)
# Add enough rows so the query planner has a reason to pick indexes.
campaigns: list[DropCampaign] = []
for i in range(250):
campaigns.extend((
DropCampaign(
twitch_id=f"inactive_old_{i}",
name=f"Inactive old {i}",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(days=60),
end_at=now - timedelta(days=30),
),
DropCampaign(
twitch_id=f"inactive_future_{i}",
name=f"Inactive future {i}",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now + timedelta(days=30),
end_at=now + timedelta(days=60),
),
))
campaigns.append(
DropCampaign(
twitch_id="active_for_dashboard_index_test",
name="Active campaign",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
),
)
DropCampaign.objects.bulk_create(campaigns)
reward_campaigns: list[RewardCampaign] = []
for i in range(250):
reward_campaigns.extend((
RewardCampaign(
twitch_id=f"reward_inactive_old_{i}",
name=f"Reward inactive old {i}",
game=game,
starts_at=now - timedelta(days=60),
ends_at=now - timedelta(days=30),
),
RewardCampaign(
twitch_id=f"reward_inactive_future_{i}",
name=f"Reward inactive future {i}",
game=game,
starts_at=now + timedelta(days=30),
ends_at=now + timedelta(days=60),
),
))
reward_campaigns.append(
RewardCampaign(
twitch_id="reward_active_for_dashboard_index_test",
name="Active reward campaign",
game=game,
starts_at=now - timedelta(hours=1),
ends_at=now + timedelta(hours=1),
),
)
RewardCampaign.objects.bulk_create(reward_campaigns)
active_campaigns_qs: QuerySet[DropCampaign] = DropCampaign.active_for_dashboard(
now,
)
active_reward_campaigns_qs: QuerySet[RewardCampaign] = (
RewardCampaign.active_for_dashboard(now)
)
campaigns_plan: str = active_campaigns_qs.explain()
reward_plan: str = active_reward_campaigns_qs.explain()
if connection.vendor == "sqlite":
campaigns_uses_index: bool = "USING INDEX" in campaigns_plan.upper()
rewards_uses_index: bool = "USING INDEX" in reward_plan.upper()
elif connection.vendor == "postgresql":
campaigns_uses_index = (
"INDEX SCAN" in campaigns_plan.upper()
or "BITMAP INDEX SCAN" in campaigns_plan.upper()
)
rewards_uses_index = (
"INDEX SCAN" in reward_plan.upper()
or "BITMAP INDEX SCAN" in reward_plan.upper()
)
else:
pytest.skip(
f"Unsupported DB vendor for index-plan assertion: {connection.vendor}",
)
assert campaigns_uses_index, campaigns_plan
assert rewards_uses_index, reward_plan
@pytest.mark.django_db
def test_dashboard_query_plans_reference_expected_index_names(self) -> None:
"""Dashboard active-window plans should mention concrete index names."""
now: datetime.datetime = timezone.now()
org: Organization = Organization.objects.create(
twitch_id="org_index_name_test",
name="Org Index Name Test",
)
game: Game = Game.objects.create(
twitch_id="game_index_name_test",
name="Game Index Name Test",
display_name="Game Index Name Test",
)
game.owners.add(org)
DropCampaign.objects.create(
twitch_id="active_for_dashboard_index_name_test",
name="Active campaign index-name test",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
RewardCampaign.objects.create(
twitch_id="reward_active_for_dashboard_index_name_test",
name="Active reward campaign index-name test",
game=game,
starts_at=now - timedelta(hours=1),
ends_at=now + timedelta(hours=1),
)
# Keep this assertion scoped to engines whose plans typically include index names.
if connection.vendor not in {"sqlite", "postgresql"}:
pytest.skip(
f"Unsupported DB vendor for index-name plan assertion: {connection.vendor}",
)
def _index_names(table_name: str) -> set[str]:
with connection.cursor() as cursor:
constraints = connection.introspection.get_constraints(
cursor,
table_name,
)
names: set[str] = set()
for name, meta in constraints.items():
if not meta.get("index"):
continue
names.add(name)
return names
expected_drop_indexes: set[str] = {
"tw_drop_start_desc_idx",
"tw_drop_start_end_idx",
"tw_drop_start_end_game_idx",
}
expected_reward_indexes: set[str] = {
"tw_reward_starts_desc_idx",
"tw_reward_starts_ends_idx",
}
drop_index_names: set[str] = _index_names(DropCampaign._meta.db_table)
reward_index_names: set[str] = _index_names(RewardCampaign._meta.db_table)
missing_drop_indexes: set[str] = expected_drop_indexes - drop_index_names
missing_reward_indexes: set[str] = expected_reward_indexes - reward_index_names
assert not missing_drop_indexes, (
"Missing expected DropCampaign dashboard indexes: "
f"{sorted(missing_drop_indexes)}"
)
assert not missing_reward_indexes, (
"Missing expected RewardCampaign dashboard indexes: "
f"{sorted(missing_reward_indexes)}"
)
campaigns_plan: str = DropCampaign.active_for_dashboard(now).explain().lower()
reward_plan: str = RewardCampaign.active_for_dashboard(now).explain().lower()
assert any(name.lower() in campaigns_plan for name in expected_drop_indexes), (
"DropCampaign active-for-dashboard plan did not reference an expected "
"named dashboard index. "
f"Expected one of {sorted(expected_drop_indexes)}. Plan={campaigns_plan}"
)
assert any(name.lower() in reward_plan for name in expected_reward_indexes), (
"RewardCampaign active-for-dashboard plan did not reference an expected "
"named dashboard index. "
f"Expected one of {sorted(expected_reward_indexes)}. Plan={reward_plan}"
)
@pytest.mark.django_db
def test_dashboard_query_count_stays_flat_with_more_data(
self,
client: Client,
) -> None:
"""Dashboard should avoid N+1 queries as campaign volume grows."""
now: datetime.datetime = timezone.now()
org: Organization = Organization.objects.create(
twitch_id="org_query_count",
name="Org Query Count",
)
game: Game = Game.objects.create(
twitch_id="game_query_count",
name="game_query_count",
display_name="Game Query Count",
)
game.owners.add(org)
def _capture_dashboard_select_count() -> int:
with CaptureQueriesContext(connection) as queries:
response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:dashboard"),
)
assert response.status_code == 200
select_queries: list[str] = [
query_info["sql"]
for query_info in queries.captured_queries
if query_info["sql"].lstrip().upper().startswith("SELECT")
]
return len(select_queries)
# Baseline: one active drop campaign and one active reward campaign.
base_campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="baseline_campaign",
name="Baseline campaign",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
base_channel: Channel = Channel.objects.create(
twitch_id="baseline_channel",
name="baselinechannel",
display_name="BaselineChannel",
)
base_campaign.allow_channels.add(base_channel)
RewardCampaign.objects.create(
twitch_id="baseline_reward_campaign",
name="Baseline reward campaign",
game=game,
starts_at=now - timedelta(hours=1),
ends_at=now + timedelta(hours=1),
summary="Baseline summary",
external_url="https://example.com/reward/baseline",
)
baseline_select_count: int = _capture_dashboard_select_count()
# Scale up active dashboard data substantially.
extra_campaigns: list[DropCampaign] = [
DropCampaign(
twitch_id=f"scaled_campaign_{i}",
name=f"Scaled campaign {i}",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=2),
end_at=now + timedelta(hours=2),
)
for i in range(12)
]
DropCampaign.objects.bulk_create(extra_campaigns)
for i, campaign in enumerate(
DropCampaign.objects.filter(
twitch_id__startswith="scaled_campaign_",
).order_by("twitch_id"),
):
channel: Channel = Channel.objects.create(
twitch_id=f"scaled_channel_{i}",
name=f"scaledchannel{i}",
display_name=f"ScaledChannel{i}",
)
campaign.allow_channels.add(channel)
extra_rewards: list[RewardCampaign] = [
RewardCampaign(
twitch_id=f"scaled_reward_{i}",
name=f"Scaled reward {i}",
game=game,
starts_at=now - timedelta(hours=2),
ends_at=now + timedelta(hours=2),
summary=f"Scaled summary {i}",
external_url=f"https://example.com/reward/{i}",
)
for i in range(12)
]
RewardCampaign.objects.bulk_create(extra_rewards)
scaled_select_count: int = _capture_dashboard_select_count()
assert scaled_select_count <= baseline_select_count + 2, (
"Dashboard SELECT query count grew with data volume; possible N+1 regression. "
f"baseline={baseline_select_count}, scaled={scaled_select_count}"
)
@pytest.mark.django_db
def test_dashboard_grouping_reuses_selected_game_relation(self) -> None:
"""Dashboard grouping should not issue extra standalone Game queries."""
now: datetime.datetime = timezone.now()
org: Organization = Organization.objects.create(
twitch_id="org_grouping_no_extra_game_select",
name="Org Grouping No Extra Game Select",
)
game: Game = Game.objects.create(
twitch_id="game_grouping_no_extra_game_select",
name="game_grouping_no_extra_game_select",
display_name="Game Grouping No Extra Game Select",
)
game.owners.add(org)
campaigns: list[DropCampaign] = [
DropCampaign(
twitch_id=f"grouping_campaign_{i}",
name=f"Grouping campaign {i}",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=1),
end_at=now + timedelta(hours=1),
)
for i in range(5)
]
DropCampaign.objects.bulk_create(campaigns)
with CaptureQueriesContext(connection) as queries:
grouped: dict[str, dict[str, Any]] = (
DropCampaign.campaigns_by_game_for_dashboard(now)
)
assert game.twitch_id in grouped
assert len(grouped[game.twitch_id]["campaigns"]) == 5
game_select_queries: list[str] = [
query_info["sql"]
for query_info in queries.captured_queries
if query_info["sql"].lstrip().upper().startswith("SELECT")
and 'from "twitch_game"' in query_info["sql"].lower()
and " join " not in query_info["sql"].lower()
]
assert not game_select_queries, (
"Dashboard grouping should reuse DropCampaign.active_for_dashboard() "
"select_related game rows instead of standalone Game SELECTs. "
f"Queries: {game_select_queries}"
)
@pytest.mark.django_db
def test_dashboard_avoids_n_plus_one_game_queries_in_drop_loop(
self,
client: Client,
) -> None:
"""Dashboard should not issue per-campaign Game SELECTs while rendering drops."""
now: datetime.datetime = timezone.now()
org: Organization = Organization.objects.create(
twitch_id="org_no_n_plus_one_game",
name="Org No N+1 Game",
)
game: Game = Game.objects.create(
twitch_id="game_no_n_plus_one_game",
name="game_no_n_plus_one_game",
display_name="Game No N+1 Game",
)
game.owners.add(org)
campaigns: list[DropCampaign] = [
DropCampaign(
twitch_id=f"no_n_plus_one_campaign_{i}",
name=f"No N+1 campaign {i}",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=2),
end_at=now + timedelta(hours=2),
)
for i in range(10)
]
DropCampaign.objects.bulk_create(campaigns)
with CaptureQueriesContext(connection) as queries:
response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:dashboard"),
)
assert response.status_code == 200
context: ContextList | dict[str, Any] = response.context # type: ignore[assignment]
if isinstance(context, list):
context = context[-1]
grouped_campaigns: list[dict[str, Any]] = context["campaigns_by_game"][
game.twitch_id
]["campaigns"]
assert grouped_campaigns
assert all(
"game_display_name" in campaign_data for campaign_data in grouped_campaigns
)
assert all(
"game_twitch_directory_url" in campaign_data
for campaign_data in grouped_campaigns
)
game_select_queries: list[str] = [
query_info["sql"]
for query_info in queries.captured_queries
if query_info["sql"].lstrip().upper().startswith("SELECT")
and "twitch_game" in query_info["sql"].lower()
and "join" not in query_info["sql"].lower()
]
assert len(game_select_queries) <= 1, (
"Expected at most one standalone Game SELECT for dashboard drop grouping; "
f"got {len(game_select_queries)}. Queries: {game_select_queries}"
)
@pytest.mark.django_db
def test_dashboard_avoids_n_plus_one_game_queries_with_multiple_games(
self,
client: Client,
) -> None:
"""Dashboard should keep standalone Game SELECTs bounded with many campaigns and games."""
now: datetime.datetime = timezone.now()
game_ids: list[str] = []
for i in range(5):
org: Organization = Organization.objects.create(
twitch_id=f"org_multi_game_{i}",
name=f"Org Multi Game {i}",
)
game: Game = Game.objects.create(
twitch_id=f"game_multi_game_{i}",
name=f"game_multi_game_{i}",
display_name=f"Game Multi Game {i}",
)
game.owners.add(org)
game_ids.append(game.twitch_id)
campaigns: list[DropCampaign] = [
DropCampaign(
twitch_id=f"multi_game_campaign_{i}_{j}",
name=f"Multi game campaign {i}-{j}",
game=game,
operation_names=["DropCampaignDetails"],
start_at=now - timedelta(hours=2),
end_at=now + timedelta(hours=2),
)
for j in range(20)
]
DropCampaign.objects.bulk_create(campaigns)
with CaptureQueriesContext(connection) as queries:
response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:dashboard"),
)
assert response.status_code == 200
context: ContextList | dict[str, Any] = response.context # type: ignore[assignment]
if isinstance(context, list):
context = context[-1]
campaigns_by_game: dict[str, Any] = context["campaigns_by_game"]
for game_id in game_ids:
assert game_id in campaigns_by_game
grouped_campaigns: list[dict[str, Any]] = campaigns_by_game[game_id][
"campaigns"
]
assert len(grouped_campaigns) == 20
assert all(
"game_display_name" in campaign_data
for campaign_data in grouped_campaigns
)
assert all(
"game_twitch_directory_url" in campaign_data
for campaign_data in grouped_campaigns
)
game_select_queries: list[str] = [
query_info["sql"]
for query_info in queries.captured_queries
if query_info["sql"].lstrip().upper().startswith("SELECT")
and "twitch_game" in query_info["sql"].lower()
and "join" not in query_info["sql"].lower()
]
assert len(game_select_queries) <= 1, (
"Expected a bounded number of standalone Game SELECTs for dashboard grouping; "
f"got {len(game_select_queries)}. Queries: {game_select_queries}"
)
@pytest.mark.django_db
def test_dashboard_does_not_refresh_dropcampaign_rows_for_image_dimensions(
    self,
    client: Client,
) -> None:
    """Dashboard should not issue per-row DropCampaign refreshes for image dimensions."""
    now: datetime.datetime = timezone.now()
    owner_org: Organization = Organization.objects.create(
        twitch_id="org_image_dimensions",
        name="Org Image Dimensions",
    )
    dashboard_game: Game = Game.objects.create(
        twitch_id="game_image_dimensions",
        name="game_image_dimensions",
        display_name="Game Image Dimensions",
    )
    dashboard_game.owners.add(owner_org)
    # 1x1 transparent PNG
    png_1x1: bytes = (
        b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01"
        b"\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89"
        b"\x00\x00\x00\x0bIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01"
        b"\r\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82"
    )
    created_campaigns: list[DropCampaign] = []
    for index in range(3):
        created: DropCampaign = DropCampaign.objects.create(
            twitch_id=f"image_dim_campaign_{index}",
            name=f"Image dim campaign {index}",
            game=dashboard_game,
            operation_names=["DropCampaignDetails"],
            start_at=now - timedelta(hours=2),
            end_at=now + timedelta(hours=2),
        )
        assert created.image_file is not None
        # Attach a real image so the dashboard has dimensions to resolve.
        created.image_file.save(
            f"image_dim_campaign_{index}.png",
            ContentFile(png_1x1),
            save=True,
        )
        created_campaigns.append(created)
    with CaptureQueriesContext(connection) as captured:
        response: _MonkeyPatchedWSGIResponse = client.get(
            reverse("twitch:dashboard"),
        )
    assert response.status_code == 200
    context: ContextList | dict[str, Any] = response.context  # type: ignore[assignment]
    if isinstance(context, list):
        context = context[-1]
    grouped: list[dict[str, Any]] = context["campaigns_by_game"][
        dashboard_game.twitch_id
    ]["campaigns"]
    assert len(grouped) == len(created_campaigns)
    # Any SELECT pinned to a single DropCampaign primary key is a per-row refresh.
    per_row_refresh_queries: list[str] = [
        entry["sql"]
        for entry in captured.captured_queries
        if entry["sql"].lstrip().upper().startswith("SELECT")
        and 'from "twitch_dropcampaign"' in entry["sql"].lower()
        and 'where "twitch_dropcampaign"."id" =' in entry["sql"].lower()
    ]
    assert not per_row_refresh_queries, (
        "Dashboard unexpectedly refreshed DropCampaign rows one-by-one while "
        "resolving image dimensions. Queries: "
        f"{per_row_refresh_queries}"
    )
@pytest.mark.django_db
def test_dashboard_does_not_refresh_game_rows_for_box_art_dimensions(
    self,
    client: Client,
) -> None:
    """Dashboard should not issue per-row Game refreshes for box art dimensions."""
    now: datetime.datetime = timezone.now()
    owner_org: Organization = Organization.objects.create(
        twitch_id="org_box_art_dimensions",
        name="Org Box Art Dimensions",
    )
    box_art_game: Game = Game.objects.create(
        twitch_id="game_box_art_dimensions",
        name="game_box_art_dimensions",
        display_name="Game Box Art Dimensions",
    )
    box_art_game.owners.add(owner_org)
    # 1x1 transparent PNG
    png_1x1: bytes = (
        b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01"
        b"\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89"
        b"\x00\x00\x00\x0bIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01"
        b"\r\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82"
    )
    assert box_art_game.box_art_file is not None
    # Give the game real box art so the dashboard has dimensions to resolve.
    box_art_game.box_art_file.save(
        "game_box_art_dimensions.png",
        ContentFile(png_1x1),
        save=True,
    )
    # One active campaign is enough to get the game onto the dashboard.
    DropCampaign.objects.create(
        twitch_id="game_box_art_campaign",
        name="Game box art campaign",
        game=box_art_game,
        operation_names=["DropCampaignDetails"],
        start_at=now - timedelta(hours=2),
        end_at=now + timedelta(hours=2),
    )
    with CaptureQueriesContext(connection) as captured:
        response: _MonkeyPatchedWSGIResponse = client.get(
            reverse("twitch:dashboard"),
        )
    assert response.status_code == 200
    context: ContextList | dict[str, Any] = response.context  # type: ignore[assignment]
    if isinstance(context, list):
        context = context[-1]
    campaigns_by_game: dict[str, Any] = context["campaigns_by_game"]
    assert box_art_game.twitch_id in campaigns_by_game
    # Any SELECT pinned to a single Game primary key is a per-row refresh.
    per_row_refresh_queries: list[str] = [
        entry["sql"]
        for entry in captured.captured_queries
        if entry["sql"].lstrip().upper().startswith("SELECT")
        and 'from "twitch_game"' in entry["sql"].lower()
        and 'where "twitch_game"."id" =' in entry["sql"].lower()
    ]
    assert not per_row_refresh_queries, (
        "Dashboard unexpectedly refreshed Game rows one-by-one while resolving "
        "box art dimensions. Queries: "
        f"{per_row_refresh_queries}"
    )
@pytest.mark.django_db @pytest.mark.django_db
def test_debug_view(self, client: Client) -> None: def test_debug_view(self, client: Client) -> None:
"""Test debug view returns 200 and has games_without_owner in context.""" """Test debug view returns 200 and has games_without_owner in context."""
@ -1505,7 +857,7 @@ class TestChannelListView:
assert "page=2" in content assert "page=2" in content
@pytest.mark.django_db @pytest.mark.django_db
def test_drop_campaign_detail_view(self, client: Client, db: None) -> None: def test_drop_campaign_detail_view(self, client: Client, db: object) -> None:
"""Test campaign detail view returns 200 and has campaign in context.""" """Test campaign detail view returns 200 and has campaign in context."""
game: Game = Game.objects.create( game: Game = Game.objects.create(
twitch_id="g1", twitch_id="g1",
@ -1590,7 +942,7 @@ class TestChannelListView:
assert "games" in response.context assert "games" in response.context
@pytest.mark.django_db @pytest.mark.django_db
def test_game_detail_view(self, client: Client, db: None) -> None: def test_game_detail_view(self, client: Client, db: object) -> None:
"""Test game detail view returns 200 and has game in context.""" """Test game detail view returns 200 and has game in context."""
game: Game = Game.objects.create( game: Game = Game.objects.create(
twitch_id="g2", twitch_id="g2",
@ -1603,7 +955,7 @@ class TestChannelListView:
assert "game" in response.context assert "game" in response.context
@pytest.mark.django_db @pytest.mark.django_db
def test_game_detail_image_aspect_ratio(self, client: Client, db: None) -> None: def test_game_detail_image_aspect_ratio(self, client: Client, db: object) -> None:
"""Box art should render with a width attribute only, preserving aspect ratio.""" """Box art should render with a width attribute only, preserving aspect ratio."""
game: Game = Game.objects.create( game: Game = Game.objects.create(
twitch_id="g3", twitch_id="g3",
@ -1658,7 +1010,7 @@ class TestChannelListView:
assert "orgs" in response.context assert "orgs" in response.context
@pytest.mark.django_db @pytest.mark.django_db
def test_organization_detail_view(self, client: Client, db: None) -> None: def test_organization_detail_view(self, client: Client, db: object) -> None:
"""Test organization detail view returns 200 and has organization in context.""" """Test organization detail view returns 200 and has organization in context."""
org: Organization = Organization.objects.create(twitch_id="o1", name="Org1") org: Organization = Organization.objects.create(twitch_id="o1", name="Org1")
url: str = reverse("twitch:organization_detail", args=[org.twitch_id]) url: str = reverse("twitch:organization_detail", args=[org.twitch_id])
@ -1667,7 +1019,7 @@ class TestChannelListView:
assert "organization" in response.context assert "organization" in response.context
@pytest.mark.django_db @pytest.mark.django_db
def test_channel_detail_view(self, client: Client, db: None) -> None: def test_channel_detail_view(self, client: Client, db: object) -> None:
"""Test channel detail view returns 200 and has channel in context.""" """Test channel detail view returns 200 and has channel in context."""
channel: Channel = Channel.objects.create( channel: Channel = Channel.objects.create(
twitch_id="ch1", twitch_id="ch1",
@ -2903,572 +2255,3 @@ class TestImageObjectStructuredData:
schema: dict[str, Any] = json.loads(response.context["schema_data"]) schema: dict[str, Any] = json.loads(response.context["schema_data"])
assert schema["image"]["creditText"] == "Real Campaign Publisher" assert schema["image"]["creditText"] == "Real Campaign Publisher"
assert schema["organizer"]["name"] == "Real Campaign Publisher" assert schema["organizer"]["name"] == "Real Campaign Publisher"
@pytest.mark.django_db
class TestBadgeListView:
    """Tests for the badge_list_view function."""

    def test_badge_list_returns_200(self, client: Client) -> None:
        """Badge list view renders successfully with no badge sets."""
        resp: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:badge_list"))
        assert resp.status_code == 200

    def test_badge_list_context_has_badge_data(self, client: Client) -> None:
        """Badge list view passes badge_data list (not badge_sets queryset) to template."""
        vip_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="test_vip")
        ChatBadge.objects.create(
            badge_set=vip_set,
            badge_id="1",
            image_url_1x="https://example.com/1x.png",
            image_url_2x="https://example.com/2x.png",
            image_url_4x="https://example.com/4x.png",
            title="VIP",
            description="VIP badge",
        )
        resp: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:badge_list"))
        ctx: ContextList | dict[str, Any] = resp.context  # type: ignore[assignment]
        if isinstance(ctx, list):
            ctx = ctx[-1]
        assert "badge_data" in ctx
        assert len(ctx["badge_data"]) == 1
        assert ctx["badge_data"][0]["set"].set_id == "test_vip"
        assert len(ctx["badge_data"][0]["badges"]) == 1
        assert "badge_sets" not in ctx

    def test_badge_list_query_count_stays_flat(self, client: Client) -> None:
        """badge_list_view should not issue N+1 queries as badge set count grows."""

        def _populate_set(set_index: int) -> None:
            # One badge set with four badges, enough to exercise prefetching.
            badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(
                set_id=f"set_flat_{set_index}",
            )
            for badge_index in range(4):
                ChatBadge.objects.create(
                    badge_set=badge_set,
                    badge_id=str(badge_index),
                    image_url_1x="https://example.com/1x.png",
                    image_url_2x="https://example.com/2x.png",
                    image_url_4x="https://example.com/4x.png",
                    title=f"Badge {set_index}-{badge_index}",
                    description="desc",
                )

        def _count_selects() -> int:
            with CaptureQueriesContext(connection) as ctx:
                resp: _MonkeyPatchedWSGIResponse = client.get(
                    reverse("twitch:badge_list"),
                )
                assert resp.status_code == 200
            return sum(
                1
                for q in ctx.captured_queries
                if q["sql"].lstrip().upper().startswith("SELECT")
            )

        for set_index in range(3):
            _populate_set(set_index)
        baseline: int = _count_selects()
        # Add 10 more badge sets with badges
        for set_index in range(3, 13):
            _populate_set(set_index)
        scaled: int = _count_selects()
        assert scaled <= baseline + 1, (
            f"badge_list_view SELECT count grew with data; possible N+1. "
            f"baseline={baseline}, scaled={scaled}"
        )
@pytest.mark.django_db
class TestBadgeSetDetailView:
    """Tests for the badge_set_detail_view function."""

    @pytest.fixture
    def badge_set_with_badges(self) -> dict[str, Any]:
        """Create a badge set with numeric badge IDs and a campaign awarding one badge.

        Returns:
            Dict with badge_set, badge1-3, campaign, and benefit instances.
        """
        org: Organization = Organization.objects.create(
            twitch_id="org_badge_test",
            name="Badge Test Org",
        )
        game: Game = Game.objects.create(
            twitch_id="game_badge_test",
            name="badge_test_game",
            display_name="Badge Test Game",
        )
        game.owners.add(org)
        badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="drops")
        # badge_ids "1", "10", "2" are chosen so string ordering ("1","10","2")
        # differs from numeric ordering ("1","2","10").
        badge1: ChatBadge = ChatBadge.objects.create(
            badge_set=badge_set,
            badge_id="1",
            image_url_1x="https://example.com/1x.png",
            image_url_2x="https://example.com/2x.png",
            image_url_4x="https://example.com/4x.png",
            title="Drop 1",
            description="First drop badge",
        )
        badge2: ChatBadge = ChatBadge.objects.create(
            badge_set=badge_set,
            badge_id="10",
            image_url_1x="https://example.com/1x.png",
            image_url_2x="https://example.com/2x.png",
            image_url_4x="https://example.com/4x.png",
            title="Drop 10",
            description="Tenth drop badge",
        )
        badge3: ChatBadge = ChatBadge.objects.create(
            badge_set=badge_set,
            badge_id="2",
            image_url_1x="https://example.com/1x.png",
            image_url_2x="https://example.com/2x.png",
            image_url_4x="https://example.com/4x.png",
            title="Drop 2",
            description="Second drop badge",
        )
        campaign: DropCampaign = DropCampaign.objects.create(
            twitch_id="badge_test_campaign",
            name="Badge Test Campaign",
            game=game,
            operation_names=["DropCampaignDetails"],
        )
        drop: TimeBasedDrop = TimeBasedDrop.objects.create(
            twitch_id="badge_test_drop",
            name="Badge Test Drop",
            campaign=campaign,
        )
        # The benefit name "Drop 1" matches badge1's title, so only badge1
        # should pick up this campaign as an award campaign.
        benefit: DropBenefit = DropBenefit.objects.create(
            twitch_id="badge_test_benefit",
            name="Drop 1",
            distribution_type="BADGE",
        )
        drop.benefits.add(benefit)
        return {
            "badge_set": badge_set,
            "badge1": badge1,
            "badge2": badge2,
            "badge3": badge3,
            "campaign": campaign,
            "benefit": benefit,
        }

    def test_badge_set_detail_returns_200(
        self,
        client: Client,
        badge_set_with_badges: dict[str, Any],
    ) -> None:
        """Badge set detail view renders successfully."""
        set_id: str = badge_set_with_badges["badge_set"].set_id
        url: str = reverse("twitch:badge_set_detail", args=[set_id])
        response: _MonkeyPatchedWSGIResponse = client.get(url)
        assert response.status_code == 200

    def test_badge_set_detail_404_for_missing_set(self, client: Client) -> None:
        """Badge set detail view returns 404 for unknown set_id."""
        url: str = reverse("twitch:badge_set_detail", args=["nonexistent"])
        response: _MonkeyPatchedWSGIResponse = client.get(url)
        assert response.status_code == 404

    def test_badges_sorted_numerically(
        self,
        client: Client,
        badge_set_with_badges: dict[str, Any],
    ) -> None:
        """Numeric badge_ids should be sorted as integers (1, 2, 10) not strings (1, 10, 2)."""
        set_id: str = badge_set_with_badges["badge_set"].set_id
        url: str = reverse("twitch:badge_set_detail", args=[set_id])
        response: _MonkeyPatchedWSGIResponse = client.get(url)
        # response.context may be a ContextList (one context per rendered
        # template); normalize to the innermost dict-like context.
        context: ContextList | dict[str, Any] = response.context  # type: ignore[assignment]
        if isinstance(context, list):
            context = context[-1]
        badge_ids: list[str] = [b.badge_id for b in context["badges"]]
        assert badge_ids == ["1", "2", "10"], (
            f"Expected numeric sort order [1, 2, 10], got {badge_ids}"
        )

    def test_award_campaigns_attached_to_badges(
        self,
        client: Client,
        badge_set_with_badges: dict[str, Any],
    ) -> None:
        """Badges with matching BADGE benefits should have award_campaigns populated."""
        set_id: str = badge_set_with_badges["badge_set"].set_id
        url: str = reverse("twitch:badge_set_detail", args=[set_id])
        response: _MonkeyPatchedWSGIResponse = client.get(url)
        context: ContextList | dict[str, Any] = response.context  # type: ignore[assignment]
        if isinstance(context, list):
            context = context[-1]
        badges: list[ChatBadge] = list(context["badges"])
        badge_titled_drop1: ChatBadge = next(b for b in badges if b.title == "Drop 1")
        badge_titled_drop2: ChatBadge = next(b for b in badges if b.title == "Drop 2")
        # Only "Drop 1" has a matching BADGE benefit in the fixture.
        assert len(badge_titled_drop1.award_campaigns) == 1  # pyright: ignore[reportAttributeAccessIssue]
        assert badge_titled_drop1.award_campaigns[0].twitch_id == "badge_test_campaign"  # pyright: ignore[reportAttributeAccessIssue]
        assert len(badge_titled_drop2.award_campaigns) == 0  # pyright: ignore[reportAttributeAccessIssue]

    def test_badge_set_detail_avoids_n_plus_one(
        self,
        client: Client,
    ) -> None:
        """badge_set_detail_view should not issue per-badge queries for award campaigns."""
        org: Organization = Organization.objects.create(
            twitch_id="org_n1_badge",
            name="N+1 Badge Org",
        )
        game: Game = Game.objects.create(
            twitch_id="game_n1_badge",
            name="game_n1_badge",
            display_name="N+1 Badge Game",
        )
        game.owners.add(org)
        badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="n1_test")

        def _make_badge_and_campaign(idx: int) -> None:
            # One badge plus a campaign whose BADGE benefit name equals the
            # badge title, so the view must resolve an award campaign for it.
            badge: ChatBadge = ChatBadge.objects.create(
                badge_set=badge_set,
                badge_id=str(idx),
                image_url_1x="https://example.com/1x.png",
                image_url_2x="https://example.com/2x.png",
                image_url_4x="https://example.com/4x.png",
                title=f"N1 Badge {idx}",
                description="desc",
            )
            campaign: DropCampaign = DropCampaign.objects.create(
                twitch_id=f"n1_campaign_{idx}",
                name=f"N+1 Campaign {idx}",
                game=game,
                operation_names=["DropCampaignDetails"],
            )
            drop: TimeBasedDrop = TimeBasedDrop.objects.create(
                twitch_id=f"n1_drop_{idx}",
                name=f"N+1 Drop {idx}",
                campaign=campaign,
            )
            benefit: DropBenefit = DropBenefit.objects.create(
                twitch_id=f"n1_benefit_{idx}",
                name=badge.title,
                distribution_type="BADGE",
            )
            drop.benefits.add(benefit)

        for i in range(3):
            _make_badge_and_campaign(i)
        url: str = reverse("twitch:badge_set_detail", args=[badge_set.set_id])

        def _count_selects() -> int:
            # Render the page and count only SELECT statements issued.
            with CaptureQueriesContext(connection) as ctx:
                resp: _MonkeyPatchedWSGIResponse = client.get(url)
                assert resp.status_code == 200
            return sum(
                1
                for q in ctx.captured_queries
                if q["sql"].lstrip().upper().startswith("SELECT")
            )

        baseline: int = _count_selects()
        # Add 10 more badges, each with their own campaigns
        for i in range(3, 13):
            _make_badge_and_campaign(i)
        scaled: int = _count_selects()
        # Query count must stay flat as badge count grows (allow +1 slack).
        assert scaled <= baseline + 1, (
            f"badge_set_detail_view SELECT count grew with badge count; possible N+1. "
            f"baseline={baseline}, scaled={scaled}"
        )

    def test_drop_benefit_index_used_for_badge_award_lookup(self) -> None:
        """DropBenefit queries filtering by distribution_type+name should use indexes."""
        org: Organization = Organization.objects.create(
            twitch_id="org_benefit_idx",
            name="Benefit Index Org",
        )
        game: Game = Game.objects.create(
            twitch_id="game_benefit_idx",
            name="game_benefit_idx",
            display_name="Benefit Index Game",
        )
        game.owners.add(org)
        # Create enough non-BADGE benefits so the planner has reason to use an index
        for i in range(300):
            DropBenefit.objects.create(
                twitch_id=f"non_badge_{i}",
                name=f"Emote {i}",
                distribution_type="EMOTE",
            )
        badge_titles: list[str] = []
        for i in range(5):
            DropBenefit.objects.create(
                twitch_id=f"badge_benefit_idx_{i}",
                name=f"Badge Title {i}",
                distribution_type="BADGE",
            )
            badge_titles.append(f"Badge Title {i}")
        qs = DropBenefit.objects.filter(
            distribution_type="BADGE",
            name__in=badge_titles,
        )
        # Ask the database for its query plan and check it mentions an index;
        # the plan wording differs per backend.
        plan: str = qs.explain()
        if connection.vendor == "sqlite":
            uses_index: bool = "USING INDEX" in plan.upper()
        elif connection.vendor == "postgresql":
            uses_index = (
                "INDEX SCAN" in plan.upper()
                or "BITMAP INDEX SCAN" in plan.upper()
                or "INDEX ONLY SCAN" in plan.upper()
            )
        else:
            pytest.skip(
                f"Unsupported DB vendor for index-plan assertion: {connection.vendor}",
            )
        assert uses_index, (
            f"DropBenefit query on (distribution_type, name) did not use an index.\n{plan}"
        )
@pytest.mark.django_db
class TestDropCampaignListView:
    """Tests for drop_campaign_list_view index usage and fat-model delegation."""

    @pytest.fixture
    def game_with_campaigns(self) -> dict[str, Any]:
        """Create a game with a mix of imported/not-imported campaigns.

        Returns:
            Dict with 'org' and 'game' keys for the created Organization and Game.
        """
        org: Organization = Organization.objects.create(
            twitch_id="org_list_test",
            name="List Test Org",
        )
        game: Game = Game.objects.create(
            twitch_id="game_list_test",
            name="game_list_test",
            display_name="List Test Game",
        )
        game.owners.add(org)
        return {"org": org, "game": game}

    def test_campaign_list_returns_200(self, client: Client) -> None:
        """Campaign list view loads successfully."""
        response: _MonkeyPatchedWSGIResponse = client.get(
            reverse("twitch:campaign_list"),
        )
        assert response.status_code == 200

    def test_only_fully_imported_campaigns_shown(
        self,
        client: Client,
        game_with_campaigns: dict[str, Any],
    ) -> None:
        """Only campaigns with is_fully_imported=True appear in the list."""
        game: Game = game_with_campaigns["game"]
        imported: DropCampaign = DropCampaign.objects.create(
            twitch_id="cl_imported",
            name="Imported Campaign",
            game=game,
            operation_names=["DropCampaignDetails"],
            is_fully_imported=True,
        )
        # Counterpart that must be filtered out of the listing.
        DropCampaign.objects.create(
            twitch_id="cl_not_imported",
            name="Not Imported Campaign",
            game=game,
            operation_names=["DropCampaignDetails"],
            is_fully_imported=False,
        )
        response: _MonkeyPatchedWSGIResponse = client.get(
            reverse("twitch:campaign_list"),
        )
        context: ContextList | dict[str, Any] = response.context  # type: ignore[assignment]
        if isinstance(context, list):
            context = context[-1]
        # "campaigns" is a paginator Page; inspect the underlying object list.
        campaign_ids: set[str] = {c.twitch_id for c in context["campaigns"].object_list}
        assert imported.twitch_id in campaign_ids
        assert "cl_not_imported" not in campaign_ids

    def test_status_filter_active(
        self,
        client: Client,
        game_with_campaigns: dict[str, Any],
    ) -> None:
        """Status=active returns only currently-running campaigns."""
        game: Game = game_with_campaigns["game"]
        now: datetime.datetime = timezone.now()
        # Campaign whose window straddles "now" — should pass the filter.
        active: DropCampaign = DropCampaign.objects.create(
            twitch_id="cl_active",
            name="Active",
            game=game,
            operation_names=[],
            is_fully_imported=True,
            start_at=now - timedelta(hours=1),
            end_at=now + timedelta(hours=1),
        )
        # Campaign that ended a day ago — should be excluded.
        DropCampaign.objects.create(
            twitch_id="cl_expired",
            name="Expired",
            game=game,
            operation_names=[],
            is_fully_imported=True,
            start_at=now - timedelta(days=10),
            end_at=now - timedelta(days=1),
        )
        response: _MonkeyPatchedWSGIResponse = client.get(
            reverse("twitch:campaign_list") + "?status=active",
        )
        context: ContextList | dict[str, Any] = response.context  # type: ignore[assignment]
        if isinstance(context, list):
            context = context[-1]
        campaign_ids: set[str] = {c.twitch_id for c in context["campaigns"].object_list}
        assert active.twitch_id in campaign_ids
        assert "cl_expired" not in campaign_ids

    def test_campaign_list_indexes_exist(self) -> None:
        """Required composite indexes for the campaign list query must exist on DropCampaign."""
        expected: set[str] = {
            "tw_drop_imported_start_idx",
            "tw_drop_imported_start_end_idx",
        }
        # Introspect the live schema rather than model Meta, so the test also
        # catches missing/unapplied migrations.
        with connection.cursor() as cursor:
            constraints = connection.introspection.get_constraints(
                cursor,
                DropCampaign._meta.db_table,
            )
        actual: set[str] = {
            name for name, meta in constraints.items() if meta.get("index")
        }
        missing: set[str] = expected - actual
        assert not missing, (
            f"Missing expected DropCampaign campaign-list indexes: {sorted(missing)}"
        )

    @pytest.mark.django_db
    def test_campaign_list_query_uses_index(self) -> None:
        """for_campaign_list() should use an index when filtering is_fully_imported."""
        now: datetime.datetime = timezone.now()
        game: Game = Game.objects.create(
            twitch_id="game_cl_idx",
            name="game_cl_idx",
            display_name="CL Idx Game",
        )
        # Bulk-create enough rows to give the query planner a reason to use indexes.
        rows: list[DropCampaign] = [
            DropCampaign(
                twitch_id=f"cl_idx_not_imported_{i}",
                name=f"Not imported {i}",
                game=game,
                operation_names=[],
                is_fully_imported=False,
                start_at=now - timedelta(days=i + 1),
                end_at=now + timedelta(days=1),
            )
            for i in range(300)
        ]
        rows.append(
            DropCampaign(
                twitch_id="cl_idx_imported",
                name="Imported",
                game=game,
                operation_names=[],
                is_fully_imported=True,
                start_at=now - timedelta(hours=1),
                end_at=now + timedelta(hours=1),
            ),
        )
        DropCampaign.objects.bulk_create(rows)
        # Inspect the backend's query plan; wording differs per vendor.
        plan: str = DropCampaign.for_campaign_list(now).explain()
        if connection.vendor == "sqlite":
            uses_index: bool = "USING INDEX" in plan.upper()
        elif connection.vendor == "postgresql":
            uses_index = (
                "INDEX SCAN" in plan.upper()
                or "BITMAP INDEX SCAN" in plan.upper()
                or "INDEX ONLY SCAN" in plan.upper()
            )
        else:
            pytest.skip(
                f"Unsupported DB vendor for index assertion: {connection.vendor}",
            )
        assert uses_index, f"for_campaign_list() did not use an index.\n{plan}"

    def test_campaign_list_query_count_stays_flat(self, client: Client) -> None:
        """Campaign list should not issue N+1 queries as campaign volume grows."""
        game: Game = Game.objects.create(
            twitch_id="game_cl_flat",
            name="game_cl_flat",
            display_name="CL Flat Game",
        )
        now: datetime.datetime = timezone.now()

        def _select_count() -> int:
            # Render the list page and count only SELECT statements issued.
            with CaptureQueriesContext(connection) as ctx:
                resp: _MonkeyPatchedWSGIResponse = client.get(
                    reverse("twitch:campaign_list"),
                )
                assert resp.status_code == 200
            return sum(
                1
                for q in ctx.captured_queries
                if q["sql"].lstrip().upper().startswith("SELECT")
            )

        DropCampaign.objects.create(
            twitch_id="cl_flat_base",
            name="Base campaign",
            game=game,
            operation_names=[],
            is_fully_imported=True,
            start_at=now - timedelta(hours=1),
            end_at=now + timedelta(hours=1),
        )
        baseline: int = _select_count()
        extra: list[DropCampaign] = [
            DropCampaign(
                twitch_id=f"cl_flat_extra_{i}",
                name=f"Extra {i}",
                game=game,
                operation_names=[],
                is_fully_imported=True,
                start_at=now - timedelta(hours=2),
                end_at=now + timedelta(hours=2),
            )
            for i in range(15)
        ]
        DropCampaign.objects.bulk_create(extra)
        scaled: int = _select_count()
        # Allow small slack (+2) for pagination-related counts, but the
        # SELECT total must not scale with the number of campaigns.
        assert scaled <= baseline + 2, (
            f"Campaign list SELECT count grew; possible N+1. "
            f"baseline={baseline}, scaled={scaled}"
        )

View file

@ -14,9 +14,11 @@ from django.core.paginator import EmptyPage
from django.core.paginator import Page from django.core.paginator import Page
from django.core.paginator import PageNotAnInteger from django.core.paginator import PageNotAnInteger
from django.core.paginator import Paginator from django.core.paginator import Paginator
from django.db.models import Case
from django.db.models import Count from django.db.models import Count
from django.db.models import Prefetch from django.db.models import Prefetch
from django.db.models import Q from django.db.models import Q
from django.db.models import When
from django.db.models.query import QuerySet from django.db.models.query import QuerySet
from django.http import Http404 from django.http import Http404
from django.http import HttpResponse from django.http import HttpResponse
@ -417,7 +419,7 @@ def organization_detail_view(request: HttpRequest, twitch_id: str) -> HttpRespon
# MARK: /campaigns/ # MARK: /campaigns/
def drop_campaign_list_view(request: HttpRequest) -> HttpResponse: # noqa: PLR0914 def drop_campaign_list_view(request: HttpRequest) -> HttpResponse: # noqa: PLR0914, PLR0915
"""Function-based view for drop campaigns list. """Function-based view for drop campaigns list.
Args: Args:
@ -429,14 +431,24 @@ def drop_campaign_list_view(request: HttpRequest) -> HttpResponse: # noqa: PLR0
game_filter: str | None = request.GET.get("game") game_filter: str | None = request.GET.get("game")
status_filter: str | None = request.GET.get("status") status_filter: str | None = request.GET.get("status")
per_page: int = 100 per_page: int = 100
now: datetime.datetime = timezone.now() queryset: QuerySet[DropCampaign] = DropCampaign.objects.filter(
is_fully_imported=True,
queryset: QuerySet[DropCampaign] = DropCampaign.for_campaign_list(
now,
game_twitch_id=game_filter,
status=status_filter,
) )
if game_filter:
queryset = queryset.filter(game__twitch_id=game_filter)
queryset = queryset.prefetch_related("game__owners").order_by("-start_at")
# Optionally filter by status (active, upcoming, expired)
now: datetime.datetime = timezone.now()
if status_filter == "active":
queryset = queryset.filter(start_at__lte=now, end_at__gte=now)
elif status_filter == "upcoming":
queryset = queryset.filter(start_at__gt=now)
elif status_filter == "expired":
queryset = queryset.filter(end_at__lt=now)
paginator: Paginator[DropCampaign] = Paginator(queryset, per_page) paginator: Paginator[DropCampaign] = Paginator(queryset, per_page)
page: str | Literal[1] = request.GET.get("page") or 1 page: str | Literal[1] = request.GET.get("page") or 1
try: try:
@ -446,34 +458,30 @@ def drop_campaign_list_view(request: HttpRequest) -> HttpResponse: # noqa: PLR0
except EmptyPage: except EmptyPage:
campaigns = paginator.page(paginator.num_pages) campaigns = paginator.page(paginator.num_pages)
status_descriptions: dict[str, str] = {
"active": "Browse active Twitch drops.",
"upcoming": "View upcoming Twitch drops starting soon.",
"expired": "Browse expired Twitch drops.",
}
title = "Twitch Drops" title = "Twitch Drops"
description = "Browse Twitch drops"
if status_filter: if status_filter:
title += f" ({status_filter.capitalize()})" title += f" ({status_filter.capitalize()})"
description = status_descriptions.get(status_filter, description)
if game_filter: if game_filter:
try: try:
game_name: str = ( game: Game = Game.objects.get(twitch_id=game_filter)
Game.objects title += f" - {game.display_name}"
.only("display_name")
.values_list("display_name", flat=True)
.get(twitch_id=game_filter)
)
title += f" - {game_name}"
except Game.DoesNotExist: except Game.DoesNotExist:
pass pass
description = "Browse Twitch drops"
if status_filter == "active":
description = "Browse active Twitch drops."
elif status_filter == "upcoming":
description = "View upcoming Twitch drops starting soon."
elif status_filter == "expired":
description = "Browse expired Twitch drops."
# Build base URL for pagination # Build base URL for pagination
base_url = "/campaigns/" base_url = "/campaigns/"
if status_filter and game_filter: if status_filter:
base_url += f"?status={status_filter}&game={game_filter}"
elif status_filter:
base_url += f"?status={status_filter}" base_url += f"?status={status_filter}"
if game_filter:
base_url += f"&game={game_filter}"
elif game_filter: elif game_filter:
base_url += f"?game={game_filter}" base_url += f"?game={game_filter}"
@ -483,6 +491,7 @@ def drop_campaign_list_view(request: HttpRequest) -> HttpResponse: # noqa: PLR0
base_url, base_url,
) )
# CollectionPage schema for campaign list
collection_schema: dict[str, str] = { collection_schema: dict[str, str] = {
"@context": "https://schema.org", "@context": "https://schema.org",
"@type": "CollectionPage", "@type": "CollectionPage",
@ -579,18 +588,18 @@ def drop_campaign_detail_view(request: HttpRequest, twitch_id: str) -> HttpRespo
queryset=Channel.objects.order_by("display_name"), queryset=Channel.objects.order_by("display_name"),
to_attr="channels_ordered", to_attr="channels_ordered",
), ),
Prefetch(
"time_based_drops",
queryset=TimeBasedDrop.objects.prefetch_related("benefits").order_by(
"required_minutes_watched",
),
),
).get(twitch_id=twitch_id) ).get(twitch_id=twitch_id)
except DropCampaign.DoesNotExist as exc: except DropCampaign.DoesNotExist as exc:
msg = "No campaign found matching the query" msg = "No campaign found matching the query"
raise Http404(msg) from exc raise Http404(msg) from exc
drops: QuerySet[TimeBasedDrop] = campaign.time_based_drops.all() # pyright: ignore[reportAttributeAccessIssue] drops: QuerySet[TimeBasedDrop] = (
TimeBasedDrop.objects
.filter(campaign=campaign)
.select_related("campaign")
.prefetch_related("benefits")
.order_by("required_minutes_watched")
)
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
enhanced_drops: list[dict[str, Any]] = _enhance_drops_with_context(drops, now) enhanced_drops: list[dict[str, Any]] = _enhance_drops_with_context(drops, now)
@ -860,7 +869,7 @@ class GameDetailView(DetailView):
return game return game
def get_context_data(self, **kwargs) -> dict[str, Any]: # noqa: PLR0914 def get_context_data(self, **kwargs: object) -> dict[str, Any]: # noqa: PLR0914
"""Add additional context data. """Add additional context data.
Args: Args:
@ -1056,13 +1065,48 @@ def dashboard(request: HttpRequest) -> HttpResponse:
HttpResponse: The rendered dashboard template. HttpResponse: The rendered dashboard template.
""" """
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
campaigns_by_game: OrderedDict[str, dict[str, Any]] = ( active_campaigns: QuerySet[DropCampaign] = (
DropCampaign.campaigns_by_game_for_dashboard(now) DropCampaign.objects
.filter(start_at__lte=now, end_at__gte=now)
.select_related("game")
.prefetch_related("game__owners")
.prefetch_related(
Prefetch(
"allow_channels",
queryset=Channel.objects.order_by("display_name"),
to_attr="channels_ordered",
),
) )
.order_by("-start_at")
)
# Preserve insertion order (newest campaigns first).
# Group by game so games with multiple owners don't render duplicate campaign cards.
campaigns_by_game: OrderedDict[str, dict[str, Any]] = OrderedDict()
for campaign in active_campaigns:
game: Game = campaign.game
game_id: str = game.twitch_id
if game_id not in campaigns_by_game:
campaigns_by_game[game_id] = {
"name": game.display_name,
"box_art": game.box_art_best_url,
"owners": list(game.owners.all()),
"campaigns": [],
}
campaigns_by_game[game_id]["campaigns"].append({
"campaign": campaign,
"allowed_channels": getattr(campaign, "channels_ordered", []),
})
# Get active reward campaigns (Quest rewards) # Get active reward campaigns (Quest rewards)
active_reward_campaigns: QuerySet[RewardCampaign] = ( active_reward_campaigns: QuerySet[RewardCampaign] = (
RewardCampaign.active_for_dashboard(now) RewardCampaign.objects
.filter(starts_at__lte=now, ends_at__gte=now)
.select_related("game")
.order_by("-starts_at")
) )
# WebSite schema with SearchAction for sitelinks search box # WebSite schema with SearchAction for sitelinks search box
@ -1096,6 +1140,7 @@ def dashboard(request: HttpRequest) -> HttpResponse:
request, request,
"twitch/dashboard.html", "twitch/dashboard.html",
{ {
"active_campaigns": active_campaigns,
"campaigns_by_game": campaigns_by_game, "campaigns_by_game": campaigns_by_game,
"active_reward_campaigns": active_reward_campaigns, "active_reward_campaigns": active_reward_campaigns,
"now": now, "now": now,
@ -1424,7 +1469,7 @@ class ChannelDetailView(DetailView):
return channel return channel
def get_context_data(self, **kwargs) -> dict[str, Any]: # noqa: PLR0914 def get_context_data(self, **kwargs: object) -> dict[str, Any]: # noqa: PLR0914
"""Add additional context data. """Add additional context data.
Args: Args:
@ -1570,12 +1615,22 @@ def badge_list_view(request: HttpRequest) -> HttpResponse:
Returns: Returns:
HttpResponse: The rendered badge list page. HttpResponse: The rendered badge list page.
""" """
badge_sets: QuerySet[ChatBadgeSet] = (
ChatBadgeSet.objects
.all()
.prefetch_related(
Prefetch("badges", queryset=ChatBadge.objects.order_by("badge_id")),
)
.order_by("set_id")
)
# Group badges by set for easier display
badge_data: list[dict[str, Any]] = [ badge_data: list[dict[str, Any]] = [
{ {
"set": badge_set, "set": badge_set,
"badges": list(badge_set.badges.all()), # pyright: ignore[reportAttributeAccessIssue] "badges": list(badge_set.badges.all()), # pyright: ignore[reportAttributeAccessIssue]
} }
for badge_set in ChatBadgeSet.for_list_view() for badge_set in badge_sets
] ]
# CollectionPage schema for badges list # CollectionPage schema for badges list
@ -1593,6 +1648,7 @@ def badge_list_view(request: HttpRequest) -> HttpResponse:
seo_meta={"schema_data": collection_schema}, seo_meta={"schema_data": collection_schema},
) )
context: dict[str, Any] = { context: dict[str, Any] = {
"badge_sets": badge_sets,
"badge_data": badge_data, "badge_data": badge_data,
**seo_context, **seo_context,
} }
@ -1615,30 +1671,52 @@ def badge_set_detail_view(request: HttpRequest, set_id: str) -> HttpResponse:
Http404: If the badge set is not found. Http404: If the badge set is not found.
""" """
try: try:
badge_set: ChatBadgeSet = ChatBadgeSet.for_detail_view(set_id) badge_set: ChatBadgeSet = ChatBadgeSet.objects.prefetch_related(
Prefetch("badges", queryset=ChatBadge.objects.order_by("badge_id")),
).get(set_id=set_id)
except ChatBadgeSet.DoesNotExist as exc: except ChatBadgeSet.DoesNotExist as exc:
msg = "No badge set found matching the query" msg = "No badge set found matching the query"
raise Http404(msg) from exc raise Http404(msg) from exc
# Sort badges treating pure-numeric badge_ids as integers, strings alphabetically after def get_sorted_badges(badge_set: ChatBadgeSet) -> QuerySet[ChatBadge]:
badges: list[ChatBadge] = sorted( badges = badge_set.badges.all() # pyright: ignore[reportAttributeAccessIssue]
badge_set.badges.all(), # pyright: ignore[reportAttributeAccessIssue]
key=lambda b: (0, int(b.badge_id)) if b.badge_id.isdigit() else (1, b.badge_id),
)
# Batch-fetch award campaigns for all badge titles (2 queries regardless of badge count) def sort_badges(badge: ChatBadge) -> tuple:
award_map: dict[str, list[DropCampaign]] = ChatBadge.award_campaigns_by_title( """Sort badges by badge_id, treating numeric IDs as integers.
[b.title for b in badges],
Args:
badge: The ChatBadge to sort.
Returns:
A tuple used for sorting, where numeric badge_ids are sorted as integers.
"""
try:
return (int(badge.badge_id),)
except ValueError:
return (badge.badge_id,)
sorted_badges: list[ChatBadge] = sorted(badges, key=sort_badges)
badge_ids: list[int] = [badge.pk for badge in sorted_badges]
preserved_order = Case(
*[When(pk=pk, then=pos) for pos, pk in enumerate(badge_ids)],
) )
return ChatBadge.objects.filter(pk__in=badge_ids).order_by(preserved_order)
badges: QuerySet[ChatBadge, ChatBadge] = get_sorted_badges(badge_set)
# Attach award_campaigns attribute to each badge for template use
for badge in badges: for badge in badges:
badge.award_campaigns = award_map.get(badge.title, []) # pyright: ignore[reportAttributeAccessIssue] benefits: QuerySet[DropBenefit, DropBenefit] = DropBenefit.objects.filter(
distribution_type="BADGE",
name=badge.title,
)
campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(
time_based_drops__benefits__in=benefits,
).distinct()
badge.award_campaigns = list(campaigns) # pyright: ignore[reportAttributeAccessIssue]
badge_set_name: str = badge_set.set_id badge_set_name: str = badge_set.set_id
badge_count: int = len(badges) badge_set_description: str = f"Twitch chat badge set {badge_set_name} with {len(badges)} badge{'s' if len(badges) != 1 else ''} awarded through drop campaigns."
badge_set_description: str = (
f"Twitch chat badge set {badge_set_name} with {badge_count} "
f"badge{'s' if badge_count != 1 else ''} awarded through drop campaigns."
)
badge_schema: dict[str, Any] = { badge_schema: dict[str, Any] = {
"@context": "https://schema.org", "@context": "https://schema.org",