import logging
import re
from typing import TYPE_CHECKING
from typing import Literal
from django.contrib.humanize.templatetags.humanize import naturaltime
from django.contrib.syndication.views import Feed
from django.db.models import Prefetch
from django.db.models.query import QuerySet
from django.urls import reverse
from django.utils import feedgenerator
from django.utils import timezone
from django.utils.html import format_html
from django.utils.html import format_html_join
from django.utils.safestring import SafeText
from twitch.models import Channel
from twitch.models import ChatBadge
from twitch.models import DropCampaign
from twitch.models import Game
from twitch.models import Organization
from twitch.models import RewardCampaign
from twitch.models import TimeBasedDrop
if TYPE_CHECKING:
import datetime
from django.db.models import Model
from django.db.models import QuerySet
from django.http import HttpRequest
from django.http import HttpResponse
from django.utils.safestring import SafeString
from twitch.models import DropBenefit
logger: logging.Logger = logging.getLogger("ttvdrops")
def _with_campaign_related(queryset: QuerySet[DropCampaign]) -> QuerySet[DropCampaign]:
    """Preload the relations that feed rendering reads, so items do not trigger N+1 queries.

    ``game`` is joined with ``select_related``. ``game__owners``, the allowed
    channels (ordered by ``display_name`` and stored on the ``channels_ordered``
    attribute) and the time-based drops with their benefits are prefetched.

    Args:
        queryset (QuerySet[DropCampaign]): Campaign queryset to decorate.

    Returns:
        QuerySet[DropCampaign]: Queryset with related data preloaded for feed rendering.
    """
    ordered_channels = Prefetch(
        "allow_channels",
        queryset=Channel.objects.order_by("display_name"),
        to_attr="channels_ordered",
    )
    drops_with_benefits = Prefetch(
        "time_based_drops",
        queryset=TimeBasedDrop.objects.prefetch_related("benefits"),
    )
    with_game = queryset.select_related("game")
    return with_game.prefetch_related(
        "game__owners",
        ordered_channels,
        drops_with_benefits,
    )
def _active_drop_campaigns(queryset: QuerySet[DropCampaign]) -> QuerySet[DropCampaign]:
    """Keep only the drop campaigns whose start/end window contains the current time.

    Both bounds are inclusive: a campaign matches when ``start_at <= now <= end_at``.

    Args:
        queryset (QuerySet[DropCampaign]): Campaign queryset to narrow down.

    Returns:
        QuerySet[DropCampaign]: Queryset with only active drop campaigns.
    """
    current_time: datetime.datetime = timezone.now()
    active_window = {"start_at__lte": current_time, "end_at__gte": current_time}
    return queryset.filter(**active_window)
def _active_reward_campaigns(
    queryset: QuerySet[RewardCampaign],
) -> QuerySet[RewardCampaign]:
    """Keep only the reward campaigns whose start/end window contains the current time.

    Both bounds are inclusive: a campaign matches when ``starts_at <= now <= ends_at``.

    Args:
        queryset (QuerySet[RewardCampaign]): Reward campaign queryset to narrow down.

    Returns:
        QuerySet[RewardCampaign]: Queryset with only active reward campaigns.
    """
    current_time: datetime.datetime = timezone.now()
    active_window = {"starts_at__lte": current_time, "ends_at__gte": current_time}
    return queryset.filter(**active_window)
def genereate_details_link_html(item: DropCampaign) -> list[SafeText]:
    """Build an "About" link pointing at the campaign's details page, if it has one.

    Args:
        item (DropCampaign): The drop campaign item whose ``details_url`` attribute is read.

    Returns:
        list[SafeText]: A one-element list holding the anchor markup when
        ``details_url`` is present and non-empty, otherwise an empty list.
    """
    details_url: str | None = getattr(item, "details_url", None)
    if not details_url:
        return []
    # The template needs a placeholder for the URL; without one format_html
    # silently ignores the argument and no link is emitted. format_html also
    # escapes the URL before interpolating it into the attribute.
    return [format_html('<a href="{}">About</a>', details_url)]
def generate_item_image(item: DropCampaign) -> list[SafeText]:
    """Build an ``<img>`` tag for the campaign image, if the item has one.

    Args:
        item (DropCampaign): The drop campaign item whose ``image_url`` attribute is read.

    Returns:
        list[SafeText]: A one-element list holding the image tag when
        ``image_url`` is present and non-empty, otherwise an empty list.
    """
    image_url: str | None = getattr(item, "image_url", None)
    if not image_url:
        return []
    # Fall back to the item's string form when it has no ``name`` attribute.
    item_name: str = getattr(item, "name", str(item))
    # Both values go through placeholders so format_html escapes them; an empty
    # template would discard the URL and alt text entirely.
    return [format_html('<img src="{}" alt="{}" />', image_url, item_name)]
def generate_item_image_tag(item: DropCampaign) -> list[SafeString]:
    """Build a block-level image tag for the campaign image, if the item has one.

    Args:
        item (DropCampaign): The drop campaign item whose ``image_url`` attribute is read.

    Returns:
        list[SafeString]: A one-element list holding the image markup when
        ``image_url`` is present and non-empty, otherwise an empty list.
    """
    image_url: str | None = getattr(item, "image_url", None)
    if not image_url:
        return []
    # Fall back to the item's string form when it has no ``name`` attribute.
    item_name: str = getattr(item, "name", str(item))
    # NOTE(review): the wrapping element is an assumption -- confirm the exact
    # markup against the rendered feed. The placeholders are what matter:
    # format_html escapes the URL and alt text as it interpolates them.
    return [format_html('<p><img src="{}" alt="{}" /></p>', image_url, item_name)]
def generate_details_link(item: DropCampaign) -> list[SafeString]:
    """Build a "Details" link pointing at the campaign's details page, if it has one.

    Args:
        item (DropCampaign): The drop campaign item whose ``details_url`` attribute is read.

    Returns:
        list[SafeString]: A one-element list holding the anchor markup when
        ``details_url`` is present and non-empty, otherwise an empty list.
    """
    details_url: str | None = getattr(item, "details_url", None)
    if not details_url:
        return []
    # The template needs a placeholder for the URL; without one format_html
    # silently ignores the argument and no link is emitted. format_html also
    # escapes the URL before interpolating it into the attribute.
    return [format_html('<a href="{}">Details</a>', details_url)]
def generate_description_html(item: DropCampaign) -> list[SafeText]:
"""Generate additional description HTML for a drop campaign item, such as the description text and details link.
Args:
item (DropCampaign): The drop campaign item to generate description HTML for.
Returns:
list[SafeText]: A list of SafeText elements containing the generated description HTML.
"""
parts: list[SafeText] = []
desc_text: str | None = getattr(item, "description", None)
if desc_text:
parts.append(format_html("
{}
", desc_text)) return parts def generate_date_html(item: Model) -> list[SafeText]: """Generate HTML snippets for the start and end dates of a campaign item, formatted with both absolute and relative times. Args: item (Model): The campaign item containing start_at and end_at. Returns: list[SafeText]: A list of SafeText elements with formatted start and end date info. """ parts: list[SafeText] = [] end_at: datetime.datetime | None = getattr(item, "end_at", None) start_at: datetime.datetime | None = getattr(item, "start_at", None) if start_at or end_at: start_part: SafeString = ( format_html( "Starts: {} ({})", start_at.strftime("%Y-%m-%d %H:%M %Z"), naturaltime(start_at), ) if start_at else SafeText("") ) end_part: SafeString = ( format_html( "Ends: {} ({})", end_at.strftime("%Y-%m-%d %H:%M %Z"), naturaltime(end_at), ) if end_at else SafeText("") ) # Start date and end date separated by a line break if both present if start_part and end_part: parts.append(format_html("{}
{}
{}
", start_part)) elif end_part: parts.append(format_html("{}
", end_part)) return parts def generate_drops_summary_html(item: DropCampaign) -> list[SafeString]: """Generate HTML summary for drops and append to parts list. Args: item (DropCampaign): The drop campaign item containing the drops to summarize. Returns: list[SafeString]: A list of SafeText elements summarizing the drops, or empty if no drops. """ parts: list[SafeText] = [] drops_data: list[dict] = [] channels: list[Channel] | None = getattr(item, "channels_ordered", None) channel_name: str | None = channels[0].name if channels else None drops: QuerySet[TimeBasedDrop] | None = getattr(item, "time_based_drops", None) if drops: drops_data = _build_drops_data(drops.all()) if drops_data: parts.append( format_html( "{}
", _construct_drops_summary(drops_data, channel_name=channel_name), ), ) return parts def generate_channels_html(item: Model) -> list[SafeText]: """Generate HTML for the list of channels associated with a drop campaign, if applicable. Only generates channel links if the drop is not subscription-only. If it is subscription-only, it will skip channel links to avoid confusion. Args: item (Model): The campaign item which may have an 'is_subscription_only' attribute. Returns: list[SafeText]: A list containing the HTML for the channels section, or empty if subscription-only or no channels. """ max_links = 5 parts: list[SafeText] = [] channels: list[Channel] | None = getattr(item, "channels_ordered", None) if not channels: return parts if getattr(item, "is_subscription_only", False): return parts game: Game | None = getattr(item, "game", None) channels_all: list[Channel] = ( list(channels) if isinstance(channels, list) else list(channels.all()) if channels is not None else [] ) total: int = len(channels_all) if channels_all: create_channel_list_html(max_links, parts, channels_all, total) return parts if not game: logger.warning( "No game associated with drop campaign for channel fallback link", ) parts.append( format_html( "{}", "Channels with this drop:
{}", html)) def _build_drops_data(drops_qs: QuerySet[TimeBasedDrop]) -> list[dict]: """Build a simplified data structure for rendering drops in a template. Returns: list[dict]: A list of dictionaries each containing `name`, `benefits`, `requirements`, and `period` for a drop, suitable for template rendering. """ drops_data: list[dict] = [] for drop in drops_qs: requirements: str = "" required_minutes: int | None = getattr(drop, "required_minutes_watched", None) required_subs: int = getattr(drop, "required_subs", 0) or 0 if required_minutes: requirements = f"{required_minutes} minutes watched" if required_subs > 0: sub_word: Literal["subs", "sub"] = "subs" if required_subs > 1 else "sub" if requirements: requirements += f" and {required_subs} {sub_word} required" else: requirements = f"{required_subs} {sub_word} required" period: str = "" drop_start: datetime.datetime | None = getattr(drop, "start_at", None) drop_end: datetime.datetime | None = getattr(drop, "end_at", None) if drop_start is not None: period += drop_start.strftime("%Y-%m-%d %H:%M %Z") if drop_end is not None: if period: period += " - " + drop_end.strftime("%Y-%m-%d %H:%M %Z") else: period = drop_end.strftime("%Y-%m-%d %H:%M %Z") drops_data.append({ "name": getattr(drop, "name", str(drop)), "benefits": list(drop.benefits.all()), "requirements": requirements, "period": period, }) return drops_data def _construct_drops_summary( drops_data: list[dict], channel_name: str | None = None, ) -> SafeText: """Construct a safe HTML summary of drops and their benefits. If the requirements indicate a subscription is required, link the benefit names to the Twitch channel. Args: drops_data (list[dict]): List of drop data dicts. channel_name (str | None): Optional channel name to link benefit names to on Twitch. Returns: SafeText: A single safe HTML line summarizing all drops, or empty SafeText if none. 
""" if not drops_data: return SafeText("") badge_titles: set[str] = set() for drop in drops_data: for b in drop.get("benefits", []): if getattr(b, "distribution_type", "") == "BADGE" and getattr( b, "name", "", ): badge_titles.add(b.name) badge_descriptions_by_title: dict[str, str] = {} if badge_titles: badge_descriptions_by_title = dict( ChatBadge.objects.filter(title__in=badge_titles).values_list( "title", "description", ), ) def sort_key(drop: dict) -> tuple[bool, int]: req: str = drop.get("requirements", "") m: re.Match[str] | None = re.search(r"(\d+) minutes watched", req) minutes: int | None = int(m.group(1)) if m else None is_sub: bool = "sub required" in req or "subs required" in req return (is_sub, minutes if minutes is not None else 99999) sorted_drops: list[dict] = sorted(drops_data, key=sort_key) items: list[SafeText] = [] for drop in sorted_drops: requirements: str = drop.get("requirements", "") benefits: list[DropBenefit] = drop.get("benefits", []) is_sub_required: bool = ( "sub required" in requirements or "subs required" in requirements ) benefit_names: list[tuple[str]] = [] for b in benefits: benefit_name: str = getattr(b, "name", str(b)) badge_desc: str | None = badge_descriptions_by_title.get(benefit_name) if is_sub_required and channel_name: linked_name: SafeString = format_html( '{}', channel_name, channel_name, ) if badge_desc: benefit_names.append(( format_html("{} ({})", linked_name, badge_desc), )) else: benefit_names.append((linked_name,)) elif badge_desc: benefit_names.append(( format_html("{} ({})", benefit_name, badge_desc), )) else: benefit_names.append((benefit_name,)) benefits_str: SafeString = ( format_html_join(", ", "{}", benefit_names) if benefit_names else SafeText("") ) if requirements: items.append(format_html("{game_name} has been added to ttvdrops.lovinator.space!\nOwned by {game_owner}.\n\n" f"[Details] " f"[Twitch] " f"[RSS feed]\n
", ), ) return SafeText("".join(str(part) for part in description_parts)) def item_link(self, item: Game) -> str: """Return the link to the game detail.""" return reverse("twitch:game_detail", args=[item.twitch_id]) def item_pubdate(self, item: Game) -> datetime.datetime: """Returns the publication date to the feed item. Fallback to added_at or now if missing. """ added_at: datetime.datetime | None = getattr(item, "added_at", None) if added_at: return added_at return timezone.now() def item_updateddate(self, item: Game) -> datetime.datetime: """Returns the game's last update time.""" updated_at: datetime.datetime | None = getattr(item, "updated_at", None) if updated_at: return updated_at return timezone.now() def item_guid(self, item: Game) -> str: """Return a unique identifier for each game.""" twitch_id: str = getattr(item, "twitch_id", "unknown") return twitch_id + "@ttvdrops.com" def item_author_name(self, item: Game) -> str: """Return the author name for the game, typically the owner organization name.""" owner: Organization | None = item.owners.first() if owner and owner.name: return owner.name return "Twitch" def item_enclosure_url(self, item: Game) -> str: """Returns the URL of the game's box art for enclosure.""" box_art: str = item.box_art_best_url if box_art: return box_art return "" def item_enclosure_length(self, item: Game) -> int: """Returns the length of the enclosure. Prefer the newly-added ``box_art_size_bytes`` field so that the RSS feed can include an accurate ``length`` attribute. Fall back to 0 if the value is missing or ``None``. """ try: size = getattr(item, "box_art_size_bytes", None) return int(size) if size is not None else 0 except TypeError, ValueError: return 0 def item_enclosure_mime_type(self, item: Game) -> str: """Returns the MIME type of the enclosure. Use the ``box_art_mime_type`` field when available, otherwise fall back to a generic JPEG string (as was previously hard-coded). 
""" mime: str = getattr(item, "box_art_mime_type", "") return mime or "image/jpeg" # MARK: /rss/campaigns/ class DropCampaignFeed(Feed): """RSS feed for latest drop campaigns.""" title: str = "Twitch Drop Campaigns" link: str = "/campaigns/" description: str = "Latest Twitch drop campaigns on TTVDrops" feed_url: str = "/rss/campaigns/" feed_copyright: str = "Information wants to be free." _limit: int | None = None def __call__( self, request: HttpRequest, *args: object, **kwargs: object, ) -> HttpResponse: """Override to capture limit parameter from request. Args: request (HttpRequest): The incoming HTTP request, potentially containing a 'limit' query parameter. *args: Additional positional arguments. **kwargs: Additional keyword arguments. Returns: HttpResponse: The HTTP response generated by the parent Feed class after processing the request. """ if request.GET.get("limit"): try: self._limit = int(request.GET.get("limit", 200)) except ValueError, TypeError: self._limit = None return super().__call__(request, *args, **kwargs) def items(self) -> list[DropCampaign]: """Return the latest drop campaigns ordered by most recent start date (default 200, or limited by ?limit query param).""" limit: int = self._limit if self._limit is not None else 200 queryset: QuerySet[DropCampaign] = _active_drop_campaigns( DropCampaign.objects.order_by("-start_at"), ) return list(_with_campaign_related(queryset)[:limit]) def item_title(self, item: DropCampaign) -> SafeText: """Return the campaign name as the item title (SafeText for RSS).""" return SafeText(item.get_feed_title()) def item_description(self, item: DropCampaign) -> SafeText: """Return a description of the campaign.""" parts: list[SafeText] = [] parts.extend(generate_item_image(item)) parts.extend(generate_description_html(item=item)) parts.extend(generate_date_html(item=item)) parts.extend(generate_drops_summary_html(item=item)) parts.extend(generate_channels_html(item)) parts.extend(genereate_details_link_html(item)) 
return SafeText("".join(str(p) for p in parts)) def item_link(self, item: DropCampaign) -> str: """Return the link to the campaign detail.""" return item.get_feed_link() def item_pubdate(self, item: DropCampaign) -> datetime.datetime: """Returns the publication date to the feed item. Fallback to updated_at or now if missing. """ return item.get_feed_pubdate() def item_updateddate(self, item: DropCampaign) -> datetime.datetime: """Returns the campaign's last update time.""" return item.updated_at def item_categories(self, item: DropCampaign) -> tuple[str, ...]: """Returns the associated game's name as a category.""" return item.get_feed_categories() def item_guid(self, item: DropCampaign) -> str: """Return a unique identifier for each campaign.""" return item.get_feed_guid() def item_author_name(self, item: DropCampaign) -> str: """Return the author name for the campaign, typically the game name.""" return item.get_feed_author_name() def item_enclosure_url(self, item: DropCampaign) -> str: """Returns the URL of the campaign image for enclosure.""" return item.get_feed_enclosure_url() def item_enclosure_length(self, item: DropCampaign) -> int: """Returns the length of the enclosure.""" try: size: int | None = getattr(item, "image_size_bytes", None) return int(size) if size is not None else 0 except TypeError, ValueError: return 0 def item_enclosure_mime_type(self, item: DropCampaign) -> str: """Returns the MIME type of the enclosure.""" mime: str = getattr(item, "image_mime_type", "") return mime or "image/jpeg" # MARK: /rss/games/