Compare commits

..

3 commits

Author SHA1 Message Date
77d9d448d7
Refactor GameFeed and GameDetailView to use 'owners' instead of 'owner'; update related tests
All checks were successful
Deploy to Server / deploy (push) Successful in 10s
2026-03-09 06:33:40 +01:00
6b936f4cf7
Remove /rss/organizations/<org_id>/campaigns/ 2026-03-09 05:56:57 +01:00
44cd440a17
Add watch_imports command 2026-03-09 05:27:11 +01:00
11 changed files with 437 additions and 330 deletions

View file

@ -13,12 +13,6 @@
<div>
<a href="{% url 'twitch:game_campaign_feed' game.twitch_id %}"
title="RSS feed for {{ game.display_name }} campaigns">RSS feed for {{ game.display_name }} campaigns</a>
{% if owners %}
{% for owner in owners %}
<a href="{% url 'twitch:organization_campaign_feed' owner.twitch_id %}"
title="RSS feed for {{ owner.name }} campaigns">RSS feed for {{ owner.name }} campaigns</a>
{% endfor %}
{% endif %}
<a href="{% url 'twitch:campaign_feed' %}"
title="RSS feed for all campaigns">RSS feed for all campaigns</a>
</div>

View file

@ -4,12 +4,6 @@
{% endblock title %}
{% block content %}
<h1 id="org-name">{{ organization.name }}</h1>
<!-- RSS Feeds -->
<div style="margin-bottom: 1rem;">
<a href="{% url 'twitch:organization_campaign_feed' organization.twitch_id %}"
style="margin-right: 1rem"
title="RSS feed for {{ organization.name }} campaigns">RSS feed for {{ organization.name }} campaigns</a>
</div>
<theader>
<h2 id="games-header">Games by {{ organization.name }}</h2>
</theader>

View file

@ -1,12 +1,32 @@
[Unit]
Description=TTVDrops import drops from pending directory
Description=TTVDrops watch and import drops from pending directory
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
Type=simple
User=ttvdrops
Group=ttvdrops
WorkingDirectory=/home/ttvdrops/ttvdrops
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
ExecStart=/usr/bin/uv run python manage.py better_import_drops /mnt/fourteen/Data/Responses/pending
-ExecStartPost=/usr/bin/uv run python manage.py download_box_art
-ExecStartPost=/usr/bin/uv run python manage.py download_campaign_images
ExecStart=/usr/bin/uv run python manage.py watch_imports /mnt/fourteen/Data/Responses/pending --verbose
# Restart policy
Restart=on-failure
RestartSec=5s
# Process management
KillMode=mixed
KillSignal=SIGTERM
# Resource limits
MemoryLimit=512M
CPUQuota=50%
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=ttvdrops-watch
[Install]
WantedBy=multi-user.target

View file

@ -1,10 +0,0 @@
[Unit]
Description=Frequent TTVDrops import drops timer
[Timer]
OnBootSec=0
OnUnitActiveSec=1min
Persistent=true
[Install]
WantedBy=timers.target

View file

@ -306,9 +306,8 @@ def _construct_drops_summary(
class OrganizationRSSFeed(Feed):
"""RSS feed for latest organizations."""
# Spec: https://cyber.harvard.edu/rss/rss.html
feed_type = feedgenerator.Rss201rev2Feed
title: str = "TTVDrops Organizations"
title: str = "TTVDrops Twitch Organizations"
link: str = "/organizations/"
description: str = "Latest organizations on TTVDrops"
feed_copyright: str = "Information wants to be free."
@ -378,11 +377,11 @@ class OrganizationRSSFeed(Feed):
# MARK: /rss/games/
class GameFeed(Feed):
"""RSS feed for latest games."""
"""RSS feed for newly added games."""
title: str = "Games - TTVDrops"
link: str = "/games/"
description: str = "Latest games on TTVDrops"
description: str = "Newly added games on TTVDrops"
feed_copyright: str = "Information wants to be free."
_limit: int | None = None
@ -410,12 +409,14 @@ class GameFeed(Feed):
return super().__call__(request, *args, **kwargs)
def items(self) -> list[Game]:
"""Return the latest games (default 200, or limited by ?limit query param)."""
limit: int = self._limit if self._limit is not None else 200
return list(Game.objects.order_by("-added_at")[:limit])
"""Return the latest games (default 20, or limited by ?limit query param)."""
limit: int = self._limit if self._limit is not None else 20
return list(
Game.objects.prefetch_related("owners").order_by("-added_at")[:limit],
)
def item_title(self, item: Game) -> SafeText:
"""Return the game name as the item title (SafeText for RSS)."""
"""Return the game name as the item title."""
return SafeText(item.get_game_name)
def item_description(self, item: Game) -> SafeText:
@ -425,7 +426,7 @@ class GameFeed(Feed):
name: str = getattr(item, "name", "")
display_name: str = getattr(item, "display_name", "")
box_art: str = item.box_art_best_url
owner: Organization | None = getattr(item, "owner", None)
owner: Organization | None = item.owners.first()
description_parts: list[SafeText] = []
@ -439,17 +440,28 @@ class GameFeed(Feed):
),
)
# Get the full URL for TTVDrops game detail page
game_url: str = reverse("twitch:game_detail", args=[twitch_id])
rss_feed_url: str = reverse("twitch:game_campaign_feed", args=[twitch_id])
twitch_directory_url: str = getattr(item, "twitch_directory_url", "")
if slug:
description_parts.append(
SafeText(
f"<p><a href='https://www.twitch.tv/directory/game/{slug}'>{game_name} by {game_owner}</a></p>",
f"<p>New game has been added to ttvdrops.lovinator.space: {game_name} by {game_owner}\n"
f"<a href='{game_url}'>Game Details</a>\n"
f"<a href='{twitch_directory_url}'>Twitch</a>\n"
f"<a href='{rss_feed_url}'>RSS feed</a>\n</p>",
),
)
else:
description_parts.append(SafeText(f"<p>{game_name} by {game_owner}</p>"))
if twitch_id:
description_parts.append(SafeText(f"<small>Twitch ID: {twitch_id}</small>"))
description_parts.append(
SafeText(
f"<p>A new game has been added to ttvdrops.lovinator.space: {game_name} by {game_owner}\n"
f"<a href='{game_url}'>Game Details</a>\n"
f"<a href='{twitch_directory_url}'>Twitch</a>\n"
f"<a href='{rss_feed_url}'>RSS feed</a>\n</p>",
),
)
return SafeText("".join(str(part) for part in description_parts))
@ -481,7 +493,7 @@ class GameFeed(Feed):
def item_author_name(self, item: Game) -> str:
"""Return the author name for the game, typically the owner organization name."""
owner: Organization | None = getattr(item, "owner", None)
owner: Organization | None = item.owners.first()
if owner and owner.name:
return owner.name
@ -810,159 +822,6 @@ class GameCampaignFeed(Feed):
return "image/jpeg"
# MARK: /rss/organizations/<twitch_id>/campaigns/
class OrganizationCampaignFeed(Feed):
"""RSS feed for campaigns of a specific organization."""
_limit: int | None = None
def __call__(
self,
request: HttpRequest,
*args: object,
**kwargs: object,
) -> HttpResponse:
"""Override to capture limit parameter from request.
Args:
request (HttpRequest): The incoming HTTP request, potentially containing a 'limit' query parameter
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
HttpResponse: The HTTP response generated by the parent Feed class after processing the request.
"""
if request.GET.get("limit"):
try:
self._limit = int(request.GET.get("limit", 200))
except ValueError, TypeError:
self._limit = None
return super().__call__(request, *args, **kwargs)
def get_object(self, request: HttpRequest, twitch_id: str) -> Organization: # noqa: ARG002
"""Retrieve the Organization instance for the given Twitch ID.
Returns:
Organization: The corresponding Organization object.
"""
return Organization.objects.get(twitch_id=twitch_id)
def item_link(self, item: DropCampaign) -> str:
"""Return the link to the campaign detail."""
return reverse("twitch:campaign_detail", args=[item.twitch_id])
def title(self, obj: Organization) -> str:
"""Return the feed title for the organization's campaigns."""
return f"TTVDrops: {obj.name} Campaigns"
def link(self, obj: Organization) -> str:
"""Return the absolute URL to the organization detail page."""
return reverse("twitch:organization_detail", args=[obj.twitch_id])
def description(self, obj: Organization) -> str:
"""Return a description for the feed."""
return f"Latest drop campaigns for organization {obj.name}"
def items(self, obj: Organization) -> list[DropCampaign]:
"""Return the latest drop campaigns for this organization, ordered by most recent start date (default 200, or limited by ?limit query param)."""
limit: int = self._limit if self._limit is not None else 200
queryset: QuerySet[DropCampaign] = DropCampaign.objects.filter(
game__owners=obj,
).order_by("-start_at")
return list(_with_campaign_related(queryset)[:limit])
def item_author_name(self, item: DropCampaign) -> str:
"""Return the author name for the campaign, typically the game name."""
return item.get_feed_author_name()
def item_guid(self, item: DropCampaign) -> str:
"""Return a unique identifier for each campaign."""
return item.get_feed_guid()
def item_enclosure_url(self, item: DropCampaign) -> str:
"""Returns the URL of the campaign image for enclosure."""
return item.get_feed_enclosure_url()
def item_enclosure_length(self, item: DropCampaign) -> int: # noqa: ARG002
"""Returns the length of the enclosure."""
# TODO(TheLovinator): Track image size for proper length # noqa: TD003
return 0
def item_enclosure_mime_type(self, item: DropCampaign) -> str: # noqa: ARG002
"""Returns the MIME type of the enclosure."""
# TODO(TheLovinator): Determine actual MIME type if needed # noqa: TD003
return "image/jpeg"
def item_categories(self, item: DropCampaign) -> tuple[str, ...]:
"""Returns the associated game's name as a category."""
return item.get_feed_categories()
def item_updateddate(self, item: DropCampaign) -> datetime.datetime:
"""Returns the campaign's last update time."""
return item.updated_at
def item_pubdate(self, item: DropCampaign) -> datetime.datetime:
"""Returns the publication date to the feed item.
Uses start_at (when the drop starts). Fallback to added_at or now if missing.
"""
if item.start_at:
return item.start_at
if item.added_at:
return item.added_at
return timezone.now()
def item_description(self, item: DropCampaign) -> SafeText:
"""Return a description of the campaign."""
drops_data: list[dict] = []
channels: list[Channel] | None = getattr(item, "channels_ordered", None)
channel_name: str | None = channels[0].name if channels else None
drops: QuerySet[TimeBasedDrop] | None = getattr(item, "time_based_drops", None)
if drops:
drops_data = _build_drops_data(drops.all())
parts: list[SafeText] = []
image_url: str | None = getattr(item, "image_url", None)
if image_url:
item_name: str = getattr(item, "name", str(object=item))
parts.append(
format_html(
'<img src="{}" alt="{}" width="160" height="160" />',
image_url,
item_name,
),
)
desc_text: str | None = getattr(item, "description", None)
if desc_text:
parts.append(format_html("<p>{}</p>", desc_text))
# Insert start and end date info
insert_date_info(item, parts)
if drops_data:
parts.append(
format_html(
"<p>{}</p>",
_construct_drops_summary(drops_data, channel_name=channel_name),
),
)
# Only show channels if drop is not subscription only
if not getattr(item, "is_subscription_only", False) and channels is not None:
game: Game | None = getattr(item, "game", None)
parts.append(_build_channels_html(channels, game=game))
details_url: str | None = getattr(item, "details_url", None)
if details_url:
parts.append(format_html('<a href="{}">About</a>', details_url))
return SafeText("".join(str(p) for p in parts))
# MARK: /rss/reward-campaigns/
class RewardCampaignFeed(Feed):
"""RSS feed for latest reward campaigns (Quest rewards)."""

View file

@ -0,0 +1,116 @@
import logging
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from twitch.management.commands.better_import_drops import (
Command as BetterImportDropsCommand,
)
if TYPE_CHECKING:
from django.core.management.base import CommandParser
logger: logging.Logger = logging.getLogger("ttvdrops.watch_imports")
class Command(BaseCommand):
"""Watch for JSON files in a directory and import them automatically."""
help = "Watch a directory for JSON files and import them automatically"
requires_migrations_checks = True
def add_arguments(self, parser: CommandParser) -> None:
"""Populate the command with arguments."""
parser.add_argument(
"path",
type=str,
help="Path to directory to watch for JSON files",
)
def handle(self, *args, **options) -> None: # noqa: ARG002
"""Main entry point for the watch command.
Args:
*args: Variable length argument list (unused).
**options: Arbitrary keyword arguments.
Raises:
CommandError: If the provided path does not exist or is not a directory.
"""
watch_path: Path = self.get_watch_path(options)
self.stdout.write(
self.style.SUCCESS(f"Watching {watch_path} for JSON files..."),
)
self.stdout.write("Press Ctrl+C to stop\n")
importer_command: BetterImportDropsCommand = BetterImportDropsCommand()
while True:
try:
sleep(1)
self.import_json_files(
importer_command=importer_command,
watch_path=watch_path,
)
except KeyboardInterrupt:
msg = "Received keyboard interrupt. Stopping watch..."
self.stdout.write(self.style.WARNING(msg))
break
except CommandError as e:
msg = f"Import command error: {e}"
self.stdout.write(self.style.ERROR(msg))
except Exception as e:
msg: str = f"Error while watching directory: {e}"
raise CommandError(msg) from e
logger.info("Stopped watching directory: %s", watch_path)
def import_json_files(
self,
importer_command: BetterImportDropsCommand,
watch_path: Path,
) -> None:
"""Import all JSON files in the watch directory using the provided importer command.
Args:
importer_command: An instance of the BetterImportDropsCommand to handle the import logic.
watch_path: The directory path to watch for JSON files.
"""
# TODO(TheLovinator): Implement actual file watching using watchdog or similar library. # noqa: TD003
json_files: list[Path] = [
f for f in watch_path.iterdir() if f.suffix == ".json" and f.is_file()
]
if not json_files:
return
for json_file in json_files:
self.stdout.write(f"Importing {json_file}...")
importer_command.handle(path=json_file)
def get_watch_path(self, options: dict[str, str]) -> Path:
"""Validate and return the watch path from the command options.
Args:
options: The command options containing the 'path' key.
Returns:
The validated watch path as a Path object.
Raises:
CommandError: If the provided path does not exist or is not a directory.
"""
watch_path: Path = Path(options["path"]).resolve()
if not watch_path.exists():
msg: str = f"Path does not exist: {watch_path}"
raise CommandError(msg)
if not watch_path.is_dir():
msg: str = f"Path is not a directory: {watch_path}, it is a {watch_path.stat().st_mode}"
raise CommandError(msg)
return watch_path

View file

@ -64,6 +64,14 @@ class RSSFeedTestCase(TestCase):
response: _MonkeyPatchedWSGIResponse = self.client.get(url)
assert response.status_code == 200
assert response["Content-Type"] == "application/rss+xml; charset=utf-8"
content: str = response.content.decode("utf-8")
assert "Test Game by Test Organization" in content
expected_rss_link: str = reverse(
"twitch:game_campaign_feed",
args=[self.game.twitch_id],
)
assert expected_rss_link in content
def test_campaign_feed(self) -> None:
"""Test campaign feed returns 200."""
@ -115,19 +123,6 @@ class RSSFeedTestCase(TestCase):
content: str = response.content.decode("utf-8")
assert "Test Game" in content
def test_organization_campaign_feed(self) -> None:
"""Test organization-specific campaign feed returns 200."""
url: str = reverse(
"twitch:organization_campaign_feed",
args=[self.org.twitch_id],
)
response: _MonkeyPatchedWSGIResponse = self.client.get(url)
assert response.status_code == 200
assert response["Content-Type"] == "application/rss+xml; charset=utf-8"
# Verify the organization name is in the feed
content: str = response.content.decode("utf-8")
assert "Test Organization" in content
def test_game_campaign_feed_filters_correctly(self) -> None:
"""Test game campaign feed only shows campaigns for that game."""
# Create another game with a campaign
@ -157,42 +152,6 @@ class RSSFeedTestCase(TestCase):
# Should NOT contain other campaign
assert "Other Campaign" not in content
def test_organization_campaign_feed_filters_correctly(self) -> None:
"""Test organization campaign feed only shows campaigns for that organization."""
# Create another organization with a game and campaign
other_org = Organization.objects.create(
twitch_id="other-org-123",
name="Other Organization",
)
other_game = Game.objects.create(
twitch_id="other-game-456",
slug="other-game-2",
name="Other Game 2",
display_name="Other Game 2",
)
other_game.owners.add(other_org)
DropCampaign.objects.create(
twitch_id="other-campaign-456",
name="Other Campaign 2",
game=other_game,
start_at=timezone.now(),
end_at=timezone.now() + timedelta(days=7),
operation_names=["DropCampaignDetails"],
)
# Get feed for first organization
url: str = reverse(
"twitch:organization_campaign_feed",
args=[self.org.twitch_id],
)
response: _MonkeyPatchedWSGIResponse = self.client.get(url)
content: str = response.content.decode("utf-8")
# Should contain first campaign
assert "Test Campaign" in content
# Should NOT contain other campaign
assert "Other Campaign 2" not in content
QueryAsserter = Callable[..., AbstractContextManager[object]]
@ -441,65 +400,8 @@ def test_game_feed_queries_bounded(
game.owners.add(org)
url: str = reverse("twitch:game_feed")
with django_assert_num_queries(1, exact=True):
response: _MonkeyPatchedWSGIResponse = client.get(url)
assert response.status_code == 200
@pytest.mark.django_db
def test_organization_campaign_feed_queries_bounded(
client: Client,
django_assert_num_queries: QueryAsserter,
) -> None:
"""Organization campaign feed should not regress in query count."""
org: Organization = Organization.objects.create(
twitch_id="org-campaign-feed",
name="Org Campaign Feed",
)
game: Game = Game.objects.create(
twitch_id="org-campaign-game",
slug="org-campaign-game",
name="Org Campaign Game",
display_name="Org Campaign Game",
)
game.owners.add(org)
for i in range(3):
_build_campaign(game, i)
url: str = reverse("twitch:organization_campaign_feed", args=[org.twitch_id])
# TODO(TheLovinator): 12 queries is still quite high for a feed - we should be able to optimize this further, but this is a good starting point to prevent regressions for now. # noqa: TD003
with django_assert_num_queries(12, exact=False):
response: _MonkeyPatchedWSGIResponse = client.get(url)
assert response.status_code == 200
@pytest.mark.django_db
def test_organization_campaign_feed_queries_do_not_scale_with_items(
client: Client,
django_assert_num_queries: QueryAsserter,
) -> None:
"""Organization campaign RSS feed query count should remain bounded as item count grows."""
org: Organization = Organization.objects.create(
twitch_id="test-org-org-scale-queries",
name="Org Scale Query Org",
)
game: Game = Game.objects.create(
twitch_id="test-game-org-scale-queries",
slug="org-scale-game",
name="Org Scale Game",
display_name="Org Scale Game",
)
game.owners.add(org)
for i in range(50):
_build_campaign(game, i)
url: str = reverse("twitch:organization_campaign_feed", args=[org.twitch_id])
with django_assert_num_queries(15, exact=False):
# One query for games + one prefetch query for owners.
with django_assert_num_queries(2, exact=True):
response: _MonkeyPatchedWSGIResponse = client.get(url)
assert response.status_code == 200
@ -591,7 +493,6 @@ URL_NAMES: list[tuple[str, dict[str, str]]] = [
("twitch:game_feed", {}),
("twitch:game_campaign_feed", {"twitch_id": "test-game-123"}),
("twitch:organization_feed", {}),
("twitch:organization_campaign_feed", {"twitch_id": "test-org-123"}),
("twitch:reward_campaign_feed", {}),
]

View file

@ -930,6 +930,36 @@ class TestChannelListView:
assert response.status_code == 200
assert "game" in response.context
@pytest.mark.django_db
def test_game_detail_view_serializes_owners_field(
self,
client: Client,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Game detail JSON payload should use `owners` (M2M), not stale `owner`."""
org: Organization = Organization.objects.create(
twitch_id="org-game-detail",
name="Org Game Detail",
)
game: Game = Game.objects.create(
twitch_id="g2-owners",
name="Game2 Owners",
display_name="Game2 Owners",
)
game.owners.add(org)
monkeypatch.setattr("twitch.views.format_and_color_json", lambda data: data)
url: str = reverse("twitch:game_detail", args=[game.twitch_id])
response: _MonkeyPatchedWSGIResponse = client.get(url)
assert response.status_code == 200
game_data: dict[str, Any] = response.context["game_data"]
fields: dict[str, Any] = game_data["fields"]
assert "owners" in fields
assert fields["owners"] == [org.pk]
assert "owner" not in fields
@pytest.mark.django_db
def test_org_list_view(self, client: Client) -> None:
"""Test org list view returns 200 and has orgs in context."""

View file

@ -0,0 +1,214 @@
from pathlib import Path
from typing import TYPE_CHECKING
from unittest.mock import MagicMock
import pytest
from _pytest.capture import CaptureResult
from django.core.management.base import CommandError
from twitch.management.commands.watch_imports import Command
if TYPE_CHECKING:
from pathlib import Path
from _pytest.capture import CaptureResult
class TestWatchImportsCommand:
"""Tests for the watch_imports management command."""
def test_get_watch_path_returns_resolved_directory(self, tmp_path: Path) -> None:
"""It should return the resolved path when a valid directory is provided."""
command = Command()
result: Path = command.get_watch_path({"path": str(tmp_path)})
assert result == tmp_path.resolve()
def test_get_watch_path_raises_for_missing_path(self, tmp_path: Path) -> None:
"""It should raise CommandError when the path does not exist."""
command = Command()
missing_path: Path = tmp_path / "missing"
with pytest.raises(CommandError, match="Path does not exist"):
command.get_watch_path({"path": str(missing_path)})
def test_get_watch_path_raises_for_file_path(self, tmp_path: Path) -> None:
"""It should raise CommandError when the path is not a directory."""
command = Command()
file_path: Path = tmp_path / "data.json"
file_path.write_text("{}", encoding="utf-8")
with pytest.raises(CommandError, match="Path is not a directory"):
command.get_watch_path({"path": str(file_path)})
def test_import_json_files_imports_only_json_files(self, tmp_path: Path) -> None:
"""It should call importer for .json files and ignore other entries."""
command = Command()
importer_command = MagicMock()
json_file_1: Path = tmp_path / "one.json"
json_file_2: Path = tmp_path / "two.json"
ignored_txt: Path = tmp_path / "notes.txt"
ignored_dir: Path = tmp_path / "nested.json"
json_file_1.write_text("{}", encoding="utf-8")
json_file_2.write_text("[]", encoding="utf-8")
ignored_txt.write_text("ignored", encoding="utf-8")
ignored_dir.mkdir()
command.import_json_files(
importer_command=importer_command,
watch_path=tmp_path,
)
imported_paths: list[Path] = [
call.kwargs["path"] for call in importer_command.handle.call_args_list
]
assert set(imported_paths) == {json_file_1, json_file_2}
def test_import_json_files_no_json_files_does_nothing(self, tmp_path: Path) -> None:
"""It should not call importer when no JSON files are present."""
command = Command()
importer_command = MagicMock()
(tmp_path / "notes.txt").write_text("ignored", encoding="utf-8")
command.import_json_files(
importer_command=importer_command,
watch_path=tmp_path,
)
importer_command.handle.assert_not_called()
def test_handle_stops_cleanly_on_keyboard_interrupt(
self,
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
capsys: pytest.CaptureFixture[str],
) -> None:
"""It should print a warning and stop when interrupted by keyboard."""
command = Command()
importer_instance = MagicMock()
monkeypatch.setattr(command, "get_watch_path", lambda options: tmp_path)
monkeypatch.setattr(
"twitch.management.commands.watch_imports.BetterImportDropsCommand",
lambda: importer_instance,
)
monkeypatch.setattr(
"twitch.management.commands.watch_imports.sleep",
lambda _seconds: (_ for _ in ()).throw(KeyboardInterrupt()),
)
command.handle(path=str(tmp_path))
captured: CaptureResult[str] = capsys.readouterr()
assert "Watching" in captured.out
assert "Received keyboard interrupt. Stopping watch..." in captured.out
def test_handle_reports_command_errors_and_continues_until_interrupt(
self,
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
capsys: pytest.CaptureFixture[str],
) -> None:
"""It should report CommandError from import and continue the loop."""
command = Command()
importer_instance = MagicMock()
sleep_calls: dict[str, int] = {"count": 0}
def fake_sleep(_seconds: int) -> None:
"""Simulate sleep and raise KeyboardInterrupt after 2 calls.
Args:
_seconds: The number of seconds to sleep (ignored).
Raises:
KeyboardInterrupt: After being called twice to simulate user interrupt.
"""
sleep_calls["count"] += 1
if sleep_calls["count"] == 2:
raise KeyboardInterrupt
def fake_import_json_files(
*,
importer_command: MagicMock,
watch_path: Path,
) -> None:
"""Simulate an import that raises CommandError.
Args:
importer_command: The mock importer command instance (ignored).
watch_path: The path being watched (ignored).
Raises:
CommandError: Always raised to simulate an import error.
"""
msg = "bad import"
raise CommandError(msg)
monkeypatch.setattr(
command,
"get_watch_path",
lambda options: tmp_path,
)
monkeypatch.setattr(
"twitch.management.commands.watch_imports.BetterImportDropsCommand",
lambda: importer_instance,
)
monkeypatch.setattr(
"twitch.management.commands.watch_imports.sleep",
fake_sleep,
)
monkeypatch.setattr(
command,
"import_json_files",
fake_import_json_files,
)
command.handle(path=str(tmp_path))
captured: CaptureResult[str] = capsys.readouterr()
assert "Import command error: bad import" in captured.out
assert "Received keyboard interrupt. Stopping watch..." in captured.out
def test_handle_wraps_unexpected_errors_in_command_error(
self,
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""It should wrap unexpected exceptions in a CommandError."""
command = Command()
importer_instance = MagicMock()
monkeypatch.setattr(
command,
"get_watch_path",
lambda options: tmp_path,
)
monkeypatch.setattr(
"twitch.management.commands.watch_imports.BetterImportDropsCommand",
lambda: importer_instance,
)
monkeypatch.setattr(
"twitch.management.commands.watch_imports.sleep",
lambda _seconds: None,
)
def fake_import_json_files(
*,
importer_command: MagicMock,
watch_path: Path,
) -> None:
msg = "boom"
raise RuntimeError(msg)
monkeypatch.setattr(
command,
"import_json_files",
fake_import_json_files,
)
with pytest.raises(CommandError, match="Error while watching directory: boom"):
command.handle(path=str(tmp_path))

View file

@ -6,7 +6,6 @@ from twitch import views
from twitch.feeds import DropCampaignFeed
from twitch.feeds import GameCampaignFeed
from twitch.feeds import GameFeed
from twitch.feeds import OrganizationCampaignFeed
from twitch.feeds import OrganizationRSSFeed
from twitch.feeds import RewardCampaignFeed
@ -83,18 +82,27 @@ urlpatterns: list[URLPattern] = [
views.export_organizations_json,
name="export_organizations_json",
),
# RSS feeds
# /rss/campaigns/ - all active campaigns
path("rss/campaigns/", DropCampaignFeed(), name="campaign_feed"),
# /rss/games/ - newly added games
path("rss/games/", GameFeed(), name="game_feed"),
# /rss/games/<twitch_id>/campaigns/ - active campaigns for a specific game
path(
"rss/games/<str:twitch_id>/campaigns/",
GameCampaignFeed(),
name="game_campaign_feed",
),
path("rss/organizations/", OrganizationRSSFeed(), name="organization_feed"),
# /rss/organizations/ - newly added organizations
path(
"rss/organizations/<str:twitch_id>/campaigns/",
OrganizationCampaignFeed(),
name="organization_campaign_feed",
"rss/organizations/",
OrganizationRSSFeed(),
name="organization_feed",
),
# /rss/reward-campaigns/ - all active reward campaigns
path(
"rss/reward-campaigns/",
RewardCampaignFeed(),
name="reward_campaign_feed",
),
path("rss/reward-campaigns/", RewardCampaignFeed(), name="reward_campaign_feed"),
]

View file

@ -41,7 +41,6 @@ from pygments.lexers.data import JsonLexer
from twitch.feeds import DropCampaignFeed
from twitch.feeds import GameCampaignFeed
from twitch.feeds import GameFeed
from twitch.feeds import OrganizationCampaignFeed
from twitch.feeds import OrganizationRSSFeed
from twitch.feeds import RewardCampaignFeed
from twitch.models import Channel
@ -1217,7 +1216,7 @@ class GameDetailView(DetailView):
"name",
"display_name",
"box_art",
"owner",
"owners",
"added_at",
"updated_at",
),
@ -1857,24 +1856,6 @@ def docs_rss_view(request: HttpRequest) -> HttpResponse:
if sample_game
else "",
},
{
"title": "Campaigns for an Organization",
"description": "Drop campaigns across games owned by one organization.",
"url": (
absolute(
reverse(
"twitch:organization_campaign_feed",
args=[sample_org.twitch_id],
),
)
if sample_org
else absolute("/rss/organizations/<org_id>/campaigns/")
),
"has_sample": bool(sample_org),
"example_xml": render_feed(OrganizationCampaignFeed(), sample_org.twitch_id)
if sample_org
else "",
},
]
seo_context: dict[str, Any] = _build_seo_context(