diff --git a/.vscode/settings.json b/.vscode/settings.json index 6170aae..1188f98 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -27,6 +27,8 @@ "Mailgun", "makemigrations", "McCabe", + "noopener", + "noreferrer", "platformdirs", "prefetcher", "psutil", diff --git a/twitch/feeds.py b/twitch/feeds.py index e52fa7e..2378a3d 100644 --- a/twitch/feeds.py +++ b/twitch/feeds.py @@ -1,14 +1,22 @@ from __future__ import annotations +import logging +import re from typing import TYPE_CHECKING +from typing import Literal +from django.contrib.humanize.templatetags.humanize import naturaltime from django.contrib.syndication.views import Feed from django.db.models.query import QuerySet from django.urls import reverse from django.utils import timezone from django.utils.html import format_html +from django.utils.html import format_html_join +from django.utils.safestring import SafeString from django.utils.safestring import SafeText +from twitch.models import Channel +from twitch.models import DropBenefit from twitch.models import DropCampaign from twitch.models import Game from twitch.models import Organization @@ -21,6 +29,8 @@ if TYPE_CHECKING: from django.db.models import QuerySet from django.http import HttpRequest +logger: logging.Logger = logging.getLogger("ttvdrops") + # MARK: /rss/organizations/ class OrganizationFeed(Feed): @@ -76,6 +86,111 @@ class GameFeed(Feed): class DropCampaignFeed(Feed): """RSS feed for latest drop campaigns.""" + def _get_channel_name_from_drops(self, drops: QuerySet[TimeBasedDrop]) -> str | None: + for d in drops: + campaign: DropCampaign | None = getattr(d, "campaign", None) + if campaign: + allow_channels: QuerySet[Channel] | None = getattr(campaign, "allow_channels", None) + if allow_channels: + channels: QuerySet[Channel, Channel] = allow_channels.all() + if channels: + return channels[0].name + return None + + def get_channel_from_benefit(self, benefit: Model) -> str | None: + """Get the Twitch channel name associated with a drop benefit. + + Args: + benefit (Model): The drop benefit model instance. + + Returns: + str | None: The Twitch channel name if found, else None. + """ + drop_obj: QuerySet[TimeBasedDrop] | None = getattr(benefit, "drops", None) + if drop_obj and hasattr(drop_obj, "all"): + try: + return self._get_channel_name_from_drops(drop_obj.all()) + except AttributeError: + logger.exception("Exception occurred while resolving channel name for benefit") + return None + + def _resolve_channel_name(self, drop: dict) -> str | None: + """Try to resolve the Twitch channel name for a drop dict's benefits or fallback keys. + + Args: + drop (dict): The drop data dictionary. + + Returns: + str | None: The Twitch channel name if found, else None. + """ + benefits: list[Model] = drop.get("benefits", []) + benefit0: Model | None = benefits[0] if benefits else None + if benefit0 and hasattr(benefit0, "drops"): + channel_name: str | None = self.get_channel_from_benefit(benefit0) + if channel_name: + return channel_name + return None + + def _build_channels_html(self, channels: QuerySet[Channel], game: Game | None) -> SafeText: + """Render up to max_links channel links as
{}
", desc_text) - start_at: datetime.datetime | None = getattr(item, "start_at", None) - end_at: datetime.datetime | None = getattr(item, "end_at", None) - if start_at: - description += f"Starts: {start_at.strftime('%Y-%m-%d %H:%M %Z')}
" - if end_at: - description += f"Ends: {end_at.strftime('%Y-%m-%d %H:%M %Z')}
" - drops: QuerySet[TimeBasedDrop] | None = getattr( - item, - "time_based_drops", - None, - ) - if drops: - drops_qs: QuerySet[TimeBasedDrop] = drops.select_related().prefetch_related("benefits").all() - if drops_qs: - description += "| Benefits | ' - 'Drop Name | ' - 'Requirements | ' - 'Period | ' - "
|---|---|---|---|
| '
- for benefit in drop.benefits.all():
- if getattr(benefit, "image_asset_url", None):
- description += format_html(
- ' | "
- description += (
- f'{getattr(drop, "name", str(drop))} | ' - ) - requirements: str = "" - if getattr(drop, "required_minutes_watched", None): - requirements = f"{drop.required_minutes_watched} minutes watched" - if getattr(drop, "required_subs", 0) > 0: - if requirements: - requirements += f" and {drop.required_subs} subscriptions required" - else: - requirements = f"{drop.required_subs} subscriptions required" - description += f'{requirements} | ' - period: str = "" - start_at = getattr(drop, "start_at", None) - end_at = getattr(drop, "end_at", None) - if start_at is not None: - period += start_at.strftime("%Y-%m-%d %H:%M %Z") - if end_at is not None: - if period: - period += " - " + end_at.strftime( - "%Y-%m-%d %H:%M %Z", - ) - else: - period = end_at.strftime("%Y-%m-%d %H:%M %Z") - description += f'{period} | ' - description += "
{}
", desc_text)) + + # Insert start and end date info + self.insert_date_info(item, parts) + + if drops_data: + parts.append(format_html("{}
", self._construct_drops_summary(drops_data))) + + # Only show channels if drop is not subscription only + if not getattr(item, "is_subscription_only", False): + channels: QuerySet[Channel] | None = getattr(item, "allow_channels", None) + if channels is not None: + game: Game | None = getattr(item, "game", None) + parts.append(self._build_channels_html(channels, game=game)) + details_url: str | None = getattr(item, "details_url", None) if details_url: - description += format_html( - '', - details_url, + parts.append(format_html('About', details_url)) + + return SafeText("".join(str(p) for p in parts)) + + def insert_date_info(self, item: Model, parts: list[SafeText]) -> None: + """Insert start and end date information into parts list. + + Args: + item (Model): The campaign item containing start_at and end_at. + parts (list[SafeText]): The list of HTML parts to append to. + """ + end_at: datetime.datetime | None = getattr(item, "end_at", None) + start_at: datetime.datetime | None = getattr(item, "start_at", None) + + if start_at or end_at: + start_part: SafeString = ( + format_html("Starts: {} ({})", start_at.strftime("%Y-%m-%d %H:%M %Z"), naturaltime(start_at)) + if start_at + else SafeText("") ) - return SafeText(description) + end_part: SafeString = ( + format_html("Ends: {} ({})", end_at.strftime("%Y-%m-%d %H:%M %Z"), naturaltime(end_at)) + if end_at + else SafeText("") + ) + # Start date and end date separated by a line break if both present + if start_part and end_part: + parts.append(format_html("{}
{}
{}
", start_part)) + elif end_part: + parts.append(format_html("{}
", end_part)) def item_link(self, item: Model) -> str: """Return the link to the campaign detail.""" - return reverse("twitch:campaign_detail", args=[item.pk]) + if not isinstance(item, DropCampaign): + logger.error("item_link called with non-DropCampaign item: %s", type(item)) + return reverse("twitch:dashboard") + + return reverse("twitch:campaign_detail", args=[item.twitch_id]) def item_pubdate(self, item: Model) -> datetime.datetime: """Returns the publication date to the feed item. @@ -209,12 +392,16 @@ class DropCampaignFeed(Feed): def item_categories(self, item: DropCampaign) -> tuple[str, ...]: """Returns the associated game's name as a category.""" - if item.game: - return ( - "twitch", - item.game.get_game_name, - ) - return () + categories: list[str] = ["twitch", "drops"] + + item_game: Game | None = getattr(item, "game", None) + if item_game: + categories.append(item_game.get_game_name) + item_game_owner: Organization | None = getattr(item_game, "owner", None) + if item_game_owner: + categories.extend((str(item_game_owner.name), str(item_game_owner.twitch_id))) + + return tuple(categories) def item_guid(self, item: DropCampaign) -> str: """Return a unique identifier for each campaign.""" @@ -222,8 +409,10 @@ class DropCampaignFeed(Feed): def item_author_name(self, item: DropCampaign) -> str: """Return the author name for the campaign, typically the game name.""" - if item.game and item.game.display_name: - return item.game.display_name + item_game: Game | None = getattr(item, "game", None) + if item_game and item_game.display_name: + return item_game.display_name + return "Twitch" def item_enclosure_url(self, item: DropCampaign) -> str: @@ -231,14 +420,14 @@ class DropCampaignFeed(Feed): return item.image_url def item_enclosure_length(self, item: DropCampaign) -> int: # noqa: ARG002 - """Returns the length of the enclosure. + """Returns the length of the enclosure.""" + # TODO(TheLovinator): Track image size for proper length # noqa: TD003 - Currently not tracked, so return 0. - """ return 0 def item_enclosure_mime_type(self, item: DropCampaign) -> str: # noqa: ARG002 """Returns the MIME type of the enclosure.""" + # TODO(TheLovinator): Determine actual MIME type if needed # noqa: TD003 return "image/jpeg" @@ -258,23 +447,21 @@ class GameCampaignFeed(DropCampaignFeed): """ return Game.objects.get(twitch_id=twitch_id) - def title(self, obj: Game) -> str: + def title(self, obj: Game) -> str: # pyright: ignore[reportIncompatibleVariableOverride] """Return the feed title.""" return f"TTVDrops: {obj.display_name} Campaigns" - def link(self, obj: Game) -> str: + def link(self, obj: Game) -> str: # pyright: ignore[reportIncompatibleVariableOverride] """Return the link to the game detail.""" return reverse("twitch:game_detail", args=[obj.twitch_id]) - def description(self, obj: Game) -> str: + def description(self, obj: Game) -> str: # pyright: ignore[reportIncompatibleVariableOverride] """Return the feed description.""" return f"Latest drop campaigns for {obj.display_name}" - def items(self, obj: Game) -> list[DropCampaign]: + def items(self, obj: Game) -> list[DropCampaign]: # pyright: ignore[reportIncompatibleMethodOverride] """Return the latest 100 campaigns for this game.""" - return list( - DropCampaign.objects.filter(game=obj).select_related("game").order_by("-added_at")[:100], - ) + return list(DropCampaign.objects.filter(game=obj).select_related("game").order_by("-added_at")[:100]) # MARK: /rss/organizations/