Improve sitemaps
All checks were successful
Deploy to Server / deploy (push) Successful in 9s

This commit is contained in:
Joakim Hellsén 2026-02-27 06:02:30 +01:00
commit 415dd12fd9
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
16 changed files with 843 additions and 379 deletions

View file

@ -153,6 +153,7 @@ INSTALLED_APPS: list[str] = [
"django.contrib.sessions", "django.contrib.sessions",
"django.contrib.staticfiles", "django.contrib.staticfiles",
"django.contrib.postgres", "django.contrib.postgres",
"django.contrib.sitemaps",
"twitch.apps.TwitchConfig", "twitch.apps.TwitchConfig",
] ]

View file

@ -4,17 +4,44 @@ from typing import TYPE_CHECKING
from django.conf import settings from django.conf import settings
from django.conf.urls.static import static from django.conf.urls.static import static
from django.contrib.sitemaps.views import index
from django.contrib.sitemaps.views import sitemap
from django.urls import include from django.urls import include
from django.urls import path from django.urls import path
from django.views.decorators.cache import cache_page
from twitch import sitemaps as twitch_sitemaps
from twitch import views as twitch_views from twitch import views as twitch_views
if TYPE_CHECKING: if TYPE_CHECKING:
from django.contrib.sitemaps import Sitemap
from django.urls.resolvers import URLPattern from django.urls.resolvers import URLPattern
from django.urls.resolvers import URLResolver from django.urls.resolvers import URLResolver
sitemaps: dict[str, type[Sitemap]] = {
"static": twitch_sitemaps.TwitchSitemapGenerator,
"games": twitch_sitemaps.GameSitemap,
"campaigns": twitch_sitemaps.CampaignSitemap,
"organizations": twitch_sitemaps.OrganizationSitemap,
"channels": twitch_sitemaps.ChannelSitemap,
"badges": twitch_sitemaps.BadgeSitemap,
"reward-campaigns": twitch_sitemaps.RewardCampaignSitemap,
}
urlpatterns: list[URLPattern | URLResolver] = [ urlpatterns: list[URLPattern | URLResolver] = [
path("sitemap.xml", twitch_views.sitemap_view, name="sitemap"), path(
"sitemap.xml",
cache_page(60 * 60)(index),
{"sitemaps": sitemaps},
name="sitemap",
),
path(
"sitemap-<section>.xml",
cache_page(60 * 60)(sitemap),
{"sitemaps": sitemaps},
name="django.contrib.sitemaps.views.sitemap",
),
path("robots.txt", twitch_views.robots_txt_view, name="robots"), path("robots.txt", twitch_views.robots_txt_view, name="robots"),
path(route="", view=include("twitch.urls", namespace="twitch")), path(route="", view=include("twitch.urls", namespace="twitch")),
] ]
@ -26,6 +53,8 @@ if settings.DEBUG:
document_root=settings.MEDIA_ROOT, document_root=settings.MEDIA_ROOT,
) )
# If not testing, include debug toolbar and silk URLs
if not settings.TESTING: if not settings.TESTING:
from debug_toolbar.toolbar import debug_toolbar_urls from debug_toolbar.toolbar import debug_toolbar_urls

243
twitch/sitemaps.py Normal file
View file

@ -0,0 +1,243 @@
from __future__ import annotations
from datetime import UTC
from datetime import datetime
from typing import TYPE_CHECKING
from typing import TypedDict
from django.conf import settings
from django.contrib.sitemaps import Sitemap
from django.db.models import Max
from django.db.models import Prefetch
from django.db.models.query import QuerySet
from django.urls import reverse
from twitch.models import Channel
from twitch.models import ChatBadgeSet
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import Game
from twitch.models import Organization
from twitch.models import RewardCampaign
from twitch.models import TimeBasedDrop
if TYPE_CHECKING:
from pathlib import Path
from django.db.models import QuerySet
class EmoteDict(TypedDict):
"""Type definition for emote dictionary.
Used in TwitchSitemapGenerator to track emotes and their associated campaigns when calculating
lastmod for the emote gallery sitemap item.
"""
image_url: str
campaign: DropCampaign
class TwitchSitemapGenerator(Sitemap):
"""Sitemap for static views on the Twitch site."""
def items(self) -> list[str]:
"""Return list of URL pattern names to include in sitemap."""
# names used in `twitch/urls.py`
return [
"twitch:dashboard",
"twitch:campaign_list",
"twitch:reward_campaign_list",
"twitch:games_grid",
"twitch:games_list",
"twitch:org_list",
"twitch:channel_list",
"twitch:badge_list",
"twitch:emote_gallery",
"twitch:search",
"twitch:dataset_backups",
"twitch:docs_rss",
]
def location(self, item: str) -> str:
"""Return URL for a given item (URL pattern name)."""
return reverse(item)
def lastmod(self, item: str) -> datetime | None:
"""Return latest modified time across models relevant to static views."""
if item == "twitch:search":
return None
if item == "twitch:dashboard":
return DropCampaign.objects.aggregate(latest=Max("updated_at"))["latest"]
if item == "twitch:campaign_list":
return DropCampaign.objects.aggregate(latest=Max("updated_at"))["latest"]
if item == "twitch:reward_campaign_list":
return RewardCampaign.objects.aggregate(latest=Max("updated_at"))["latest"]
if item in {"twitch:games_grid", "twitch:games_list"}:
return Game.objects.aggregate(latest=Max("updated_at"))["latest"]
if item == "twitch:org_list":
return Organization.objects.aggregate(latest=Max("updated_at"))["latest"]
if item == "twitch:channel_list":
# TODO(TheLovinator): This page is paginated, so we should not # noqa: TD003
# return the latest updated_at across all channels, as that would
# cause the entire sitemap to be re-crawled whenever any channel is updated
# Instead, we should consider only returning the latest updated_at across
# channels included in the first page of results, or implementing a more
# sophisticated approach to ensure we don't trigger excessive re-crawling
# while still keeping the sitemap reasonably up to date.
# return Channel.objects.aggregate(latest=Max("updated_at"))["latest"]
return None
if item == "twitch:badge_list":
return ChatBadgeSet.objects.aggregate(latest=Max("updated_at"))["latest"]
if item == "twitch:emote_gallery":
# TODO(TheLovinator): Refactor this to avoid duplicating code from the emote gallery view. # noqa: TD003
emote_benefits: QuerySet[DropBenefit, DropBenefit] = (
DropBenefit.objects
.filter(distribution_type="EMOTE")
.select_related()
.prefetch_related(
Prefetch(
"drops",
queryset=TimeBasedDrop.objects.select_related("campaign"),
to_attr="_emote_drops",
),
)
)
emotes: list[EmoteDict] = []
for benefit in emote_benefits:
# Find the first drop with a campaign for this benefit
drop: TimeBasedDrop | None = next((d for d in getattr(benefit, "_emote_drops", []) if d.campaign), None)
if drop:
drop_campaign: DropCampaign | None = drop.campaign
if drop_campaign:
emotes.append({
"image_url": benefit.image_best_url,
"campaign": drop_campaign,
})
if not emotes:
# If there are no emotes, return None to avoid unnecessarily triggering re-crawls of the sitemap
return None
# Return the latest updated_at across all campaigns associated with emotes
return max(emote["campaign"].updated_at for emote in emotes)
if item == "twitch:docs_rss":
return None
if item == "twitch:dataset_backups":
datasets_root: Path = settings.DATA_DIR / "datasets"
backup_files: list[Path] = list(datasets_root.glob("dataset_backup_*.zip"))
if not backup_files:
return None
latest_backup: Path = max(backup_files, key=lambda f: f.stat().st_mtime)
return datetime.fromtimestamp(latest_backup.stat().st_mtime, tz=UTC)
return None
class GameSitemap(Sitemap):
"""Sitemap for games."""
def items(self) -> QuerySet[Game]:
"""Return queryset of games to include in sitemap."""
return Game.objects.all().only("twitch_id", "updated_at")
def lastmod(self, obj: Game) -> datetime | None:
"""Return last modified time for a given game."""
return obj.updated_at
def location(self, obj: Game) -> str: # pyright: ignore[reportIncompatibleMethodOverride]
"""Return URL for a given game."""
return reverse("twitch:game_detail", args=[obj.twitch_id])
class CampaignSitemap(Sitemap):
"""Sitemap for drop campaigns."""
def items(self) -> QuerySet[DropCampaign]:
"""Return queryset of drop campaigns to include in sitemap."""
return DropCampaign.objects.all().only("twitch_id", "updated_at")
def lastmod(self, obj: DropCampaign) -> datetime | None:
"""Return last modified time for a given drop campaign."""
return obj.updated_at
def location(self, obj: DropCampaign) -> str: # pyright: ignore[reportIncompatibleMethodOverride]
"""Return URL for a given drop campaign."""
return reverse("twitch:campaign_detail", args=[obj.twitch_id])
class OrganizationSitemap(Sitemap):
"""Sitemap for organizations."""
def items(self) -> QuerySet[Organization]:
"""Return queryset of organizations to include in sitemap."""
return Organization.objects.all().only("twitch_id", "updated_at")
def lastmod(self, obj: Organization) -> datetime | None:
"""Return last modified time for a given organization."""
return obj.updated_at
def location(self, obj: Organization) -> str: # pyright: ignore[reportIncompatibleMethodOverride]
"""Return URL for a given organization."""
return reverse("twitch:organization_detail", args=[obj.twitch_id])
class ChannelSitemap(Sitemap):
"""Sitemap for individual channels."""
def items(self) -> QuerySet[Channel]:
"""Return queryset of channels to include in sitemap."""
return Channel.objects.all().only("twitch_id", "updated_at")
def lastmod(self, obj: Channel) -> datetime | None:
"""Return last modified time for a given channel."""
return obj.updated_at
def location(self, obj: Channel) -> str: # pyright: ignore[reportIncompatibleMethodOverride]
"""Return URL for a given channel."""
return reverse("twitch:channel_detail", args=[obj.twitch_id])
class BadgeSitemap(Sitemap):
"""Sitemap for chat badge sets."""
def items(self) -> QuerySet[ChatBadgeSet]:
"""Return queryset of chat badge sets to include in sitemap."""
return ChatBadgeSet.objects.all().only("set_id")
def lastmod(self, obj: ChatBadgeSet) -> datetime | None:
"""Return last modified time for a given badge set."""
return obj.updated_at
def location(self, obj: ChatBadgeSet) -> str: # pyright: ignore[reportIncompatibleMethodOverride]
"""Return URL for a given chat badge set."""
return reverse("twitch:badge_set_detail", args=[obj.set_id])
class RewardCampaignSitemap(Sitemap):
"""Sitemap for reward campaigns."""
def items(self) -> QuerySet[RewardCampaign]:
"""Return queryset of reward campaigns to include in sitemap."""
return RewardCampaign.objects.all().only("twitch_id", "updated_at")
def lastmod(self, obj: RewardCampaign) -> datetime | None:
"""Return last modified time for a given reward campaign."""
return obj.updated_at
def location(self, obj: RewardCampaign) -> str: # pyright: ignore[reportIncompatibleMethodOverride]
"""Return URL for a given reward campaign."""
return reverse("twitch:reward_campaign_detail", args=[obj.twitch_id])

View file

@ -5,6 +5,7 @@ import math
import os import os
import shutil import shutil
from compression import zstd from compression import zstd
from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import pytest import pytest
@ -21,6 +22,7 @@ from twitch.models import Game
from twitch.models import Organization from twitch.models import Organization
if TYPE_CHECKING: if TYPE_CHECKING:
import sqlite3
from pathlib import Path from pathlib import Path
from django.test import Client from django.test import Client
@ -42,12 +44,12 @@ class TestBackupCommand:
# Create test data so tables exist # Create test data so tables exist
Organization.objects.create(twitch_id="test000", name="Test Org") Organization.objects.create(twitch_id="test000", name="Test Org")
output_dir = tmp_path / "backups" output_dir: Path = tmp_path / "backups"
output_dir.mkdir() output_dir.mkdir()
call_command("backup_db", output_dir=str(output_dir), prefix="test") call_command("backup_db", output_dir=str(output_dir), prefix="test")
backup_files = list(output_dir.glob("test-*.sql.zst")) backup_files: list[Path] = list(output_dir.glob("test-*.sql.zst"))
assert len(backup_files) == 1 assert len(backup_files) == 1
assert backup_files[0].exists() assert backup_files[0].exists()
assert backup_files[0].stat().st_size > 0 assert backup_files[0].stat().st_size > 0
@ -55,17 +57,17 @@ class TestBackupCommand:
def test_backup_contains_sql_content(self, tmp_path: Path) -> None: def test_backup_contains_sql_content(self, tmp_path: Path) -> None:
"""Test that backup file contains valid SQL content.""" """Test that backup file contains valid SQL content."""
_skip_if_pg_dump_missing() _skip_if_pg_dump_missing()
output_dir = tmp_path / "backups" output_dir: Path = tmp_path / "backups"
output_dir.mkdir() output_dir.mkdir()
# Create some test data # Create some test data
org = Organization.objects.create(twitch_id="test123", name="Test Org") org: Organization = Organization.objects.create(twitch_id="test123", name="Test Org")
game = Game.objects.create(twitch_id="game456", display_name="Test Game") game: Game = Game.objects.create(twitch_id="game456", display_name="Test Game")
game.owners.add(org) game.owners.add(org)
call_command("backup_db", output_dir=str(output_dir), prefix="test") call_command("backup_db", output_dir=str(output_dir), prefix="test")
backup_file = next(iter(output_dir.glob("test-*.sql.zst"))) backup_file: Path = next(iter(output_dir.glob("test-*.sql.zst")))
# Decompress and read content # Decompress and read content
with ( with (
@ -73,7 +75,7 @@ class TestBackupCommand:
zstd.open(raw_handle, "r") as compressed, zstd.open(raw_handle, "r") as compressed,
io.TextIOWrapper(compressed, encoding="utf-8") as handle, io.TextIOWrapper(compressed, encoding="utf-8") as handle,
): ):
content = handle.read() content: str = handle.read()
if connection.vendor == "postgresql": if connection.vendor == "postgresql":
assert "CREATE TABLE" in content assert "CREATE TABLE" in content
@ -92,19 +94,19 @@ class TestBackupCommand:
# Create test data so tables exist # Create test data so tables exist
Organization.objects.create(twitch_id="test001", name="Test Org") Organization.objects.create(twitch_id="test001", name="Test Org")
output_dir = tmp_path / "backups" output_dir: Path = tmp_path / "backups"
output_dir.mkdir() output_dir.mkdir()
call_command("backup_db", output_dir=str(output_dir), prefix="test") call_command("backup_db", output_dir=str(output_dir), prefix="test")
backup_file = next(iter(output_dir.glob("test-*.sql.zst"))) backup_file: Path = next(iter(output_dir.glob("test-*.sql.zst")))
with ( with (
backup_file.open("rb") as raw_handle, backup_file.open("rb") as raw_handle,
zstd.open(raw_handle, "r") as compressed, zstd.open(raw_handle, "r") as compressed,
io.TextIOWrapper(compressed, encoding="utf-8") as handle, io.TextIOWrapper(compressed, encoding="utf-8") as handle,
): ):
content = handle.read() content: str = handle.read()
# Should NOT contain django admin, silk, or debug toolbar tables # Should NOT contain django admin, silk, or debug toolbar tables
assert "django_session" not in content assert "django_session" not in content
@ -121,12 +123,12 @@ class TestBackupCommand:
# Create test data so tables exist # Create test data so tables exist
Organization.objects.create(twitch_id="test002", name="Test Org") Organization.objects.create(twitch_id="test002", name="Test Org")
output_dir = tmp_path / "backups" output_dir: Path = tmp_path / "backups"
output_dir.mkdir() output_dir.mkdir()
call_command("backup_db", output_dir=str(output_dir), prefix="custom") call_command("backup_db", output_dir=str(output_dir), prefix="custom")
backup_files = list(output_dir.glob("custom-*.sql.zst")) backup_files: list[Path] = list(output_dir.glob("custom-*.sql.zst"))
assert len(backup_files) == 1 assert len(backup_files) == 1
def test_backup_creates_output_directory(self, tmp_path: Path) -> None: def test_backup_creates_output_directory(self, tmp_path: Path) -> None:
@ -135,7 +137,7 @@ class TestBackupCommand:
# Create test data so tables exist # Create test data so tables exist
Organization.objects.create(twitch_id="test003", name="Test Org") Organization.objects.create(twitch_id="test003", name="Test Org")
output_dir = tmp_path / "nonexistent" / "backups" output_dir: Path = tmp_path / "nonexistent" / "backups"
call_command("backup_db", output_dir=str(output_dir), prefix="test") call_command("backup_db", output_dir=str(output_dir), prefix="test")
@ -149,12 +151,12 @@ class TestBackupCommand:
Organization.objects.create(twitch_id="test004", name="Test Org") Organization.objects.create(twitch_id="test004", name="Test Org")
monkeypatch.setattr(settings, "DATA_DIR", tmp_path) monkeypatch.setattr(settings, "DATA_DIR", tmp_path)
datasets_dir = tmp_path / "datasets" datasets_dir: Path = tmp_path / "datasets"
datasets_dir.mkdir(exist_ok=True, parents=True) datasets_dir.mkdir(exist_ok=True, parents=True)
call_command("backup_db") call_command("backup_db")
backup_files = list(datasets_dir.glob("ttvdrops-*.sql.zst")) backup_files: list[Path] = list(datasets_dir.glob("ttvdrops-*.sql.zst"))
assert len(backup_files) >= 1 assert len(backup_files) >= 1
@ -165,7 +167,7 @@ class TestBackupHelperFunctions:
def test_get_allowed_tables_filters_by_prefix(self) -> None: def test_get_allowed_tables_filters_by_prefix(self) -> None:
"""Test that _get_allowed_tables returns only matching tables.""" """Test that _get_allowed_tables returns only matching tables."""
# Use Django's connection to access the test database # Use Django's connection to access the test database
tables = _get_allowed_tables("twitch_") tables: list[str] = _get_allowed_tables("twitch_")
assert len(tables) > 0 assert len(tables) > 0
assert all(table.startswith("twitch_") for table in tables) assert all(table.startswith("twitch_") for table in tables)
@ -175,7 +177,7 @@ class TestBackupHelperFunctions:
def test_get_allowed_tables_excludes_non_matching(self) -> None: def test_get_allowed_tables_excludes_non_matching(self) -> None:
"""Test that _get_allowed_tables excludes non-matching tables.""" """Test that _get_allowed_tables excludes non-matching tables."""
# Use Django's connection to access the test database # Use Django's connection to access the test database
tables = _get_allowed_tables("twitch_") tables: list[str] = _get_allowed_tables("twitch_")
# Should not include django, silk, or debug toolbar tables # Should not include django, silk, or debug toolbar tables
assert not any(table.startswith("django_") for table in tables) assert not any(table.startswith("django_") for table in tables)
@ -212,25 +214,25 @@ class TestBackupHelperFunctions:
# Create test data # Create test data
Organization.objects.create(twitch_id="test789", name="Write Test Org") Organization.objects.create(twitch_id="test789", name="Write Test Org")
tables = _get_allowed_tables("twitch_") tables: list[str] = _get_allowed_tables("twitch_")
if connection.vendor == "postgresql": if connection.vendor == "postgresql":
if not shutil.which("pg_dump"): if not shutil.which("pg_dump"):
pytest.skip("pg_dump is not available") pytest.skip("pg_dump is not available")
output_path = tmp_path / "backup.sql.zst" output_path: Path = tmp_path / "backup.sql.zst"
_write_postgres_dump(output_path, tables) _write_postgres_dump(output_path, tables)
with ( with (
output_path.open("rb") as raw_handle, output_path.open("rb") as raw_handle,
zstd.open(raw_handle, "r") as compressed, zstd.open(raw_handle, "r") as compressed,
io.TextIOWrapper(compressed, encoding="utf-8") as handle, io.TextIOWrapper(compressed, encoding="utf-8") as handle,
): ):
content = handle.read() content: str = handle.read()
assert "CREATE TABLE" in content assert "CREATE TABLE" in content
assert "INSERT INTO" in content assert "INSERT INTO" in content
assert "twitch_organization" in content assert "twitch_organization" in content
assert "Write Test Org" in content assert "Write Test Org" in content
else: else:
db_connection = connection.connection db_connection: sqlite3.Connection = connection.connection
output = io.StringIO() output = io.StringIO()
_write_sqlite_dump(output, db_connection, tables) _write_sqlite_dump(output, db_connection, tables)
content = output.getvalue() content = output.getvalue()
@ -255,7 +257,7 @@ class TestDatasetBackupViews:
Returns: Returns:
Path to the created datasets directory. Path to the created datasets directory.
""" """
datasets_dir = tmp_path / "datasets" datasets_dir: Path = tmp_path / "datasets"
datasets_dir.mkdir() datasets_dir.mkdir()
return datasets_dir return datasets_dir
@ -266,7 +268,7 @@ class TestDatasetBackupViews:
Returns: Returns:
Path to the created backup file. Path to the created backup file.
""" """
backup_file = datasets_dir / "ttvdrops-20260210-120000.sql.zst" backup_file: Path = datasets_dir / "ttvdrops-20260210-120000.sql.zst"
with ( with (
backup_file.open("wb") as raw_handle, backup_file.open("wb") as raw_handle,
zstd.open(raw_handle, "w") as compressed, zstd.open(raw_handle, "w") as compressed,
@ -315,8 +317,8 @@ class TestDatasetBackupViews:
monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
# Create multiple backup files with different timestamps # Create multiple backup files with different timestamps
older_backup = datasets_dir / "ttvdrops-20260210-100000.sql.zst" older_backup: Path = datasets_dir / "ttvdrops-20260210-100000.sql.zst"
newer_backup = datasets_dir / "ttvdrops-20260210-140000.sql.zst" newer_backup: Path = datasets_dir / "ttvdrops-20260210-140000.sql.zst"
for backup in [older_backup, newer_backup]: for backup in [older_backup, newer_backup]:
with ( with (
@ -334,9 +336,9 @@ class TestDatasetBackupViews:
response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups")) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups"))
content = response.content.decode() content: str = response.content.decode()
newer_pos = content.find("20260210-140000") newer_pos: int = content.find("20260210-140000")
older_pos = content.find("20260210-100000") older_pos: int = content.find("20260210-100000")
# Newer backup should appear first (sorted descending) # Newer backup should appear first (sorted descending)
assert 0 < newer_pos < older_pos assert 0 < newer_pos < older_pos
@ -370,7 +372,9 @@ class TestDatasetBackupViews:
monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
# Attempt path traversal # Attempt path traversal
response = client.get(reverse("twitch:dataset_backup_download", args=["../../../etc/passwd"])) response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:dataset_backup_download", args=["../../../etc/passwd"]),
)
assert response.status_code == 404 assert response.status_code == 404
def test_dataset_download_rejects_invalid_extensions( def test_dataset_download_rejects_invalid_extensions(
@ -383,10 +387,12 @@ class TestDatasetBackupViews:
monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
# Create a file with invalid extension # Create a file with invalid extension
invalid_file = datasets_dir / "malicious.exe" invalid_file: Path = datasets_dir / "malicious.exe"
invalid_file.write_text("not a backup") invalid_file.write_text("not a backup", encoding="utf-8")
response = client.get(reverse("twitch:dataset_backup_download", args=["malicious.exe"])) response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:dataset_backup_download", args=["malicious.exe"]),
)
assert response.status_code == 404 assert response.status_code == 404
def test_dataset_download_file_not_found( def test_dataset_download_file_not_found(
@ -398,7 +404,9 @@ class TestDatasetBackupViews:
"""Test download returns 404 for non-existent file.""" """Test download returns 404 for non-existent file."""
monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
response = client.get(reverse("twitch:dataset_backup_download", args=["nonexistent.sql.zst"])) response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:dataset_backup_download", args=["nonexistent.sql.zst"]),
)
assert response.status_code == 404 assert response.status_code == 404
def test_dataset_list_view_shows_file_sizes( def test_dataset_list_view_shows_file_sizes(
@ -414,8 +422,9 @@ class TestDatasetBackupViews:
response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups")) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups"))
assert response.status_code == 200 assert response.status_code == 200
# Should contain size information (bytes, KB, MB, or GB) # Should contain size information (bytes, KB, MB, or GB)
content = response.content.decode() content: str = response.content.decode()
assert any(unit in content for unit in ["bytes", "KB", "MB", "GB"]) assert any(unit in content for unit in ["bytes", "KB", "MB", "GB"])
def test_dataset_list_ignores_non_zst_files( def test_dataset_list_ignores_non_zst_files(
@ -434,7 +443,7 @@ class TestDatasetBackupViews:
response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups")) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups"))
content = response.content.decode() content: str = response.content.decode()
assert "backup.sql.zst" in content assert "backup.sql.zst" in content
assert "readme.txt" not in content assert "readme.txt" not in content
assert "old_backup.gz" not in content assert "old_backup.gz" not in content
@ -449,7 +458,7 @@ class TestDatasetBackupViews:
monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
# Create subdirectory with backup # Create subdirectory with backup
subdir = datasets_dir / "2026" / "02" subdir: Path = datasets_dir / "2026" / "02"
subdir.mkdir(parents=True) subdir.mkdir(parents=True)
backup_file = subdir / "backup.sql.zst" backup_file = subdir / "backup.sql.zst"
with ( with (

View file

@ -12,6 +12,7 @@ from twitch.models import ChatBadgeSet
if TYPE_CHECKING: if TYPE_CHECKING:
from django.test import Client from django.test import Client
from django.test.client import _MonkeyPatchedWSGIResponse
@pytest.mark.django_db @pytest.mark.django_db
@ -20,25 +21,25 @@ class TestBadgeListView:
def test_badge_list_empty(self, client: Client) -> None: def test_badge_list_empty(self, client: Client) -> None:
"""Test badge list view with no badges.""" """Test badge list view with no badges."""
response = client.get(reverse("twitch:badge_list")) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:badge_list"))
assert response.status_code == 200 assert response.status_code == 200
assert "No badge sets found" in response.content.decode() assert "No badge sets found" in response.content.decode()
def test_badge_list_displays_sets(self, client: Client) -> None: def test_badge_list_displays_sets(self, client: Client) -> None:
"""Test that badge sets are displayed.""" """Test that badge sets are displayed."""
badge_set1 = ChatBadgeSet.objects.create(set_id="vip") badge_set1: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="vip")
badge_set2 = ChatBadgeSet.objects.create(set_id="subscriber") badge_set2: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="subscriber")
response = client.get(reverse("twitch:badge_list")) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:badge_list"))
assert response.status_code == 200 assert response.status_code == 200
content = response.content.decode() content: str = response.content.decode()
assert badge_set1.set_id in content assert badge_set1.set_id in content
assert badge_set2.set_id in content assert badge_set2.set_id in content
def test_badge_list_displays_badge_count(self, client: Client) -> None: def test_badge_list_displays_badge_count(self, client: Client) -> None:
"""Test that badge version count is displayed.""" """Test that badge version count is displayed."""
badge_set = ChatBadgeSet.objects.create(set_id="bits") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="bits")
ChatBadge.objects.create( ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
@ -58,9 +59,9 @@ class TestBadgeListView:
description="100 Bits", description="100 Bits",
) )
response = client.get(reverse("twitch:badge_list")) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:badge_list"))
assert response.status_code == 200 assert response.status_code == 200
content = response.content.decode() content: str = response.content.decode()
# Should show version count (the template uses "versions" not "version") # Should show version count (the template uses "versions" not "version")
assert "2" in content assert "2" in content
@ -73,13 +74,15 @@ class TestBadgeSetDetailView:
def test_badge_set_detail_not_found(self, client: Client) -> None: def test_badge_set_detail_not_found(self, client: Client) -> None:
"""Test 404 when badge set doesn't exist.""" """Test 404 when badge set doesn't exist."""
response = client.get(reverse("twitch:badge_set_detail", kwargs={"set_id": "nonexistent"})) response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:badge_set_detail", kwargs={"set_id": "nonexistent"}),
)
assert response.status_code == 404 assert response.status_code == 404
def test_badge_set_detail_displays_badges(self, client: Client) -> None: def test_badge_set_detail_displays_badges(self, client: Client) -> None:
"""Test that badge versions are displayed.""" """Test that badge versions are displayed."""
badge_set = ChatBadgeSet.objects.create(set_id="moderator") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="moderator")
badge = ChatBadge.objects.create( badge: ChatBadge = ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
image_url_1x="https://example.com/1x.png", image_url_1x="https://example.com/1x.png",
@ -91,9 +94,11 @@ class TestBadgeSetDetailView:
click_url="https://help.twitch.tv", click_url="https://help.twitch.tv",
) )
response = client.get(reverse("twitch:badge_set_detail", kwargs={"set_id": "moderator"})) response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:badge_set_detail", kwargs={"set_id": "moderator"}),
)
assert response.status_code == 200 assert response.status_code == 200
content = response.content.decode() content: str = response.content.decode()
assert badge.title in content assert badge.title in content
assert badge.description in content assert badge.description in content
@ -102,7 +107,7 @@ class TestBadgeSetDetailView:
def test_badge_set_detail_displays_metadata(self, client: Client) -> None: def test_badge_set_detail_displays_metadata(self, client: Client) -> None:
"""Test that badge set metadata is displayed.""" """Test that badge set metadata is displayed."""
badge_set = ChatBadgeSet.objects.create(set_id="vip") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="vip")
ChatBadge.objects.create( ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
@ -113,16 +118,16 @@ class TestBadgeSetDetailView:
description="VIP Badge", description="VIP Badge",
) )
response = client.get(reverse("twitch:badge_set_detail", kwargs={"set_id": "vip"})) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:badge_set_detail", kwargs={"set_id": "vip"}))
assert response.status_code == 200 assert response.status_code == 200
content = response.content.decode() content: str = response.content.decode()
assert "vip" in content assert "vip" in content
assert "1" in content assert "1" in content
def test_badge_set_detail_json_data(self, client: Client) -> None: def test_badge_set_detail_json_data(self, client: Client) -> None:
"""Test that JSON data is displayed.""" """Test that JSON data is displayed."""
badge_set = ChatBadgeSet.objects.create(set_id="test_set") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="test_set")
ChatBadge.objects.create( ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
@ -133,9 +138,11 @@ class TestBadgeSetDetailView:
description="Test Badge", description="Test Badge",
) )
response = client.get(reverse("twitch:badge_set_detail", kwargs={"set_id": "test_set"})) response: _MonkeyPatchedWSGIResponse = client.get(
reverse("twitch:badge_set_detail", kwargs={"set_id": "test_set"}),
)
assert response.status_code == 200 assert response.status_code == 200
content = response.content.decode() content: str = response.content.decode()
assert "test_set" in content assert "test_set" in content
@ -149,16 +156,16 @@ class TestBadgeSearch:
ChatBadgeSet.objects.create(set_id="vip") ChatBadgeSet.objects.create(set_id="vip")
ChatBadgeSet.objects.create(set_id="subscriber") ChatBadgeSet.objects.create(set_id="subscriber")
response = client.get(reverse("twitch:search"), {"q": "vip"}) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:search"), {"q": "vip"})
assert response.status_code == 200 assert response.status_code == 200
content = response.content.decode() content: str = response.content.decode()
assert "Badge Sets" in content assert "Badge Sets" in content
assert "vip" in content assert "vip" in content
def test_search_finds_badges_by_title(self, client: Client) -> None: def test_search_finds_badges_by_title(self, client: Client) -> None:
"""Test that search finds badges by title.""" """Test that search finds badges by title."""
badge_set = ChatBadgeSet.objects.create(set_id="test") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="test")
ChatBadge.objects.create( ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
@ -169,16 +176,16 @@ class TestBadgeSearch:
description="Test description", description="Test description",
) )
response = client.get(reverse("twitch:search"), {"q": "Moderator"}) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:search"), {"q": "Moderator"})
assert response.status_code == 200 assert response.status_code == 200
content = response.content.decode() content: str = response.content.decode()
assert "Chat Badges" in content assert "Chat Badges" in content
assert "Moderator Badge" in content assert "Moderator Badge" in content
def test_search_finds_badges_by_description(self, client: Client) -> None: def test_search_finds_badges_by_description(self, client: Client) -> None:
"""Test that search finds badges by description.""" """Test that search finds badges by description."""
badge_set = ChatBadgeSet.objects.create(set_id="test") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="test")
ChatBadge.objects.create( ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
@ -189,8 +196,8 @@ class TestBadgeSearch:
description="Unique description text", description="Unique description text",
) )
response = client.get(reverse("twitch:search"), {"q": "Unique description"}) response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:search"), {"q": "Unique description"})
assert response.status_code == 200 assert response.status_code == 200
content = response.content.decode() content: str = response.content.decode()
assert "Chat Badges" in content or "Test Badge" in content assert "Chat Badges" in content or "Test Badge" in content

View file

@ -2,6 +2,8 @@ from __future__ import annotations
import json import json
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from unittest import skipIf from unittest import skipIf
from django.db import connection from django.db import connection
@ -17,6 +19,9 @@ from twitch.models import Organization
from twitch.models import TimeBasedDrop from twitch.models import TimeBasedDrop
from twitch.schemas import DropBenefitSchema from twitch.schemas import DropBenefitSchema
if TYPE_CHECKING:
from pytest_django.asserts import QuerySet
class GetOrUpdateBenefitTests(TestCase): class GetOrUpdateBenefitTests(TestCase):
"""Tests for the _get_or_update_benefit method in better_import_drops.Command.""" """Tests for the _get_or_update_benefit method in better_import_drops.Command."""
@ -24,7 +29,6 @@ class GetOrUpdateBenefitTests(TestCase):
def test_defaults_distribution_type_when_missing(self) -> None: def test_defaults_distribution_type_when_missing(self) -> None:
"""Ensure importer sets distribution_type to empty string when absent.""" """Ensure importer sets distribution_type to empty string when absent."""
command = Command() command = Command()
command.benefit_cache = {}
benefit_schema: DropBenefitSchema = DropBenefitSchema.model_validate( benefit_schema: DropBenefitSchema = DropBenefitSchema.model_validate(
{ {
@ -324,7 +328,7 @@ class CampaignStructureDetectionTests(TestCase):
command = Command() command = Command()
# Inventory format with null dropCampaignsInProgress - should not detect as inventory_campaigns # Inventory format with null dropCampaignsInProgress - should not detect as inventory_campaigns
response = { response: dict[str, dict[str, dict[str, str | dict[str, str | None]]]] = {
"data": { "data": {
"currentUser": { "currentUser": {
"id": "123", "id": "123",
@ -462,15 +466,19 @@ class OperationNameFilteringTests(TestCase):
command.process_responses([inventory_payload], Path("inventory.json"), {}) command.process_responses([inventory_payload], Path("inventory.json"), {})
# Verify we can filter by operation_names with JSON containment # Verify we can filter by operation_names with JSON containment
viewer_campaigns = DropCampaign.objects.filter(operation_names__contains=["ViewerDropsDashboard"]) viewer_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(
inventory_campaigns = DropCampaign.objects.filter(operation_names__contains=["Inventory"]) operation_names__contains=["ViewerDropsDashboard"],
)
inventory_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(
operation_names__contains=["Inventory"],
)
assert len(viewer_campaigns) >= 1 assert len(viewer_campaigns) >= 1
assert len(inventory_campaigns) >= 1 assert len(inventory_campaigns) >= 1
# Verify the correct campaigns are in each list # Verify the correct campaigns are in each list
viewer_ids = [c.twitch_id for c in viewer_campaigns] viewer_ids: list[str] = [c.twitch_id for c in viewer_campaigns]
inventory_ids = [c.twitch_id for c in inventory_campaigns] inventory_ids: list[str] = [c.twitch_id for c in inventory_campaigns]
assert "viewer-campaign-1" in viewer_ids assert "viewer-campaign-1" in viewer_ids
assert "inventory-campaign-1" in inventory_ids assert "inventory-campaign-1" in inventory_ids
@ -532,7 +540,7 @@ class GameImportTests(TestCase):
assert success is True assert success is True
assert broken_dir is None assert broken_dir is None
game = Game.objects.get(twitch_id="497057") game: Game = Game.objects.get(twitch_id="497057")
assert game.slug == "destiny-2" assert game.slug == "destiny-2"
assert game.display_name == "Destiny 2" assert game.display_name == "Destiny 2"
@ -595,10 +603,8 @@ class ExampleJsonImportTests(TestCase):
assert first_drop.required_minutes_watched == 120 assert first_drop.required_minutes_watched == 120
assert DropBenefit.objects.count() == 1 assert DropBenefit.objects.count() == 1
benefit: DropBenefit = DropBenefit.objects.get(twitch_id="ccb3fb7f-e59b-11ef-aef0-0a58a9feac02") benefit: DropBenefit = DropBenefit.objects.get(twitch_id="ccb3fb7f-e59b-11ef-aef0-0a58a9feac02")
assert ( image_url = "https://static-cdn.jtvnw.net/twitch-quests-assets/REWARD/903496ad-de97-41ff-ad97-12f099e20ea8.jpeg"
benefit.image_asset_url assert benefit.image_asset_url == image_url
== "https://static-cdn.jtvnw.net/twitch-quests-assets/REWARD/903496ad-de97-41ff-ad97-12f099e20ea8.jpeg"
)
class ImporterRobustnessTests(TestCase): class ImporterRobustnessTests(TestCase):
@ -608,7 +614,7 @@ class ImporterRobustnessTests(TestCase):
"""Ensure tuple payloads from json_repair don't crash the importer.""" """Ensure tuple payloads from json_repair don't crash the importer."""
command = Command() command = Command()
parsed = ( parsed: tuple[dict[str, dict[str, dict[str, str | list[Any]]] | dict[str, str]], list[dict[str, str]]] = (
{ {
"data": { "data": {
"currentUser": { "currentUser": {
@ -622,7 +628,7 @@ class ImporterRobustnessTests(TestCase):
[{"json_repair": "log"}], [{"json_repair": "log"}],
) )
normalized = command._normalize_responses(parsed) normalized: list[dict[str, Any]] = command._normalize_responses(parsed)
assert isinstance(normalized, list) assert isinstance(normalized, list)
assert len(normalized) == 1 assert len(normalized) == 1
assert normalized[0]["extensions"]["operationName"] == "ViewerDropsDashboard" assert normalized[0]["extensions"]["operationName"] == "ViewerDropsDashboard"
@ -670,7 +676,7 @@ class ImporterRobustnessTests(TestCase):
assert success is True assert success is True
assert broken_dir is None assert broken_dir is None
campaign = DropCampaign.objects.get(twitch_id="campaign-null-image") campaign: DropCampaign = DropCampaign.objects.get(twitch_id="campaign-null-image")
assert not campaign.image_url assert not campaign.image_url
@ -679,7 +685,7 @@ class ErrorOnlyResponseDetectionTests(TestCase):
def test_detects_error_only_response_with_service_timeout(self) -> None: def test_detects_error_only_response_with_service_timeout(self) -> None:
"""Ensure error-only response with service timeout is detected.""" """Ensure error-only response with service timeout is detected."""
parsed_json = { parsed_json: dict[str, list[dict[str, str | list[str]]]] = {
"errors": [ "errors": [
{ {
"message": "service timeout", "message": "service timeout",
@ -688,12 +694,13 @@ class ErrorOnlyResponseDetectionTests(TestCase):
], ],
} }
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
assert result == "error_only: service timeout" assert result == "error_only: service timeout"
def test_detects_error_only_response_with_null_data(self) -> None: def test_detects_error_only_response_with_null_data(self) -> None:
"""Ensure error-only response with null data field is detected.""" """Ensure error-only response with null data field is detected."""
parsed_json = { parsed_json: dict[str, list[dict[str, str | list[str]]] | None] = {
"errors": [ "errors": [
{ {
"message": "internal server error", "message": "internal server error",
@ -703,12 +710,13 @@ class ErrorOnlyResponseDetectionTests(TestCase):
"data": None, "data": None,
} }
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
assert result == "error_only: internal server error" assert result == "error_only: internal server error"
def test_detects_error_only_response_with_empty_data(self) -> None: def test_detects_error_only_response_with_empty_data(self) -> None:
"""Ensure error-only response with empty data dict is allowed through.""" """Ensure error-only response with empty data dict is allowed through."""
parsed_json = { parsed_json: dict[str, list[dict[str, str]] | dict[str, None]] = {
"errors": [ "errors": [
{ {
"message": "unauthorized", "message": "unauthorized",
@ -717,13 +725,14 @@ class ErrorOnlyResponseDetectionTests(TestCase):
"data": {}, "data": {},
} }
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
# Empty dict {} is considered "data exists" so this should pass # Empty dict {} is considered "data exists" so this should pass
assert result is None assert result is None
def test_detects_error_only_response_without_data_key(self) -> None: def test_detects_error_only_response_without_data_key(self) -> None:
"""Ensure error-only response without data key is detected.""" """Ensure error-only response without data key is detected."""
parsed_json = { parsed_json: dict[str, list[dict[str, str]]] = {
"errors": [ "errors": [
{ {
"message": "missing data", "message": "missing data",
@ -731,7 +740,7 @@ class ErrorOnlyResponseDetectionTests(TestCase):
], ],
} }
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
assert result == "error_only: missing data" assert result == "error_only: missing data"
def test_allows_response_with_both_errors_and_data(self) -> None: def test_allows_response_with_both_errors_and_data(self) -> None:
@ -749,12 +758,12 @@ class ErrorOnlyResponseDetectionTests(TestCase):
}, },
} }
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
assert result is None assert result is None
def test_allows_response_with_no_errors(self) -> None: def test_allows_response_with_no_errors(self) -> None:
"""Ensure normal responses without errors are not flagged.""" """Ensure normal responses without errors are not flagged."""
parsed_json = { parsed_json: dict[str, dict[str, dict[str, list[None]]]] = {
"data": { "data": {
"currentUser": { "currentUser": {
"dropCampaigns": [], "dropCampaigns": [],
@ -762,12 +771,12 @@ class ErrorOnlyResponseDetectionTests(TestCase):
}, },
} }
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
assert result is None assert result is None
def test_detects_error_only_in_list_of_responses(self) -> None: def test_detects_error_only_in_list_of_responses(self) -> None:
"""Ensure error-only detection works with list of responses.""" """Ensure error-only detection works with list of responses."""
parsed_json = [ parsed_json: list[dict[str, list[dict[str, str]]]] = [
{ {
"errors": [ "errors": [
{ {
@ -777,12 +786,12 @@ class ErrorOnlyResponseDetectionTests(TestCase):
}, },
] ]
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
assert result == "error_only: rate limit exceeded" assert result == "error_only: rate limit exceeded"
def test_handles_json_repair_tuple_format(self) -> None: def test_handles_json_repair_tuple_format(self) -> None:
"""Ensure error-only detection works with json_repair tuple format.""" """Ensure error-only detection works with json_repair tuple format."""
parsed_json = ( parsed_json: tuple[dict[str, list[dict[str, str | list[str]]]], list[dict[str, str]]] = (
{ {
"errors": [ "errors": [
{ {
@ -794,26 +803,26 @@ class ErrorOnlyResponseDetectionTests(TestCase):
[{"json_repair": "log"}], [{"json_repair": "log"}],
) )
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
assert result == "error_only: service timeout" assert result == "error_only: service timeout"
def test_returns_none_for_non_dict_input(self) -> None: def test_returns_none_for_non_dict_input(self) -> None:
"""Ensure non-dict input is handled gracefully.""" """Ensure non-dict input is handled gracefully."""
result = detect_error_only_response("invalid") result: str | None = detect_error_only_response("invalid")
assert result is None assert result is None
def test_returns_none_for_empty_errors_list(self) -> None: def test_returns_none_for_empty_errors_list(self) -> None:
"""Ensure empty errors list is not flagged as error-only.""" """Ensure empty errors list is not flagged as error-only."""
parsed_json = { parsed_json: dict[str, list[None]] = {
"errors": [], "errors": [],
} }
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
assert result is None assert result is None
def test_handles_error_without_message_field(self) -> None: def test_handles_error_without_message_field(self) -> None:
"""Ensure errors without message field use default text.""" """Ensure errors without message field use default text."""
parsed_json = { parsed_json: dict[str, list[dict[str, list[str]]]] = {
"errors": [ "errors": [
{ {
"path": ["data"], "path": ["data"],
@ -821,5 +830,5 @@ class ErrorOnlyResponseDetectionTests(TestCase):
], ],
} }
result = detect_error_only_response(parsed_json) result: str | None = detect_error_only_response(parsed_json)
assert result == "error_only: unknown error" assert result == "error_only: unknown error"

View file

@ -19,7 +19,7 @@ class TestChatBadgeSetModel:
def test_create_badge_set(self) -> None: def test_create_badge_set(self) -> None:
"""Test creating a new badge set.""" """Test creating a new badge set."""
badge_set = ChatBadgeSet.objects.create(set_id="vip") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="vip")
assert badge_set.set_id == "vip" assert badge_set.set_id == "vip"
assert badge_set.added_at is not None assert badge_set.added_at is not None
assert badge_set.updated_at is not None assert badge_set.updated_at is not None
@ -37,7 +37,7 @@ class TestChatBadgeSetModel:
ChatBadgeSet.objects.create(set_id="bits") ChatBadgeSet.objects.create(set_id="bits")
ChatBadgeSet.objects.create(set_id="vip") ChatBadgeSet.objects.create(set_id="vip")
badge_sets = list(ChatBadgeSet.objects.all()) badge_sets: list[ChatBadgeSet] = list(ChatBadgeSet.objects.all())
assert badge_sets[0].set_id == "bits" assert badge_sets[0].set_id == "bits"
assert badge_sets[1].set_id == "subscriber" assert badge_sets[1].set_id == "subscriber"
assert badge_sets[2].set_id == "vip" assert badge_sets[2].set_id == "vip"
@ -49,8 +49,8 @@ class TestChatBadgeModel:
def test_create_badge(self) -> None: def test_create_badge(self) -> None:
"""Test creating a new badge.""" """Test creating a new badge."""
badge_set = ChatBadgeSet.objects.create(set_id="vip") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="vip")
badge = ChatBadge.objects.create( badge: ChatBadge = ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
image_url_1x="https://example.com/1x.png", image_url_1x="https://example.com/1x.png",
@ -72,7 +72,7 @@ class TestChatBadgeModel:
def test_unique_badge_set_and_id(self) -> None: def test_unique_badge_set_and_id(self) -> None:
"""Test that badge_set and badge_id combination must be unique.""" """Test that badge_set and badge_id combination must be unique."""
badge_set = ChatBadgeSet.objects.create(set_id="vip") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="vip")
ChatBadge.objects.create( ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
@ -96,8 +96,8 @@ class TestChatBadgeModel:
def test_different_badge_ids_same_set(self) -> None: def test_different_badge_ids_same_set(self) -> None:
"""Test that different badge_ids can exist in the same set.""" """Test that different badge_ids can exist in the same set."""
badge_set = ChatBadgeSet.objects.create(set_id="bits") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="bits")
badge1 = ChatBadge.objects.create( badge1: ChatBadge = ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
image_url_1x="https://example.com/1x.png", image_url_1x="https://example.com/1x.png",
@ -106,7 +106,7 @@ class TestChatBadgeModel:
title="Bits 1", title="Bits 1",
description="1 Bit", description="1 Bit",
) )
badge2 = ChatBadge.objects.create( badge2: ChatBadge = ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="100", badge_id="100",
image_url_1x="https://example.com/1x.png", image_url_1x="https://example.com/1x.png",
@ -122,8 +122,8 @@ class TestChatBadgeModel:
def test_nullable_click_fields(self) -> None: def test_nullable_click_fields(self) -> None:
"""Test that click_action and click_url can be null.""" """Test that click_action and click_url can be null."""
badge_set = ChatBadgeSet.objects.create(set_id="moderator") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="moderator")
badge = ChatBadge.objects.create( badge: ChatBadge = ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
image_url_1x="https://example.com/1x.png", image_url_1x="https://example.com/1x.png",
@ -140,7 +140,7 @@ class TestChatBadgeModel:
def test_badge_cascade_delete(self) -> None: def test_badge_cascade_delete(self) -> None:
"""Test that badges are deleted when their badge set is deleted.""" """Test that badges are deleted when their badge set is deleted."""
badge_set = ChatBadgeSet.objects.create(set_id="test_set") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="test_set")
ChatBadge.objects.create( ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
@ -161,7 +161,7 @@ class TestChatBadgeSchemas:
def test_chat_badge_version_schema_valid(self) -> None: def test_chat_badge_version_schema_valid(self) -> None:
"""Test that ChatBadgeVersionSchema validates correct data.""" """Test that ChatBadgeVersionSchema validates correct data."""
data = { data: dict[str, str] = {
"id": "1", "id": "1",
"image_url_1x": "https://static-cdn.jtvnw.net/badges/v1/example/1", "image_url_1x": "https://static-cdn.jtvnw.net/badges/v1/example/1",
"image_url_2x": "https://static-cdn.jtvnw.net/badges/v1/example/2", "image_url_2x": "https://static-cdn.jtvnw.net/badges/v1/example/2",
@ -172,14 +172,14 @@ class TestChatBadgeSchemas:
"click_url": "https://help.twitch.tv", "click_url": "https://help.twitch.tv",
} }
schema = ChatBadgeVersionSchema.model_validate(data) schema: ChatBadgeVersionSchema = ChatBadgeVersionSchema.model_validate(data)
assert schema.badge_id == "1" assert schema.badge_id == "1"
assert schema.title == "VIP" assert schema.title == "VIP"
assert schema.click_action == "visit_url" assert schema.click_action == "visit_url"
def test_chat_badge_version_schema_nullable_fields(self) -> None: def test_chat_badge_version_schema_nullable_fields(self) -> None:
"""Test that nullable fields in ChatBadgeVersionSchema work correctly.""" """Test that nullable fields in ChatBadgeVersionSchema work correctly."""
data = { data: dict[str, str | None] = {
"id": "1", "id": "1",
"image_url_1x": "https://static-cdn.jtvnw.net/badges/v1/example/1", "image_url_1x": "https://static-cdn.jtvnw.net/badges/v1/example/1",
"image_url_2x": "https://static-cdn.jtvnw.net/badges/v1/example/2", "image_url_2x": "https://static-cdn.jtvnw.net/badges/v1/example/2",
@ -190,13 +190,13 @@ class TestChatBadgeSchemas:
"click_url": None, "click_url": None,
} }
schema = ChatBadgeVersionSchema.model_validate(data) schema: ChatBadgeVersionSchema = ChatBadgeVersionSchema.model_validate(data)
assert schema.click_action is None assert schema.click_action is None
assert schema.click_url is None assert schema.click_url is None
def test_chat_badge_version_schema_missing_required(self) -> None: def test_chat_badge_version_schema_missing_required(self) -> None:
"""Test that ChatBadgeVersionSchema raises error on missing required fields.""" """Test that ChatBadgeVersionSchema raises error on missing required fields."""
data = { data: dict[str, str] = {
"id": "1", "id": "1",
"title": "VIP", "title": "VIP",
# Missing required image URLs and description # Missing required image URLs and description
@ -207,7 +207,7 @@ class TestChatBadgeSchemas:
def test_chat_badge_set_schema_valid(self) -> None: def test_chat_badge_set_schema_valid(self) -> None:
"""Test that ChatBadgeSetSchema validates correct data.""" """Test that ChatBadgeSetSchema validates correct data."""
data = { data: dict[str, str | list[dict[str, str]]] = {
"set_id": "vip", "set_id": "vip",
"versions": [ "versions": [
{ {
@ -223,14 +223,14 @@ class TestChatBadgeSchemas:
], ],
} }
schema = ChatBadgeSetSchema.model_validate(data) schema: ChatBadgeSetSchema = ChatBadgeSetSchema.model_validate(data)
assert schema.set_id == "vip" assert schema.set_id == "vip"
assert len(schema.versions) == 1 assert len(schema.versions) == 1
assert schema.versions[0].badge_id == "1" assert schema.versions[0].badge_id == "1"
def test_chat_badge_set_schema_multiple_versions(self) -> None: def test_chat_badge_set_schema_multiple_versions(self) -> None:
"""Test that ChatBadgeSetSchema handles multiple badge versions.""" """Test that ChatBadgeSetSchema handles multiple badge versions."""
data = { data: dict[str, str | list[dict[str, str | None]]] = {
"set_id": "bits", "set_id": "bits",
"versions": [ "versions": [
{ {
@ -256,7 +256,7 @@ class TestChatBadgeSchemas:
], ],
} }
schema = ChatBadgeSetSchema.model_validate(data) schema: ChatBadgeSetSchema = ChatBadgeSetSchema.model_validate(data)
assert schema.set_id == "bits" assert schema.set_id == "bits"
assert len(schema.versions) == 2 assert len(schema.versions) == 2
assert schema.versions[0].badge_id == "1" assert schema.versions[0].badge_id == "1"
@ -264,7 +264,7 @@ class TestChatBadgeSchemas:
def test_global_chat_badges_response_valid(self) -> None: def test_global_chat_badges_response_valid(self) -> None:
"""Test that GlobalChatBadgesResponse validates correct API response.""" """Test that GlobalChatBadgesResponse validates correct API response."""
data = { data: dict[str, list[dict[str, str | list[dict[str, str]]]]] = {
"data": [ "data": [
{ {
"set_id": "vip", "set_id": "vip",
@ -284,20 +284,20 @@ class TestChatBadgeSchemas:
], ],
} }
response = GlobalChatBadgesResponse.model_validate(data) response: GlobalChatBadgesResponse = GlobalChatBadgesResponse.model_validate(data)
assert len(response.data) == 1 assert len(response.data) == 1
assert response.data[0].set_id == "vip" assert response.data[0].set_id == "vip"
def test_global_chat_badges_response_empty(self) -> None: def test_global_chat_badges_response_empty(self) -> None:
"""Test that GlobalChatBadgesResponse validates empty response.""" """Test that GlobalChatBadgesResponse validates empty response."""
data = {"data": []} data: dict[str, list] = {"data": []}
response = GlobalChatBadgesResponse.model_validate(data) response: GlobalChatBadgesResponse = GlobalChatBadgesResponse.model_validate(data)
assert len(response.data) == 0 assert len(response.data) == 0
def test_chat_badge_schema_extra_forbidden(self) -> None: def test_chat_badge_schema_extra_forbidden(self) -> None:
"""Test that extra fields are forbidden in schemas.""" """Test that extra fields are forbidden in schemas."""
data = { data: dict[str, str | None] = {
"id": "1", "id": "1",
"image_url_1x": "https://example.com/1x.png", "image_url_1x": "https://example.com/1x.png",
"image_url_2x": "https://example.com/2x.png", "image_url_2x": "https://example.com/2x.png",

View file

@ -1,7 +1,9 @@
from __future__ import annotations from __future__ import annotations
import datetime
import json import json
from datetime import timedelta from datetime import timedelta
from typing import TYPE_CHECKING
from django.test import Client from django.test import Client
from django.test import TestCase from django.test import TestCase
@ -11,6 +13,9 @@ from twitch.models import DropCampaign
from twitch.models import Game from twitch.models import Game
from twitch.models import Organization from twitch.models import Organization
if TYPE_CHECKING:
from django.test.client import _MonkeyPatchedWSGIResponse
class ExportViewsTestCase(TestCase): class ExportViewsTestCase(TestCase):
"""Test export views for CSV and JSON formats.""" """Test export views for CSV and JSON formats."""
@ -20,13 +25,13 @@ class ExportViewsTestCase(TestCase):
self.client = Client() self.client = Client()
# Create test organization # Create test organization
self.org = Organization.objects.create( self.org: Organization = Organization.objects.create(
twitch_id="org123", twitch_id="org123",
name="Test Organization", name="Test Organization",
) )
# Create test game # Create test game
self.game = Game.objects.create( self.game: Game = Game.objects.create(
twitch_id="game123", twitch_id="game123",
name="Test Game", name="Test Game",
display_name="Test Game Display", display_name="Test Game Display",
@ -34,8 +39,8 @@ class ExportViewsTestCase(TestCase):
self.game.owners.add(self.org) self.game.owners.add(self.org)
# Create test campaign # Create test campaign
now = timezone.now() now: datetime.datetime = timezone.now()
self.campaign = DropCampaign.objects.create( self.campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="campaign123", twitch_id="campaign123",
name="Test Campaign", name="Test Campaign",
description="A test campaign description", description="A test campaign description",
@ -46,7 +51,7 @@ class ExportViewsTestCase(TestCase):
def test_export_campaigns_csv(self) -> None: def test_export_campaigns_csv(self) -> None:
"""Test CSV export of campaigns.""" """Test CSV export of campaigns."""
response = self.client.get("/export/campaigns/csv/") response: _MonkeyPatchedWSGIResponse = self.client.get("/export/campaigns/csv/")
assert response.status_code == 200 assert response.status_code == 200
assert response["Content-Type"] == "text/csv" assert response["Content-Type"] == "text/csv"
assert b"Twitch ID" in response.content assert b"Twitch ID" in response.content
@ -55,7 +60,7 @@ class ExportViewsTestCase(TestCase):
def test_export_campaigns_json(self) -> None: def test_export_campaigns_json(self) -> None:
"""Test JSON export of campaigns.""" """Test JSON export of campaigns."""
response = self.client.get("/export/campaigns/json/") response: _MonkeyPatchedWSGIResponse = self.client.get("/export/campaigns/json/")
assert response.status_code == 200 assert response.status_code == 200
assert response["Content-Type"] == "application/json" assert response["Content-Type"] == "application/json"
@ -68,7 +73,7 @@ class ExportViewsTestCase(TestCase):
def test_export_games_csv(self) -> None: def test_export_games_csv(self) -> None:
"""Test CSV export of games.""" """Test CSV export of games."""
response = self.client.get("/export/games/csv/") response: _MonkeyPatchedWSGIResponse = self.client.get("/export/games/csv/")
assert response.status_code == 200 assert response.status_code == 200
assert response["Content-Type"] == "text/csv" assert response["Content-Type"] == "text/csv"
assert b"Twitch ID" in response.content assert b"Twitch ID" in response.content
@ -77,7 +82,7 @@ class ExportViewsTestCase(TestCase):
def test_export_games_json(self) -> None: def test_export_games_json(self) -> None:
"""Test JSON export of games.""" """Test JSON export of games."""
response = self.client.get("/export/games/json/") response: _MonkeyPatchedWSGIResponse = self.client.get("/export/games/json/")
assert response.status_code == 200 assert response.status_code == 200
assert response["Content-Type"] == "application/json" assert response["Content-Type"] == "application/json"
@ -89,7 +94,7 @@ class ExportViewsTestCase(TestCase):
def test_export_organizations_csv(self) -> None: def test_export_organizations_csv(self) -> None:
"""Test CSV export of organizations.""" """Test CSV export of organizations."""
response = self.client.get("/export/organizations/csv/") response: _MonkeyPatchedWSGIResponse = self.client.get("/export/organizations/csv/")
assert response.status_code == 200 assert response.status_code == 200
assert response["Content-Type"] == "text/csv" assert response["Content-Type"] == "text/csv"
assert b"Twitch ID" in response.content assert b"Twitch ID" in response.content
@ -98,7 +103,7 @@ class ExportViewsTestCase(TestCase):
def test_export_organizations_json(self) -> None: def test_export_organizations_json(self) -> None:
"""Test JSON export of organizations.""" """Test JSON export of organizations."""
response = self.client.get("/export/organizations/json/") response: _MonkeyPatchedWSGIResponse = self.client.get("/export/organizations/json/")
assert response.status_code == 200 assert response.status_code == 200
assert response["Content-Type"] == "application/json" assert response["Content-Type"] == "application/json"
@ -110,13 +115,13 @@ class ExportViewsTestCase(TestCase):
def test_export_campaigns_csv_with_filters(self) -> None: def test_export_campaigns_csv_with_filters(self) -> None:
"""Test CSV export of campaigns with status filter.""" """Test CSV export of campaigns with status filter."""
response = self.client.get("/export/campaigns/csv/?status=active") response: _MonkeyPatchedWSGIResponse = self.client.get("/export/campaigns/csv/?status=active")
assert response.status_code == 200 assert response.status_code == 200
assert b"campaign123" in response.content assert b"campaign123" in response.content
def test_export_campaigns_json_with_filters(self) -> None: def test_export_campaigns_json_with_filters(self) -> None:
"""Test JSON export of campaigns with status filter.""" """Test JSON export of campaigns with status filter."""
response = self.client.get("/export/campaigns/json/?status=active") response: _MonkeyPatchedWSGIResponse = self.client.get("/export/campaigns/json/?status=active")
assert response.status_code == 200 assert response.status_code == 200
data = json.loads(response.content) data = json.loads(response.content)

View file

@ -159,11 +159,11 @@ class RSSFeedTestCase(TestCase):
def test_organization_campaign_feed_filters_correctly(self) -> None: def test_organization_campaign_feed_filters_correctly(self) -> None:
"""Test organization campaign feed only shows campaigns for that organization.""" """Test organization campaign feed only shows campaigns for that organization."""
# Create another organization with a game and campaign # Create another organization with a game and campaign
other_org = Organization.objects.create( other_org: Organization = Organization.objects.create(
twitch_id="other-org-123", twitch_id="other-org-123",
name="Other Organization", name="Other Organization",
) )
other_game = Game.objects.create( other_game: Game = Game.objects.create(
twitch_id="other-game-456", twitch_id="other-game-456",
slug="other-game-2", slug="other-game-2",
name="Other Game 2", name="Other Game 2",
@ -299,21 +299,48 @@ def test_campaign_feed_queries_do_not_scale_with_items(
) )
game.owners.add(org) game.owners.add(org)
campaigns: list[DropCampaign] = []
channels: list[Channel] = []
benefits: list[DropBenefit] = []
for i in range(50): for i in range(50):
campaign: DropCampaign = DropCampaign.objects.create( campaigns.append(
DropCampaign(
twitch_id=f"scale-campaign-{i}", twitch_id=f"scale-campaign-{i}",
name=f"Scale Campaign {i}", name=f"Scale Campaign {i}",
game=game, game=game,
start_at=timezone.now(), start_at=timezone.now(),
end_at=timezone.now() + timedelta(days=7), end_at=timezone.now() + timedelta(days=7),
operation_names=["DropCampaignDetails"], operation_names=["DropCampaignDetails"],
),
) )
channel: Channel = Channel.objects.create( channels.append(
Channel(
twitch_id=f"scale-channel-{i}", twitch_id=f"scale-channel-{i}",
name=f"scalechannel{i}", name=f"scalechannel{i}",
display_name=f"ScaleChannel{i}", display_name=f"ScaleChannel{i}",
),
) )
campaign.allow_channels.add(channel) benefits.append(
DropBenefit(
twitch_id=f"scale-benefit-{i}",
name=f"Scale Benefit {i}",
distribution_type="ITEM",
),
)
DropCampaign.objects.bulk_create(campaigns)
Channel.objects.bulk_create(channels)
DropBenefit.objects.bulk_create(benefits)
assert len(DropCampaign.objects.all()) == 50
assert len(Channel.objects.all()) == 50
assert len(DropBenefit.objects.all()) == 50
channels_by_id: dict[str, Channel] = {c.twitch_id: c for c in channels}
benefits_by_id: dict[str, DropBenefit] = {b.twitch_id: b for b in benefits}
for i, campaign in enumerate(campaigns):
drop: TimeBasedDrop = TimeBasedDrop.objects.create( drop: TimeBasedDrop = TimeBasedDrop.objects.create(
twitch_id=f"scale-drop-{i}", twitch_id=f"scale-drop-{i}",
name=f"Scale Drop {i}", name=f"Scale Drop {i}",
@ -322,12 +349,8 @@ def test_campaign_feed_queries_do_not_scale_with_items(
start_at=timezone.now(), start_at=timezone.now(),
end_at=timezone.now() + timedelta(hours=1), end_at=timezone.now() + timedelta(hours=1),
) )
benefit: DropBenefit = DropBenefit.objects.create( campaign.allow_channels.add(channels_by_id[f"scale-channel-{i}"])
twitch_id=f"scale-benefit-{i}", drop.benefits.add(benefits_by_id[f"scale-benefit-{i}"])
name=f"Scale Benefit {i}",
distribution_type="ITEM",
)
drop.benefits.add(benefit)
url: str = reverse("twitch:campaign_feed") url: str = reverse("twitch:campaign_feed")

View file

@ -67,7 +67,7 @@ class GameOwnerOrganizationTests(TestCase):
game: Game = Game.objects.get(twitch_id="263490") game: Game = Game.objects.get(twitch_id="263490")
org1: Organization = Organization.objects.get(twitch_id="d32de13d-937e-4196-8198-1a7f875f295a") org1: Organization = Organization.objects.get(twitch_id="d32de13d-937e-4196-8198-1a7f875f295a")
org2: Organization = Organization.objects.get(twitch_id="other-org-id") org2: Organization = Organization.objects.get(twitch_id="other-org-id")
owners = list(game.owners.all()) owners: list[Organization] = list(game.owners.all())
assert org1 in owners assert org1 in owners
assert org2 in owners assert org2 in owners
assert any(o.name == "Twitch Gaming" for o in owners) assert any(o.name == "Twitch Gaming" for o in owners)

View file

@ -43,7 +43,7 @@ class TestGetFormatUrl:
def test_url_with_query_params(self) -> None: def test_url_with_query_params(self) -> None:
"""Test URL with query parameters preserves them.""" """Test URL with query parameters preserves them."""
result = get_format_url("/static/img/photo.jpg?v=123", "webp") result: str = get_format_url("/static/img/photo.jpg?v=123", "webp")
assert result == "/static/img/photo.webp?v=123" assert result == "/static/img/photo.webp?v=123"
def test_full_url(self) -> None: def test_full_url(self) -> None:
@ -110,7 +110,7 @@ class TestPictureTag:
def test_xss_prevention_in_src(self) -> None: def test_xss_prevention_in_src(self) -> None:
"""Test that XSS attempts in src are escaped.""" """Test that XSS attempts in src are escaped."""
malicious_src = '"><script>alert("xss")</script><img src="' malicious_src = '"><script>alert("xss")</script><img src="'
result = picture(malicious_src) result: SafeString = picture(malicious_src)
# Should escape the malicious code # Should escape the malicious code
assert "<script>" not in result assert "<script>" not in result

View file

@ -6,6 +6,8 @@ from twitch.schemas import DropBenefitSchema
from twitch.schemas import DropCampaignSchema from twitch.schemas import DropCampaignSchema
from twitch.schemas import GameSchema from twitch.schemas import GameSchema
from twitch.schemas import GraphQLResponse from twitch.schemas import GraphQLResponse
from twitch.schemas import Reward
from twitch.schemas import RewardCampaign
from twitch.schemas import TimeBasedDropSchema from twitch.schemas import TimeBasedDropSchema
@ -466,7 +468,7 @@ def test_reward_campaigns_available_to_user() -> None:
assert response.data.reward_campaigns_available_to_user is not None assert response.data.reward_campaigns_available_to_user is not None
assert len(response.data.reward_campaigns_available_to_user) == 1 assert len(response.data.reward_campaigns_available_to_user) == 1
reward_campaign = response.data.reward_campaigns_available_to_user[0] reward_campaign: RewardCampaign = response.data.reward_campaigns_available_to_user[0]
assert reward_campaign.twitch_id == "dc4ff0b4-4de0-11ef-9ec3-621fb0811846" assert reward_campaign.twitch_id == "dc4ff0b4-4de0-11ef-9ec3-621fb0811846"
assert reward_campaign.name == "Buy 1 new sub, get 3 months of Apple TV+" assert reward_campaign.name == "Buy 1 new sub, get 3 months of Apple TV+"
assert reward_campaign.brand == "Apple TV+" assert reward_campaign.brand == "Apple TV+"
@ -487,7 +489,7 @@ def test_reward_campaigns_available_to_user() -> None:
# Verify rewards # Verify rewards
assert len(reward_campaign.rewards) == 1 assert len(reward_campaign.rewards) == 1
reward = reward_campaign.rewards[0] reward: Reward = reward_campaign.rewards[0]
assert reward.twitch_id == "dc2e9810-4de0-11ef-9ec3-621fb0811846" assert reward.twitch_id == "dc2e9810-4de0-11ef-9ec3-621fb0811846"
assert reward.name == "3 months of Apple TV+" assert reward.name == "3 months of Apple TV+"
assert reward.banner_image is not None assert reward.banner_image is not None

View file

@ -0,0 +1,113 @@
from __future__ import annotations
import os
from datetime import UTC
from datetime import datetime
from typing import TYPE_CHECKING
import pytest
from django.conf import settings
from django.test import TestCase
from django.urls import reverse
from twitch import sitemaps
from twitch.models import Channel
from twitch.models import ChatBadgeSet
from twitch.models import DropCampaign
from twitch.models import Game
from twitch.models import Organization
from twitch.models import RewardCampaign
if TYPE_CHECKING:
from pathlib import Path
class SitemapTests(TestCase):
"""Tests for Twitch sitemaps."""
def test_static_view_sitemap_items_and_location(self) -> None:
"""Test that StaticViewSitemap returns expected items and correct locations."""
sitemap = sitemaps.TwitchSitemapGenerator()
items: list[str] = sitemap.items()
expected: list[str] = [
"twitch:dashboard",
"twitch:campaign_list",
"twitch:reward_campaign_list",
"twitch:games_grid",
"twitch:games_list",
"twitch:org_list",
"twitch:channel_list",
"twitch:badge_list",
"twitch:emote_gallery",
"twitch:search",
# the two items below were added later and need coverage
"twitch:dataset_backups",
"twitch:docs_rss",
]
assert set(items) == set(expected)
for name in items:
assert sitemap.location(name) == reverse(name)
def test_game_sitemap_items_and_location(self) -> None:
"""Test that GameSitemap returns expected items and correct locations."""
game: Game = Game.objects.create(twitch_id="g-1", display_name="Test Game")
sitemap = sitemaps.GameSitemap()
items: list[Game] = list(sitemap.items())
assert game in items
assert sitemap.location(game) == reverse("twitch:game_detail", args=[game.twitch_id])
org: Organization = Organization.objects.create(twitch_id="o-1", name="Org One")
channel: Channel = Channel.objects.create(twitch_id="c-1", name="chan", display_name="Chan One")
badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="b-1")
game: Game = Game.objects.create(twitch_id="g-2", display_name="Game Two")
campaign: DropCampaign = DropCampaign.objects.create(twitch_id="dc-1", name="Campaign One", game=game)
reward: RewardCampaign = RewardCampaign.objects.create(twitch_id="rc-1", name="Reward One")
campaign_sitemap = sitemaps.CampaignSitemap()
assert campaign in list(campaign_sitemap.items())
assert campaign_sitemap.location(campaign) == reverse("twitch:campaign_detail", args=[campaign.twitch_id])
org_sitemap = sitemaps.OrganizationSitemap()
assert org in list(org_sitemap.items())
assert org_sitemap.location(org) == reverse("twitch:organization_detail", args=[org.twitch_id])
channel_sitemap = sitemaps.ChannelSitemap()
assert channel in list(channel_sitemap.items())
assert channel_sitemap.location(channel) == reverse("twitch:channel_detail", args=[channel.twitch_id])
badge_sitemap = sitemaps.BadgeSitemap()
assert badge_set in list(badge_sitemap.items())
assert badge_sitemap.location(badge_set) == reverse("twitch:badge_set_detail", args=[badge_set.set_id])
reward_sitemap = sitemaps.RewardCampaignSitemap()
assert reward in list(reward_sitemap.items())
assert reward_sitemap.location(reward) == reverse("twitch:reward_campaign_detail", args=[reward.twitch_id])
@pytest.mark.django_db
def test_static_view_lastmod_behavior(tmp_path: sitemaps.Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""Standalone pytest test for :meth:`TwitchSitemapGenerator.lastmod`.
We exercise both the docs RSS branch (which should always return ``None``)
and the dataset backups branch, including scenarios with and without
backup files present.
"""
sitemap = sitemaps.TwitchSitemapGenerator()
assert sitemap.lastmod("twitch:docs_rss") is None
monkeypatch.setattr(settings, "DATA_DIR", tmp_path)
assert sitemap.lastmod("twitch:dataset_backups") is None
datasets: Path = tmp_path / "datasets"
datasets.mkdir()
older: Path = datasets / "dataset_backup_old.zip"
newer: Path = datasets / "dataset_backup_new.zip"
older.write_text("old", encoding="utf-8")
newer.write_text("new", encoding="utf-8")
os.utime(older, (1_000, 1_000))
os.utime(newer, (2_000, 2_000))
expected: datetime = datetime.fromtimestamp(2_000, tz=UTC)
assert sitemap.lastmod("twitch:dataset_backups") == expected

View file

@ -0,0 +1,113 @@
from __future__ import annotations
from datetime import datetime
from datetime import timedelta
from django.test import TestCase
from django.utils import timezone
from twitch.models import Channel
from twitch.models import ChatBadgeSet
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import Game
from twitch.models import Organization
from twitch.models import RewardCampaign
from twitch.models import TimeBasedDrop
from twitch.sitemaps import TwitchSitemapGenerator
class StaticViewSitemapLastModTests(TestCase):
"""Tests for StaticViewSitemap lastmod method."""
def setUp(self) -> None:
"""Set up StaticViewSitemap instance for testing."""
self.sitemap = TwitchSitemapGenerator()
def test_search_is_none(self) -> None:
"""Test that lastmod for search sitemap item is None."""
assert self.sitemap.lastmod("twitch:search") is None
def test_campaigns_latest(self) -> None:
"""Test that lastmod for campaign list sitemap item reflects latest updated_at among DropCampaigns."""
game: Game = Game.objects.create(twitch_id="g1")
older: datetime = timezone.now() - timedelta(days=2)
newer: datetime = timezone.now() - timedelta(days=1)
c1: DropCampaign = DropCampaign.objects.create(twitch_id="dc1", name="c1", game=game)
c2: DropCampaign = DropCampaign.objects.create(twitch_id="dc2", name="c2", game=game)
DropCampaign.objects.filter(pk=c1.pk).update(updated_at=older)
DropCampaign.objects.filter(pk=c2.pk).update(updated_at=newer)
assert self.sitemap.lastmod("twitch:campaign_list") == newer
assert self.sitemap.lastmod("twitch:dashboard") == newer
def test_reward_campaigns_latest(self) -> None:
"""Test that lastmod for reward campaign list sitemap item reflects latest updated_at among RewardCampaigns."""
older: datetime = timezone.now() - timedelta(days=3)
newer: datetime = timezone.now() - timedelta(days=1)
r1: RewardCampaign = RewardCampaign.objects.create(twitch_id="r1", name="r1")
r2: RewardCampaign = RewardCampaign.objects.create(twitch_id="r2", name="r2")
RewardCampaign.objects.filter(pk=r1.pk).update(updated_at=older)
RewardCampaign.objects.filter(pk=r2.pk).update(updated_at=newer)
assert self.sitemap.lastmod("twitch:reward_campaign_list") == newer
def test_games_latest(self) -> None:
"""Test that lastmod for games grid sitemap item reflects latest updated_at among Games."""
older: datetime = timezone.now() - timedelta(days=4)
newer: datetime = timezone.now() - timedelta(days=1)
g1: Game = Game.objects.create(twitch_id="g2")
g2: Game = Game.objects.create(twitch_id="g3")
Game.objects.filter(pk=g1.pk).update(updated_at=older)
Game.objects.filter(pk=g2.pk).update(updated_at=newer)
assert self.sitemap.lastmod("twitch:games_grid") == newer
def test_orgs_latest(self) -> None:
"""Test that lastmod for org list sitemap item reflects latest updated_at among Organizations."""
older: datetime = timezone.now() - timedelta(days=5)
newer: datetime = timezone.now() - timedelta(days=1)
o1: Organization = Organization.objects.create(twitch_id="o1", name="Org1")
o2: Organization = Organization.objects.create(twitch_id="o2", name="Org2")
Organization.objects.filter(pk=o1.pk).update(updated_at=older)
Organization.objects.filter(pk=o2.pk).update(updated_at=newer)
assert self.sitemap.lastmod("twitch:org_list") == newer
def test_channel_list_none(self) -> None:
"""Test that lastmod for channel list sitemap item is None since Channel doesn't have updated_at."""
Channel.objects.create(twitch_id="ch1", name="n1", display_name="D1")
assert self.sitemap.lastmod("twitch:channel_list") is None
def test_badge_list_latest(self) -> None:
"""Test that lastmod for badge list sitemap item reflects latest updated_at among ChatBadgeSets."""
older: datetime = timezone.now() - timedelta(days=6)
newer: datetime = timezone.now() - timedelta(days=1)
b1: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="s1")
b2: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="s2")
ChatBadgeSet.objects.filter(pk=b1.pk).update(updated_at=older)
ChatBadgeSet.objects.filter(pk=b2.pk).update(updated_at=newer)
assert self.sitemap.lastmod("twitch:badge_list") == newer
def test_emote_gallery_uses_campaign_updated(self) -> None:
"""Test that lastmod for emote gallery sitemap item reflects latest updated_at among DropCampaigns associated with emote benefits.""" # noqa: E501
game: Game = Game.objects.create(twitch_id="g_emote")
campaign: DropCampaign = DropCampaign.objects.create(twitch_id="dc_emote", name="em", game=game)
benefit: DropBenefit = DropBenefit.objects.create(
twitch_id="b_emote",
distribution_type="EMOTE",
image_asset_url="http://example.com/e.png",
)
drop: TimeBasedDrop = TimeBasedDrop.objects.create(twitch_id="tbd_emote", name="drop1", campaign=campaign)
drop.benefits.add(benefit)
newer: datetime = timezone.now() - timedelta(hours=1)
DropCampaign.objects.filter(pk=campaign.pk).update(updated_at=newer)
assert self.sitemap.lastmod("twitch:emote_gallery") == newer

View file

@ -2,12 +2,17 @@ from __future__ import annotations
import datetime import datetime
import json import json
import os
from datetime import UTC
from datetime import timedelta from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any from typing import Any
from typing import Literal from typing import Literal
import pytest import pytest
from django.conf import settings
from django.core.cache import cache
from django.core.handlers.wsgi import WSGIRequest from django.core.handlers.wsgi import WSGIRequest
from django.core.paginator import Paginator from django.core.paginator import Paginator
from django.test import RequestFactory from django.test import RequestFactory
@ -21,6 +26,7 @@ from twitch.models import DropBenefit
from twitch.models import DropCampaign from twitch.models import DropCampaign
from twitch.models import Game from twitch.models import Game
from twitch.models import Organization from twitch.models import Organization
from twitch.models import RewardCampaign
from twitch.models import TimeBasedDrop from twitch.models import TimeBasedDrop
from twitch.views import Page from twitch.views import Page
from twitch.views import _build_breadcrumb_schema from twitch.views import _build_breadcrumb_schema
@ -29,6 +35,8 @@ from twitch.views import _build_seo_context
from twitch.views import _truncate_description from twitch.views import _truncate_description
if TYPE_CHECKING: if TYPE_CHECKING:
from pathlib import Path
from django.core.handlers.wsgi import WSGIRequest from django.core.handlers.wsgi import WSGIRequest
from django.test import Client from django.test import Client
from django.test.client import _MonkeyPatchedWSGIResponse from django.test.client import _MonkeyPatchedWSGIResponse
@ -423,7 +431,7 @@ class TestChannelListView:
@pytest.mark.django_db @pytest.mark.django_db
def test_dashboard_dedupes_campaigns_for_multi_owner_game(self, client: Client) -> None: def test_dashboard_dedupes_campaigns_for_multi_owner_game(self, client: Client) -> None:
"""Dashboard should not render duplicate campaign cards when a game has multiple owners.""" """Dashboard should not render duplicate campaign cards when a game has multiple owners."""
now = timezone.now() now: datetime.datetime = timezone.now()
org1: Organization = Organization.objects.create(twitch_id="org_a", name="Org A") org1: Organization = Organization.objects.create(twitch_id="org_a", name="Org A")
org2: Organization = Organization.objects.create(twitch_id="org_b", name="Org B") org2: Organization = Organization.objects.create(twitch_id="org_b", name="Org B")
game: Game = Game.objects.create(twitch_id="game_multi_owner", name="game", display_name="Multi Owner") game: Game = Game.objects.create(twitch_id="game_multi_owner", name="game", display_name="Multi Owner")
@ -474,7 +482,7 @@ class TestChannelListView:
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
# Create 150 campaigns to test pagination # Create 150 campaigns to test pagination
campaigns = [ campaigns: list[DropCampaign] = [
DropCampaign( DropCampaign(
twitch_id=f"c{i}", twitch_id=f"c{i}",
name=f"Campaign {i}", name=f"Campaign {i}",
@ -698,7 +706,7 @@ class TestChannelListView:
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
# Create 150 active campaigns for game g1 # Create 150 active campaigns for game g1
campaigns = [ campaigns: list[DropCampaign] = [
DropCampaign( DropCampaign(
twitch_id=f"c{i}", twitch_id=f"c{i}",
name=f"Campaign {i}", name=f"Campaign {i}",
@ -752,7 +760,7 @@ class TestChannelListView:
operation_names=["DropCampaignDetails"], operation_names=["DropCampaignDetails"],
) )
drop = TimeBasedDrop.objects.create( drop: TimeBasedDrop = TimeBasedDrop.objects.create(
twitch_id="d1", twitch_id="d1",
name="Drop", name="Drop",
campaign=campaign, campaign=campaign,
@ -760,14 +768,14 @@ class TestChannelListView:
required_subs=1, required_subs=1,
) )
benefit = DropBenefit.objects.create( benefit: DropBenefit = DropBenefit.objects.create(
twitch_id="b1", twitch_id="b1",
name="Diana", name="Diana",
distribution_type="BADGE", distribution_type="BADGE",
) )
drop.benefits.add(benefit) drop.benefits.add(benefit)
badge_set = ChatBadgeSet.objects.create(set_id="diana") badge_set: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="diana")
ChatBadge.objects.create( ChatBadge.objects.create(
badge_set=badge_set, badge_set=badge_set,
badge_id="1", badge_id="1",
@ -783,7 +791,7 @@ class TestChannelListView:
assert response.status_code == 200 assert response.status_code == 200
# The campaign detail page prints a syntax-highlighted JSON block; the badge description should be present. # The campaign detail page prints a syntax-highlighted JSON block; the badge description should be present.
html = response.content.decode("utf-8") html: str = response.content.decode("utf-8")
assert "This badge was earned by subscribing." in html assert "This badge was earned by subscribing." in html
@pytest.mark.django_db @pytest.mark.django_db
@ -1007,11 +1015,12 @@ class TestSEOMetaTags:
) -> None: ) -> None:
"""Test campaign detail view has breadcrumb schema.""" """Test campaign detail view has breadcrumb schema."""
campaign: DropCampaign = game_with_campaign["campaign"] campaign: DropCampaign = game_with_campaign["campaign"]
url = reverse("twitch:campaign_detail", args=[campaign.twitch_id]) url: str = reverse("twitch:campaign_detail", args=[campaign.twitch_id])
response: _MonkeyPatchedWSGIResponse = client.get(url) response: _MonkeyPatchedWSGIResponse = client.get(url)
assert response.status_code == 200 assert response.status_code == 200
assert "breadcrumb_schema" in response.context assert "breadcrumb_schema" in response.context
# breadcrumb_schema is JSON-dumped in context # breadcrumb_schema is JSON-dumped in context
breadcrumb_str = response.context["breadcrumb_schema"] breadcrumb_str = response.context["breadcrumb_schema"]
breadcrumb = json.loads(breadcrumb_str) breadcrumb = json.loads(breadcrumb_str)
@ -1025,7 +1034,7 @@ class TestSEOMetaTags:
) -> None: ) -> None:
"""Test campaign detail view has modified_date.""" """Test campaign detail view has modified_date."""
campaign: DropCampaign = game_with_campaign["campaign"] campaign: DropCampaign = game_with_campaign["campaign"]
url = reverse("twitch:campaign_detail", args=[campaign.twitch_id]) url: str = reverse("twitch:campaign_detail", args=[campaign.twitch_id])
response: _MonkeyPatchedWSGIResponse = client.get(url) response: _MonkeyPatchedWSGIResponse = client.get(url)
assert response.status_code == 200 assert response.status_code == 200
@ -1075,16 +1084,18 @@ class TestSEOMetaTags:
@pytest.mark.django_db @pytest.mark.django_db
class TestSitemapView: class TestSitemapViews:
"""Tests for the sitemap.xml view.""" """Tests for the split sitemap index and section files."""
@pytest.fixture @pytest.fixture
def sample_entities(self) -> dict[str, Any]: def sample_entities(
"""Create sample entities for sitemap testing. self,
) -> dict[str, Organization | Game | Channel | DropCampaign | RewardCampaign | ChatBadgeSet]:
"""Fixture to create sample entities for testing sitemap sections.
Returns: Returns:
dict[str, Any]: A dictionary containing the created organization, game, channel, campaign, and badge set. dict[str, Organization | Game | Channel | DropCampaign | RewardCampaign | ChatBadgeSet]: Dictionary of sample entities created for testing.
""" """ # noqa: E501
org: Organization = Organization.objects.create(twitch_id="org1", name="Test Org") org: Organization = Organization.objects.create(twitch_id="org1", name="Test Org")
game: Game = Game.objects.create( game: Game = Game.objects.create(
twitch_id="game1", twitch_id="game1",
@ -1092,7 +1103,11 @@ class TestSitemapView:
display_name="Test Game", display_name="Test Game",
) )
game.owners.add(org) game.owners.add(org)
channel: Channel = Channel.objects.create(twitch_id="ch1", name="ch1", display_name="Channel 1") channel: Channel = Channel.objects.create(
twitch_id="ch1",
name="ch1",
display_name="Channel 1",
)
campaign: DropCampaign = DropCampaign.objects.create( campaign: DropCampaign = DropCampaign.objects.create(
twitch_id="camp1", twitch_id="camp1",
name="Test Campaign", name="Test Campaign",
@ -1100,118 +1115,122 @@ class TestSitemapView:
game=game, game=game,
operation_names=["DropCampaignDetails"], operation_names=["DropCampaignDetails"],
) )
reward: RewardCampaign = RewardCampaign.objects.create(
twitch_id="reward1",
name="Test Reward",
)
badge: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="badge1") badge: ChatBadgeSet = ChatBadgeSet.objects.create(set_id="badge1")
return { return {
"org": org, "org": org,
"game": game, "game": game,
"channel": channel, "channel": channel,
"campaign": campaign, "campaign": campaign,
"reward": reward,
"badge": badge, "badge": badge,
} }
def test_sitemap_view_returns_xml(self, client: Client, sample_entities: dict[str, Any]) -> None: def test_index_contains_sections(self, client: Client) -> None:
"""Test sitemap view returns XML content.""" """Test that the sitemap index references all expected sections.
Args:
client (Client): Django test client fixture for making HTTP requests.
"""
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml") response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml")
assert response.status_code == 200 assert response.status_code == 200
assert response["Content-Type"] == "application/xml" assert response["Content-Type"] == "application/xml"
def test_sitemap_contains_xml_declaration(self, client: Client, sample_entities: dict[str, Any]) -> None:
"""Test sitemap contains proper XML declaration."""
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml")
content = response.content.decode()
assert content.startswith('<?xml version="1.0" encoding="UTF-8"?>')
def test_sitemap_contains_urlset(self, client: Client, sample_entities: dict[str, Any]) -> None:
"""Test sitemap contains urlset element."""
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml")
content: str = response.content.decode() content: str = response.content.decode()
assert "<urlset" in content assert "<sitemapindex" in content
assert "</urlset>" in content for section in [
"sitemap-static.xml",
"sitemap-games.xml",
"sitemap-campaigns.xml",
"sitemap-organizations.xml",
"sitemap-channels.xml",
"sitemap-badges.xml",
"sitemap-reward-campaigns.xml",
]:
assert section in content
def test_sitemap_contains_static_pages(self, client: Client, sample_entities: dict[str, Any]) -> None: def test_sections_provide_expected_urls(self, client: Client, sample_entities: dict[str, Any]) -> None:
"""Test sitemap includes static pages.""" """Test that each sitemap section returns expected URLs for the entities created in the fixture.
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml")
content: str = response.content.decode()
# Check for some static pages
assert "<loc>http://testserver/</loc>" in content or "<loc>http://localhost:8000/</loc>" in content
assert "/campaigns/" in content
assert "/games/" in content
def test_sitemap_contains_game_detail_pages( Args:
self, client (Client): Django test client fixture for making HTTP requests.
client: Client, sample_entities (dict[str, Any]): Dictionary of sample entities created in the fixture.
sample_entities: dict[str, Any], """
) -> None: # games
"""Test sitemap includes game detail pages."""
game: Game = sample_entities["game"] game: Game = sample_entities["game"]
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml") response: _MonkeyPatchedWSGIResponse = client.get("/sitemap-games.xml")
content: str = response.content.decode() assert response.status_code == 200
assert f"/games/{game.twitch_id}/" in content assert "<urlset" in response.content.decode()
assert f"/games/{game.twitch_id}/" in response.content.decode()
def test_sitemap_contains_campaign_detail_pages( # campaigns
self,
client: Client,
sample_entities: dict[str, Any],
) -> None:
"""Test sitemap includes campaign detail pages."""
campaign: DropCampaign = sample_entities["campaign"] campaign: DropCampaign = sample_entities["campaign"]
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml") response: _MonkeyPatchedWSGIResponse = client.get("/sitemap-campaigns.xml")
content: str = response.content.decode() assert f"/campaigns/{campaign.twitch_id}/" in response.content.decode()
assert f"/campaigns/{campaign.twitch_id}/" in content
def test_sitemap_contains_organization_detail_pages( # organizations
self,
client: Client,
sample_entities: dict[str, Any],
) -> None:
"""Test sitemap includes organization detail pages."""
org: Organization = sample_entities["org"] org: Organization = sample_entities["org"]
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml") response: _MonkeyPatchedWSGIResponse = client.get("/sitemap-organizations.xml")
content: str = response.content.decode() assert f"/organizations/{org.twitch_id}/" in response.content.decode()
assert f"/organizations/{org.twitch_id}/" in content
def test_sitemap_contains_channel_detail_pages( # channels
self,
client: Client,
sample_entities: dict[str, Any],
) -> None:
"""Test sitemap includes channel detail pages."""
channel: Channel = sample_entities["channel"] channel: Channel = sample_entities["channel"]
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml") response: _MonkeyPatchedWSGIResponse = client.get("/sitemap-channels.xml")
content: str = response.content.decode() assert f"/channels/{channel.twitch_id}/" in response.content.decode()
assert f"/channels/{channel.twitch_id}/" in content
def test_sitemap_contains_badge_detail_pages( # badges
badge: ChatBadgeSet = sample_entities["badge"]
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap-badges.xml")
assert f"/badges/{badge.set_id}/" in response.content.decode()
# reward campaigns
reward: RewardCampaign = sample_entities["reward"]
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap-reward-campaigns.xml")
assert f"/reward-campaigns/{reward.twitch_id}/" in response.content.decode()
# static
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap-static.xml")
static_content: str = response.content.decode()
assert "<loc>http://testserver/</loc>" in static_content
assert "/campaigns/" in static_content
assert "/games/" in static_content
def test_static_sitemap_lastmod_and_docs_rss(
self, self,
client: Client, client: Client,
sample_entities: dict[str, Any], tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
"""Test sitemap includes badge detail pages.""" """Ensure the XML output contains correct lastmod for backups and skips docs RSS."""
badge: ChatBadge = sample_entities["badge"] # configure a fake DATA_DIR with backup files
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml") cache.clear()
content: str = response.content.decode() monkeypatch.setattr(settings, "DATA_DIR", tmp_path)
assert f"/badges/{badge.set_id}/" in content # pyright: ignore[reportAttributeAccessIssue] datasets: Path = tmp_path / "datasets"
datasets.mkdir()
older: Path = datasets / "dataset_backup_old.zip"
newer: Path = datasets / "dataset_backup_new.zip"
older.write_text("old", encoding="utf-8")
newer.write_text("new", encoding="utf-8")
os.utime(older, (1_000, 1_000))
os.utime(newer, (2_000, 2_000))
def test_sitemap_includes_priority(self, client: Client, sample_entities: dict[str, Any]) -> None: response: _MonkeyPatchedWSGIResponse = client.get("/sitemap-static.xml")
"""Test sitemap includes priority values."""
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml")
content: str = response.content.decode() content: str = response.content.decode()
assert "<priority>" in content
assert "</priority>" in content
def test_sitemap_includes_changefreq(self, client: Client, sample_entities: dict[str, Any]) -> None: # lastmod should match the newer file's timestamp. Django's
"""Test sitemap includes changefreq values.""" # sitemap renderer outputs only the date portion, so check for that.
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml") expected_date: str = datetime.datetime.fromtimestamp(2_000, tz=UTC).date().isoformat()
content: str = response.content.decode() assert f"<lastmod>{expected_date}</lastmod>" in content
assert "<changefreq>" in content
assert "</changefreq>" in content
def test_sitemap_includes_lastmod(self, client: Client, sample_entities: dict[str, Any]) -> None: # docs RSS entry must not include a lastmod element
"""Test sitemap includes lastmod for detail pages.""" # find the docs_rss loc and assert no <lastmod> on the next line
response: _MonkeyPatchedWSGIResponse = client.get("/sitemap.xml") assert "<loc>http://testserver/docs/rss/</loc>" in content
content: str = response.content.decode() sections: list[str] = content.split("<loc>http://testserver/docs/rss/</loc>")
# Check for lastmod in game or campaign entries assert len(sections) >= 2
assert "<lastmod>" in content after: str = sections[1]
assert "<lastmod>" not in after.split("</url>", maxsplit=1)[0]
@pytest.mark.django_db @pytest.mark.django_db

View file

@ -268,6 +268,8 @@ def search_view(request: HttpRequest) -> HttpResponse:
Returns: Returns:
HttpResponse: The rendered search results. HttpResponse: The rendered search results.
""" """
# TODO(TheLovinator): Move from Twitch app to a separate "core" app # noqa: TD003
# since this is not Twitch-specific.
query: str = request.GET.get("q", "") query: str = request.GET.get("q", "")
results: dict[str, QuerySet] = {} results: dict[str, QuerySet] = {}
@ -587,6 +589,8 @@ def dataset_backups_view(request: HttpRequest) -> HttpResponse:
Returns: Returns:
HttpResponse: The rendered dataset backups page. HttpResponse: The rendered dataset backups page.
""" """
# TODO(TheLovinator): Move from Twitch app to a separate "core" app # noqa: TD003
# since this is not Twitch-specific.
datasets_root: Path = settings.DATA_DIR / "datasets" datasets_root: Path = settings.DATA_DIR / "datasets"
search_dirs: list[Path] = [datasets_root] search_dirs: list[Path] = [datasets_root]
seen_paths: set[str] = set() seen_paths: set[str] = set()
@ -1250,6 +1254,8 @@ def dashboard(request: HttpRequest) -> HttpResponse:
Returns: Returns:
HttpResponse: The rendered dashboard template. HttpResponse: The rendered dashboard template.
""" """
# TODO(TheLovinator): Move from Twitch app to a separate "core" app # noqa: TD003
# since this won't be Twitch-specific in the future if we add support for other platforms.
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
active_campaigns: QuerySet[DropCampaign] = ( active_campaigns: QuerySet[DropCampaign] = (
DropCampaign.objects DropCampaign.objects
@ -2457,123 +2463,6 @@ def export_organizations_json(request: HttpRequest) -> HttpResponse: # noqa: AR
return response return response
# MARK: /sitemap.xml
def sitemap_view(request: HttpRequest) -> HttpResponse:
"""Generate a dynamic XML sitemap for search engines.
Args:
request: The HTTP request.
Returns:
HttpResponse: XML sitemap.
"""
base_url: str = f"{request.scheme}://{request.get_host()}"
# Start building sitemap XML
sitemap_urls: list[dict[str, str | dict[str, str]]] = []
# Static pages
sitemap_urls.extend([
{"url": f"{base_url}/", "priority": "1.0", "changefreq": "daily"},
{"url": f"{base_url}/campaigns/", "priority": "0.9", "changefreq": "daily"},
{"url": f"{base_url}/reward-campaigns/", "priority": "0.9", "changefreq": "daily"},
{"url": f"{base_url}/games/", "priority": "0.9", "changefreq": "weekly"},
{"url": f"{base_url}/organizations/", "priority": "0.8", "changefreq": "weekly"},
{"url": f"{base_url}/channels/", "priority": "0.8", "changefreq": "weekly"},
{"url": f"{base_url}/badges/", "priority": "0.7", "changefreq": "monthly"},
{"url": f"{base_url}/emotes/", "priority": "0.7", "changefreq": "monthly"},
{"url": f"{base_url}/search/", "priority": "0.6", "changefreq": "monthly"},
])
# Dynamic detail pages - Games
games: QuerySet[Game] = Game.objects.all()
for game in games:
entry: dict[str, str | dict[str, str]] = {
"url": f"{base_url}{reverse('twitch:game_detail', args=[game.twitch_id])}",
"priority": "0.8",
"changefreq": "weekly",
}
if game.updated_at:
entry["lastmod"] = game.updated_at.isoformat()
sitemap_urls.append(entry)
# Dynamic detail pages - Campaigns
campaigns: QuerySet[DropCampaign] = DropCampaign.objects.all()
for campaign in campaigns:
entry: dict[str, str | dict[str, str]] = {
"url": f"{base_url}{reverse('twitch:campaign_detail', args=[campaign.twitch_id])}",
"priority": "0.7",
"changefreq": "weekly",
}
if campaign.updated_at:
entry["lastmod"] = campaign.updated_at.isoformat()
sitemap_urls.append(entry)
# Dynamic detail pages - Organizations
orgs: QuerySet[Organization] = Organization.objects.all()
for org in orgs:
entry: dict[str, str | dict[str, str]] = {
"url": f"{base_url}{reverse('twitch:organization_detail', args=[org.twitch_id])}",
"priority": "0.7",
"changefreq": "weekly",
}
if org.updated_at:
entry["lastmod"] = org.updated_at.isoformat()
sitemap_urls.append(entry)
# Dynamic detail pages - Channels
channels: QuerySet[Channel] = Channel.objects.all()
for channel in channels:
entry: dict[str, str | dict[str, str]] = {
"url": f"{base_url}{reverse('twitch:channel_detail', args=[channel.twitch_id])}",
"priority": "0.6",
"changefreq": "weekly",
}
if channel.updated_at:
entry["lastmod"] = channel.updated_at.isoformat()
sitemap_urls.append(entry)
# Dynamic detail pages - Badges
badge_sets: QuerySet[ChatBadgeSet] = ChatBadgeSet.objects.all()
sitemap_urls.extend(
{
"url": f"{base_url}{reverse('twitch:badge_set_detail', args=[badge_set.set_id])}",
"priority": "0.5",
"changefreq": "monthly",
}
for badge_set in badge_sets
)
# Dynamic detail pages - Reward Campaigns
reward_campaigns: QuerySet[RewardCampaign] = RewardCampaign.objects.all()
for reward_campaign in reward_campaigns:
entry: dict[str, str | dict[str, str]] = {
"url": f"{base_url}{reverse('twitch:reward_campaign_detail', args=[reward_campaign.twitch_id])}",
"priority": "0.6",
"changefreq": "weekly",
}
if reward_campaign.updated_at:
entry["lastmod"] = reward_campaign.updated_at.isoformat()
sitemap_urls.append(entry)
# Build XML
xml_content = '<?xml version="1.0" encoding="UTF-8"?>\n'
xml_content += '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n'
for url_entry in sitemap_urls:
xml_content += " <url>\n"
xml_content += f" <loc>{url_entry['url']}</loc>\n"
if url_entry.get("lastmod"):
xml_content += f" <lastmod>{url_entry['lastmod']}</lastmod>\n"
xml_content += f" <changefreq>{url_entry.get('changefreq', 'monthly')}</changefreq>\n"
xml_content += f" <priority>{url_entry.get('priority', '0.5')}</priority>\n"
xml_content += " </url>\n"
xml_content += "</urlset>"
return HttpResponse(xml_content, content_type="application/xml")
# MARK: /robots.txt # MARK: /robots.txt
def robots_txt_view(request: HttpRequest) -> HttpResponse: def robots_txt_view(request: HttpRequest) -> HttpResponse:
"""Generate robots.txt for search engine crawlers. """Generate robots.txt for search engine crawlers.
@ -2584,6 +2473,8 @@ def robots_txt_view(request: HttpRequest) -> HttpResponse:
Returns: Returns:
HttpResponse: robots.txt content. HttpResponse: robots.txt content.
""" """
# TODO(TheLovinator): Move from Twitch app to a separate "core" app # noqa: TD003
# since this is not Twitch-specific.
base_url: str = f"{request.scheme}://{request.get_host()}" base_url: str = f"{request.scheme}://{request.get_host()}"
robots_content: str = f"""User-agent: * robots_content: str = f"""User-agent: *