Refactor feeds.py
All checks were successful
Deploy to Server / deploy (push) Successful in 10s

This commit is contained in:
Joakim Hellsén 2026-03-10 19:51:10 +01:00
commit 17ef09465d
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
2 changed files with 280 additions and 179 deletions

View file

@ -58,13 +58,112 @@ def _with_campaign_related(queryset: QuerySet[DropCampaign]) -> QuerySet[DropCam
)
def insert_date_info(item: Model, parts: list[SafeText]) -> None:
"""Insert start and end date information into parts list.
def genereate_details_link_html(item: DropCampaign) -> list[SafeText]:
"""Helper method to append a details link to the description if available.
Args:
item (DropCampaign): The drop campaign item to check for a details URL.
Returns:
list[SafeText]: A list containing the formatted details link if a details URL is present, otherwise an empty list.
"""
parts: list[SafeText] = []
details_url: str | None = getattr(item, "details_url", None)
if details_url:
parts.append(format_html('<a href="{}">About</a>', details_url))
return parts
def generate_item_image(item: DropCampaign) -> list[SafeText]:
"""Helper method to generate an image tag for the campaign image if available.
Args:
item (DropCampaign): The drop campaign item to check for an image URL.
Returns:
list[SafeText]: A list containing the formatted image tag if an image URL is present, otherwise an empty list.
"""
parts: list[SafeText] = []
image_url: str | None = getattr(item, "image_url", None)
if image_url:
item_name: str = getattr(item, "name", str(object=item))
parts.append(
format_html(
'<img src="{}" alt="{}" width="160" height="160" />',
image_url,
item_name,
),
)
return parts
def generate_item_image_tag(item: DropCampaign) -> list[SafeString]:
"""Helper method to append an image tag for the campaign image if available.
Args:
item (DropCampaign): The drop campaign item to check for an image URL.
Returns:
list[SafeString]: A list containing the formatted image tag if an image URL is present, otherwise an empty list.
"""
parts: list[SafeText] = []
image_url: str | None = getattr(item, "image_url", None)
if image_url:
item_name: str = getattr(item, "name", str(object=item))
parts.append(
format_html(
'<img src="{}" alt="{}" width="160" height="160" />',
image_url,
item_name,
),
)
return parts
def generate_details_link(item: DropCampaign) -> list[SafeString]:
"""Helper method to append a details link to the description if available.
Args:
item (DropCampaign): The drop campaign item to check for a details URL.
Returns:
list[SafeString]: A list containing the formatted details link if a details URL is present, otherwise an empty list.
"""
parts: list[SafeText] = []
details_url: str | None = getattr(item, "details_url", None)
if details_url:
parts.append(format_html('<a href="{}">Details</a>', details_url))
return parts
def generate_description_html(item: DropCampaign) -> list[SafeText]:
"""Generate additional description HTML for a drop campaign item, such as the description text and details link.
Args:
item (DropCampaign): The drop campaign item to generate description HTML for.
Returns:
list[SafeText]: A list of SafeText elements containing the generated description HTML.
"""
parts: list[SafeText] = []
desc_text: str | None = getattr(item, "description", None)
if desc_text:
parts.append(format_html("<p>{}</p>", desc_text))
return parts
def generate_date_html(item: Model) -> list[SafeText]:
"""Generate HTML snippets for the start and end dates of a campaign item, formatted with both absolute and relative times.
Args:
item (Model): The campaign item containing start_at and end_at.
parts (list[SafeText]): The list of HTML parts to append to.
Returns:
list[SafeText]: A list of SafeText elements with formatted start and end date info.
"""
parts: list[SafeText] = []
end_at: datetime.datetime | None = getattr(item, "end_at", None)
start_at: datetime.datetime | None = getattr(item, "start_at", None)
@ -95,6 +194,157 @@ def insert_date_info(item: Model, parts: list[SafeText]) -> None:
elif end_part:
parts.append(format_html("<p>{}</p>", end_part))
return parts
def generate_drops_summary_html(item: DropCampaign) -> list[SafeString]:
"""Generate HTML summary for drops and append to parts list.
Args:
item (DropCampaign): The drop campaign item containing the drops to summarize.
Returns:
list[SafeString]: A list of SafeText elements summarizing the drops, or empty if no drops.
"""
parts: list[SafeText] = []
drops_data: list[dict] = []
channels: list[Channel] | None = getattr(item, "channels_ordered", None)
channel_name: str | None = channels[0].name if channels else None
drops: QuerySet[TimeBasedDrop] | None = getattr(item, "time_based_drops", None)
if drops:
drops_data = _build_drops_data(drops.all())
if drops_data:
parts.append(
format_html(
"<p>{}</p>",
_construct_drops_summary(drops_data, channel_name=channel_name),
),
)
return parts
def generate_channels_html(item: Model) -> list[SafeText]:
"""Generate HTML for the list of channels associated with a drop campaign, if applicable.
Only generates channel links if the drop is not subscription-only. If it is subscription-only, it will skip channel links to avoid confusion.
Args:
item (Model): The campaign item which may have an 'is_subscription_only' attribute.
Returns:
list[SafeText]: A list containing the HTML for the channels section, or empty if subscription-only or no channels.
"""
max_links = 5
parts: list[SafeText] = []
channels: list[Channel] | None = getattr(item, "channels_ordered", None)
if not channels:
return parts
if getattr(item, "is_subscription_only", False):
return parts
game: Game | None = getattr(item, "game", None)
channels_all: list[Channel] = (
list(channels)
if isinstance(channels, list)
else list(channels.all())
if channels is not None
else []
)
total: int = len(channels_all)
if channels_all:
create_channel_list_html(max_links, parts, channels_all, total)
return parts
if not game:
logger.warning(
"No game associated with drop campaign for channel fallback link",
)
parts.append(
format_html(
"{}",
"<ul><li>Drop has no game and no channels connected to the drop.</li></ul>",
),
)
return parts
if not game.twitch_directory_url:
logger.warning(
"Game %s has no Twitch directory URL for channel fallback link",
game,
)
if "twitch-chat-badges-guide" in getattr(game, "details_url", ""):
# TODO(TheLovinator): Improve detection of global emotes # noqa: TD003
parts.append(
format_html(
"{}",
"<ul><li>Game is Twitch chat badges guide, likely meaning these are global Twitch emotes?</li></ul>",
),
)
return parts
parts.append(
format_html(
"{}",
"<ul><li>Game has no Twitch directory URL for channel fallback link.</li></ul>",
),
)
return parts
display_name: str = getattr(game, "display_name", "this game")
parts.append(
format_html(
'<ul><li><a href="{}" title="Browse {} category">Category-wide for {}</a></li></ul>',
game.twitch_directory_url,
display_name,
display_name,
),
)
return parts
def create_channel_list_html(
max_links: int,
parts: list[SafeText],
channels_all: list[Channel],
total: int,
) -> None:
"""Helper function to create HTML for a list of channels, limited to max_links with a note if there are more.
Args:
max_links (int): The maximum number of channel links to display.
parts (list[SafeText]): The list to append the generated HTML to.
channels_all (list[Channel]): The full list of channels to generate links for.
total (int): The total number of channels, used for the "... and X more" message if there are more than max_links.
"""
items: list[SafeString] = [
format_html(
'<li><a href="https://twitch.tv/{}">{}</a></li>',
channel.name,
channel.display_name,
)
for channel in channels_all[:max_links]
]
if total > max_links:
items.append(format_html("<li>... and {} more</li>", total - max_links))
html: SafeText = format_html(
"<ul>{}</ul>",
format_html_join("", "{}", [(item,) for item in items]),
)
parts.append(format_html("<p>Channels with this drop:</p>{}", html))
def _build_drops_data(drops_qs: QuerySet[TimeBasedDrop]) -> list[dict]:
"""Build a simplified data structure for rendering drops in a template.
@ -137,78 +387,6 @@ def _build_drops_data(drops_qs: QuerySet[TimeBasedDrop]) -> list[dict]:
return drops_data
def _build_channels_html(
channels: list[Channel] | QuerySet[Channel],
game: Game | None,
) -> SafeText:
"""Render up to max_links channel links as <li>, then a count of additional channels, or fallback to game category link.
If only one channel and drop_requirements is '1 subscriptions required',
merge the Twitch link with the '1 subs' row.
Args:
channels (list[Channel] | QuerySet[Channel]): The channels (already ordered).
game (Game | None): The game object for fallback link.
Returns:
SafeText: HTML <ul> with up to max_links channel links, count of more, or fallback link.
"""
max_links = 5
channels_all: list[Channel] = (
list(channels) if isinstance(channels, list) else list(channels.all())
)
total: int = len(channels_all)
if channels_all:
items: list[SafeString] = [
format_html(
'<li><a href="https://twitch.tv/{}">{}</a></li>',
channel.name,
channel.display_name,
)
for channel in channels_all[:max_links]
]
if total > max_links:
items.append(format_html("<li>... and {} more</li>", total - max_links))
return format_html(
"<ul>{}</ul>",
format_html_join("", "{}", [(item,) for item in items]),
)
if not game:
logger.warning(
"No game associated with drop campaign for channel fallback link",
)
return format_html(
"{}",
"<ul><li>Drop has no game and no channels connected to the drop.</li></ul>",
)
if not game.twitch_directory_url:
logger.warning(
"Game %s has no Twitch directory URL for channel fallback link",
game,
)
if "twitch-chat-badges-guide" in getattr(game, "details_url", ""):
# TODO(TheLovinator): Improve detection of global emotes # noqa: TD003
return format_html("{}", "<ul><li>Global Twitch Emote?</li></ul>")
return format_html(
"{}",
"<ul><li>Failed to get Twitch category URL :(</li></ul>",
)
display_name: str = getattr(game, "display_name", "this game")
return format_html(
'<ul><li><a href="{}" title="Browse {} category">Category-wide for {}</a></li></ul>',
game.twitch_directory_url,
display_name,
display_name,
)
def _construct_drops_summary(
drops_data: list[dict],
channel_name: str | None = None,
@ -335,8 +513,9 @@ class OrganizationRSSFeed(Feed):
def items(self) -> QuerySet[Organization]:
"""Return the latest organizations (default 200, or limited by ?limit query param)."""
limit: int = self._limit if self._limit is not None else 200
return Organization.objects.order_by("-added_at")[:limit]
if self._limit is None:
self._limit = 200 # Default limit
return Organization.objects.order_by("-added_at")[: self._limit]
def item_title(self, item: Organization) -> SafeText:
"""Return the organization name as the item title."""
@ -344,7 +523,7 @@ class OrganizationRSSFeed(Feed):
def item_description(self, item: Organization) -> SafeText:
"""Return a description of the organization."""
return SafeText(item.feed_description)
return SafeText(item.feed_description())
def item_link(self, item: Organization) -> str:
"""Return the link to the organization detail."""
@ -563,50 +742,14 @@ class DropCampaignFeed(Feed):
def item_description(self, item: DropCampaign) -> SafeText:
"""Return a description of the campaign."""
drops_data: list[dict] = []
channels: list[Channel] | None = getattr(item, "channels_ordered", None)
channel_name: str | None = channels[0].name if channels else None
drops: QuerySet[TimeBasedDrop] | None = getattr(item, "time_based_drops", None)
if drops:
drops_data = _build_drops_data(drops.all())
parts: list[SafeText] = []
image_url: str | None = getattr(item, "image_url", None)
if image_url:
item_name: str = getattr(item, "name", str(object=item))
parts.append(
format_html(
'<img src="{}" alt="{}" width="160" height="160" />',
image_url,
item_name,
),
)
desc_text: str | None = getattr(item, "description", None)
if desc_text:
parts.append(format_html("<p>{}</p>", desc_text))
# Insert start and end date info
insert_date_info(item, parts)
if drops_data:
parts.append(
format_html(
"<p>{}</p>",
_construct_drops_summary(drops_data, channel_name=channel_name),
),
)
# Only show channels if drop is not subscription only
if not getattr(item, "is_subscription_only", False) and channels is not None:
game: Game | None = getattr(item, "game", None)
parts.append(_build_channels_html(channels, game=game))
details_url: str | None = getattr(item, "details_url", None)
if details_url:
parts.append(format_html('<a href="{}">About</a>', details_url))
parts.extend(generate_item_image(item))
parts.extend(generate_description_html(item=item))
parts.extend(generate_date_html(item=item))
parts.extend(generate_drops_summary_html(item=item))
parts.extend(generate_channels_html(item))
parts.extend(genereate_details_link_html(item))
return SafeText("".join(str(p) for p in parts))
@ -642,23 +785,15 @@ class DropCampaignFeed(Feed):
return item.get_feed_enclosure_url()
def item_enclosure_length(self, item: DropCampaign) -> int:
"""Returns the length of the enclosure.
Reads the `image_size_bytes` field added to the model. If the field is
unset it returns `0` to match previous behavior.
"""
"""Returns the length of the enclosure."""
try:
size = getattr(item, "image_size_bytes", None)
size: int | None = getattr(item, "image_size_bytes", None)
return int(size) if size is not None else 0
except TypeError, ValueError:
return 0
def item_enclosure_mime_type(self, item: DropCampaign) -> str:
"""Returns the MIME type of the enclosure.
Uses `image_mime_type` on the campaign if set, falling back to the
previous hard-coded `image/jpeg`.
"""
"""Returns the MIME type of the enclosure."""
mime: str = getattr(item, "image_mime_type", "")
return mime or "image/jpeg"
@ -735,46 +870,13 @@ class GameCampaignFeed(Feed):
def item_description(self, item: DropCampaign) -> SafeText:
"""Return a description of the campaign."""
drops_data: list[dict] = []
channels: list[Channel] | None = getattr(item, "channels_ordered", None)
channel_name: str | None = channels[0].name if channels else None
drops: QuerySet[TimeBasedDrop] | None = getattr(item, "time_based_drops", None)
if drops:
drops_data = _build_drops_data(drops.all())
parts: list[SafeText] = []
image_url: str | None = getattr(item, "image_url", None)
if image_url:
item_name: str = getattr(item, "name", str(object=item))
parts.append(
format_html(
'<img src="{}" alt="{}" width="160" height="160" />',
image_url,
item_name,
),
)
# Insert start and end date info
insert_date_info(item, parts)
if drops_data:
parts.append(
format_html(
"<p>{}</p>",
_construct_drops_summary(drops_data, channel_name=channel_name),
),
)
# Only show channels if drop is not subscription only
if not getattr(item, "is_subscription_only", False) and channels is not None:
game: Game | None = getattr(item, "game", None)
parts.append(_build_channels_html(channels, game=game))
details_url: str | None = getattr(item, "details_url", None)
if details_url:
parts.append(format_html('<a href="{}">About</a>', details_url))
parts.extend(generate_item_image_tag(item))
parts.extend(generate_details_link(item))
parts.extend(generate_date_html(item))
parts.extend(generate_drops_summary_html(item))
parts.extend(generate_channels_html(item))
return SafeText("".join(str(p) for p in parts))
@ -811,23 +913,15 @@ class GameCampaignFeed(Feed):
return item.get_feed_enclosure_url()
def item_enclosure_length(self, item: DropCampaign) -> int:
"""Returns the length of the enclosure.
Reads the ``image_size_bytes`` field added to the model when rendering a
game-specific campaign feed.
"""
"""Returns the length of the enclosure."""
try:
size = getattr(item, "image_size_bytes", None)
size: int | None = getattr(item, "image_size_bytes", None)
return int(size) if size is not None else 0
except TypeError, ValueError:
return 0
def item_enclosure_mime_type(self, item: DropCampaign) -> str:
"""Returns the MIME type of the enclosure.
Prefers `image_mime_type` on the campaign object; falls back to
`image/jpeg` when not available.
"""
"""Returns the MIME type of the enclosure."""
mime: str = getattr(item, "image_mime_type", "")
return mime or "image/jpeg"

View file

@ -1,5 +1,6 @@
"""Test RSS feeds."""
import logging
from collections.abc import Callable
from contextlib import AbstractContextManager
from datetime import timedelta
@ -25,6 +26,8 @@ from twitch.models import Organization
from twitch.models import RewardCampaign
from twitch.models import TimeBasedDrop
logger: logging.Logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from django.test.client import _MonkeyPatchedWSGIResponse
@ -40,6 +43,8 @@ class RSSFeedTestCase(TestCase):
twitch_id="test-org-123",
name="Test Organization",
)
self.org.save()
self.game: Game = Game.objects.create(
twitch_id="test-game-123",
slug="test-game",
@ -59,12 +64,14 @@ class RSSFeedTestCase(TestCase):
# populate the new enclosure metadata fields so feeds can return them
self.game.box_art_size_bytes = 42
self.game.box_art_mime_type = "image/png"
# provide a URL so that the RSS enclosure element is emitted
self.game.box_art = "https://example.com/box.png"
self.game.save()
self.campaign.image_size_bytes = 314
self.campaign.image_mime_type = "image/gif"
# feed will only include an enclosure if there is some image URL/field
self.campaign.image_url = "https://example.com/campaign.png"
self.campaign.save()