from __future__ import annotations import datetime import json import logging from collections import OrderedDict from collections import defaultdict from typing import TYPE_CHECKING from typing import Any from django.core.serializers import serialize from django.db.models import Count from django.db.models import F from django.db.models import Model from django.db.models import Prefetch from django.db.models import Q from django.db.models.functions import Trim from django.db.models.query import QuerySet from django.http import Http404 from django.http import HttpRequest from django.http import HttpResponse from django.shortcuts import render from django.utils import timezone from django.views.generic import DetailView from django.views.generic import ListView from pygments import highlight from pygments.formatters import HtmlFormatter from pygments.lexers.data import JsonLexer from twitch.models import Channel from twitch.models import DropBenefit from twitch.models import DropCampaign from twitch.models import Game from twitch.models import Organization from twitch.models import TimeBasedDrop if TYPE_CHECKING: from django.db.models import QuerySet from django.http import HttpRequest from django.http import HttpResponse logger: logging.Logger = logging.getLogger("ttvdrops.views") MIN_QUERY_LENGTH_FOR_FTS = 3 MIN_SEARCH_RANK = 0.05 # MARK: /search/ def search_view(request: HttpRequest) -> HttpResponse: """Search view for all models. Args: request: The HTTP request. Returns: HttpResponse: The rendered search results. """ query: str = request.GET.get("q", "") results: dict[str, QuerySet] = {} if query: if len(query) < MIN_QUERY_LENGTH_FOR_FTS: results["organizations"] = Organization.objects.filter(name__istartswith=query) results["games"] = Game.objects.filter(Q(name__istartswith=query) | Q(display_name__istartswith=query)) results["campaigns"] = DropCampaign.objects.filter( Q(name__istartswith=query) | Q(description__icontains=query), ).select_related("game") results["drops"] = TimeBasedDrop.objects.filter(name__istartswith=query).select_related("campaign") results["benefits"] = DropBenefit.objects.filter(name__istartswith=query) else: # SQLite-compatible text search using icontains results["organizations"] = Organization.objects.filter( name__icontains=query, ) results["games"] = Game.objects.filter( Q(name__icontains=query) | Q(display_name__icontains=query), ) results["campaigns"] = DropCampaign.objects.filter( Q(name__icontains=query) | Q(description__icontains=query), ).select_related("game") results["drops"] = TimeBasedDrop.objects.filter( name__icontains=query, ).select_related("campaign") results["benefits"] = DropBenefit.objects.filter( name__icontains=query, ) return render( request, "twitch/search_results.html", {"query": query, "results": results}, ) # MARK: /organizations/ class OrgListView(ListView): """List view for organization.""" model = Organization template_name = "twitch/org_list.html" context_object_name = "orgs" # MARK: /organizations// class OrgDetailView(DetailView): """Detail view for organization.""" model = Organization template_name = "twitch/organization_detail.html" context_object_name = "organization" def get_object( self, queryset: QuerySet[Organization] | None = None, ) -> Organization: """Get the organization object using twitch_id. Args: queryset: Optional queryset to use. Returns: Organization: The organization object. Raises: Http404: If the organization is not found. """ if queryset is None: queryset = self.get_queryset() # Use twitch_id as the lookup field since it's the primary key pk: str | None = self.kwargs.get(self.pk_url_kwarg) try: org: Organization = queryset.get(twitch_id=pk) except Organization.DoesNotExist as exc: msg = "No organization found matching the query" raise Http404(msg) from exc return org def get_context_data(self, **kwargs) -> dict[str, Any]: """Add additional context data. Args: **kwargs: Additional arguments. Returns: dict: Context data. """ context: dict[str, Any] = super().get_context_data(**kwargs) organization: Organization = self.get_object() # pyright: ignore[reportAssignmentType] games: QuerySet[Game] = organization.games.all() # pyright: ignore[reportAttributeAccessIssue] serialized_org: str = serialize( "json", [organization], fields=("name",), ) org_data: list[dict] = json.loads(serialized_org) if games.exists(): serialized_games: str = serialize( "json", games, fields=("slug", "name", "display_name", "box_art"), ) games_data: list[dict] = json.loads(serialized_games) org_data[0]["fields"]["games"] = games_data pretty_org_data: str = json.dumps(org_data[0], indent=4) context.update( { "games": games, "org_data": pretty_org_data, }, ) return context # MARK: /campaigns/ class DropCampaignListView(ListView): """List view for drop campaigns.""" model = DropCampaign template_name = "twitch/campaign_list.html" context_object_name = "campaigns" paginate_by = 100 def get_queryset(self) -> QuerySet[DropCampaign]: """Get queryset of drop campaigns. Returns: QuerySet: Filtered drop campaigns. """ queryset: QuerySet[DropCampaign] = super().get_queryset() game_filter: str | None = self.request.GET.get("game") if game_filter: queryset = queryset.filter(game__id=game_filter) return queryset.select_related("game__owner").order_by("-start_at") def get_context_data(self, **kwargs) -> dict[str, Any]: """Add additional context data. Args: **kwargs: Additional arguments. Returns: dict: Context data. """ context: dict[str, Any] = super().get_context_data(**kwargs) context["games"] = Game.objects.all().order_by("display_name") context["status_options"] = ["active", "upcoming", "expired"] context["now"] = timezone.now() context["selected_game"] = self.request.GET.get("game", "") context["selected_per_page"] = self.paginate_by context["selected_status"] = self.request.GET.get("status", "") return context def format_and_color_json(data: dict[str, Any] | str) -> str: """Format and color a JSON string for HTML display. Args: data: Either a dictionary or a JSON string to format. Returns: str: The formatted code with HTML styles. """ if isinstance(data, dict): formatted_code: str = json.dumps(data, indent=4) else: formatted_code = data return highlight(formatted_code, JsonLexer(), HtmlFormatter()) # MARK: /campaigns// class DropCampaignDetailView(DetailView): """Detail view for a drop campaign.""" model = DropCampaign template_name = "twitch/campaign_detail.html" context_object_name = "campaign" def get_object( self, queryset: QuerySet[DropCampaign] | None = None, ) -> Model: """Get the campaign object with related data prefetched. Args: queryset: Optional queryset to use. Returns: DropCampaign: The campaign object with prefetched relations. """ if queryset is None: queryset = self.get_queryset() queryset = queryset.select_related("game__owner") return super().get_object(queryset=queryset) def get_context_data(self, **kwargs: object) -> dict[str, Any]: # noqa: PLR0914 """Add additional context data. Args: **kwargs: Additional arguments. Returns: dict: Context data. """ context: dict[str, Any] = super().get_context_data(**kwargs) campaign: DropCampaign = context["campaign"] drops: QuerySet[TimeBasedDrop] = ( TimeBasedDrop.objects .filter(campaign=campaign) .select_related("campaign") .prefetch_related("benefits") .order_by("required_minutes_watched") ) serialized_campaign = serialize( "json", [campaign], fields=( "name", "description", "details_url", "account_link_url", "image_url", "start_at", "end_at", "is_account_connected", "game", "created_at", "updated_at", ), ) campaign_data = json.loads(serialized_campaign) if drops.exists(): serialized_drops = serialize( "json", drops, fields=( "name", "required_minutes_watched", "required_subs", "start_at", "end_at", ), ) drops_data: list[dict[str, Any]] = json.loads(serialized_drops) for i, drop in enumerate(drops): drop_benefits: list[DropBenefit] = list(drop.benefits.all()) if drop_benefits: serialized_benefits = serialize( "json", drop_benefits, fields=("name", "image_asset_url"), ) benefits_data = json.loads(serialized_benefits) drops_data[i]["fields"]["benefits"] = benefits_data campaign_data[0]["fields"]["drops"] = drops_data # Enhance drops with additional context data enhanced_drops: list[dict[str, TimeBasedDrop | datetime.datetime | str | None]] = [] now: datetime.datetime = timezone.now() for drop in drops: # Calculate countdown text if drop.end_at and drop.end_at > now: time_diff: datetime.timedelta = drop.end_at - now days: int = time_diff.days hours, remainder = divmod(time_diff.seconds, 3600) minutes, seconds = divmod(remainder, 60) if days > 0: countdown_text: str = f"{days}d {hours}h {minutes}m" elif hours > 0: countdown_text = f"{hours}h {minutes}m" elif minutes > 0: countdown_text = f"{minutes}m {seconds}s" else: countdown_text = f"{seconds}s" elif drop.start_at and drop.start_at > now: countdown_text = "Not started" else: countdown_text = "Expired" enhanced_drop: dict[str, TimeBasedDrop | datetime.datetime | str | None] = { "drop": drop, "local_start": drop.start_at, "local_end": drop.end_at, "timezone_name": "UTC", "countdown_text": countdown_text, } enhanced_drops.append(enhanced_drop) context["now"] = now context["drops"] = enhanced_drops context["campaign_data"] = format_and_color_json(campaign_data[0]) context["owner"] = campaign.game.owner context["allowed_channels"] = campaign.allow_channels.all().order_by("display_name") return context # MARK: /games/ class GamesGridView(ListView): """List view for games grouped by organization.""" model = Game template_name = "twitch/games_grid.html" context_object_name = "games" def get_queryset(self) -> QuerySet[Game]: """Get queryset of all games, annotated with campaign counts. Returns: QuerySet: Annotated games queryset. """ now: datetime.datetime = timezone.now() return ( super() .get_queryset() .annotate( campaign_count=Count("drop_campaigns", distinct=True), active_count=Count( "drop_campaigns", filter=Q( drop_campaigns__start_at__lte=now, drop_campaigns__end_at__gte=now, ), distinct=True, ), ) .order_by("display_name") ) def get_context_data(self, **kwargs) -> dict[str, Any]: """Add additional context data. Games are grouped by their owning organization. Args: **kwargs: Additional arguments. Returns: dict: Context data with games grouped by organization. """ context: dict[str, Any] = super().get_context_data(**kwargs) now: datetime.datetime = timezone.now() games_with_campaigns: QuerySet[Game] = ( Game.objects .filter(drop_campaigns__isnull=False) .select_related("owner") .annotate( campaign_count=Count("drop_campaigns", distinct=True), active_count=Count( "drop_campaigns", filter=Q( drop_campaigns__start_at__lte=now, drop_campaigns__end_at__gte=now, ), distinct=True, ), ) .order_by("owner__name", "display_name") ) games_by_org: defaultdict[Organization, list[dict[str, Game]]] = defaultdict(list) for game in games_with_campaigns: if game.owner: games_by_org[game.owner].append({"game": game}) context["games_by_org"] = OrderedDict( sorted(games_by_org.items(), key=lambda item: item[0].name), ) return context # MARK: /games// class GameDetailView(DetailView): """Detail view for a game.""" model = Game template_name = "twitch/game_detail.html" context_object_name = "game" def get_object(self, queryset: QuerySet[Game] | None = None) -> Game: """Get the game object using twitch_id as the primary key lookup. Args: queryset: Optional queryset to use. Returns: Game: The game object. Raises: Http404: If the game is not found. """ if queryset is None: queryset = self.get_queryset() # Use twitch_id as the lookup field since it's the primary key pk = self.kwargs.get(self.pk_url_kwarg) try: game = queryset.get(twitch_id=pk) except Game.DoesNotExist as exc: msg = "No game found matching the query" raise Http404(msg) from exc return game def get_context_data(self, **kwargs: object) -> dict[str, Any]: """Add additional context data. Args: **kwargs: Additional arguments. Returns: dict: Context data with active, upcoming, and expired campaigns. Expired campaigns are filtered based on either end date or status. """ context: dict[str, Any] = super().get_context_data(**kwargs) game: Game = self.get_object() # pyright: ignore[reportAssignmentType] now: datetime.datetime = timezone.now() all_campaigns: QuerySet[DropCampaign] = ( DropCampaign.objects .filter(game=game) .select_related("game__owner") .prefetch_related( Prefetch( "time_based_drops", queryset=TimeBasedDrop.objects.prefetch_related( Prefetch( "benefits", queryset=DropBenefit.objects.order_by("name"), ), ), ), ) .order_by("-end_at") ) active_campaigns: list[DropCampaign] = [ campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at <= now and campaign.end_at is not None and campaign.end_at >= now ] active_campaigns.sort( key=lambda c: c.end_at if c.end_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC), ) upcoming_campaigns: list[DropCampaign] = [ campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at > now ] upcoming_campaigns.sort( key=lambda c: c.start_at if c.start_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC), ) expired_campaigns: list[DropCampaign] = [ campaign for campaign in all_campaigns if campaign.end_at is not None and campaign.end_at < now ] # Build campaign data with sorted benefits campaigns_with_benefits: list[dict[str, Any]] = [] for campaign in all_campaigns: benefits_dict: dict[int, DropBenefit] = {} for drop in campaign.time_based_drops.all(): # type: ignore[attr-defined] for benefit in drop.benefits.all(): benefits_dict[benefit.id] = benefit sorted_benefits = sorted( benefits_dict.values(), key=lambda b: b.name, ) campaigns_with_benefits.append( { "campaign": campaign, "sorted_benefits": sorted_benefits, }, ) serialized_game: str = serialize( "json", [game], fields=( "slug", "name", "display_name", "box_art", "owner", ), ) game_data: list[dict[str, Any]] = json.loads(serialized_game) if all_campaigns.exists(): serialized_campaigns = serialize( "json", all_campaigns, fields=( "name", "description", "details_url", "account_link_url", "image_url", "start_at", "end_at", "is_account_connected", ), ) campaigns_data: list[dict[str, Any]] = json.loads( serialized_campaigns, ) game_data[0]["fields"]["campaigns"] = campaigns_data context.update( { "active_campaigns": active_campaigns, "upcoming_campaigns": upcoming_campaigns, "expired_campaigns": expired_campaigns, "campaigns_with_benefits": campaigns_with_benefits, "owner": game.owner, "now": now, "game_data": format_and_color_json(game_data[0]), }, ) return context # MARK: / def dashboard(request: HttpRequest) -> HttpResponse: """Dashboard view showing active campaigns and progress. Args: request: The HTTP request. Returns: HttpResponse: The rendered dashboard template. """ now: datetime.datetime = timezone.now() active_campaigns: QuerySet[DropCampaign] = ( DropCampaign.objects .filter(start_at__lte=now, end_at__gte=now) .select_related("game__owner") .prefetch_related( "allow_channels", ) ) campaigns_by_org_game: dict[str, Any] = {} for campaign in active_campaigns: owner: Organization | None = campaign.game.owner org_id: str = owner.twitch_id if owner else "unknown" org_name: str = owner.name if owner else "Unknown" game_id: str = campaign.game.twitch_id game_name: str = campaign.game.display_name if org_id not in campaigns_by_org_game: campaigns_by_org_game[org_id] = {"name": org_name, "games": {}} if game_id not in campaigns_by_org_game[org_id]["games"]: campaigns_by_org_game[org_id]["games"][game_id] = { "name": game_name, "box_art": campaign.game.box_art, "campaigns": [], } campaigns_by_org_game[org_id]["games"][game_id]["campaigns"].append( campaign, ) sorted_campaigns_by_org_game: dict[str, Any] = { org_id: campaigns_by_org_game[org_id] for org_id in sorted( campaigns_by_org_game.keys(), key=lambda k: campaigns_by_org_game[k]["name"], ) } for org_data in sorted_campaigns_by_org_game.values(): org_data["games"] = { game_id: org_data["games"][game_id] for game_id in sorted( org_data["games"].keys(), key=lambda k: org_data["games"][k]["name"], ) } return render( request, "twitch/dashboard.html", { "active_campaigns": active_campaigns, "campaigns_by_org_game": sorted_campaigns_by_org_game, "now": now, }, ) # MARK: /debug/ def debug_view(request: HttpRequest) -> HttpResponse: """Debug view showing potentially broken or inconsistent data. Only staff users may access this endpoint. Returns: HttpResponse: Rendered debug template or redirect if unauthorized. """ now: datetime.datetime = timezone.now() # Games with no assigned owner organization games_without_owner: QuerySet[Game] = Game.objects.filter( owner__isnull=True, ).order_by("display_name") # Campaigns with missing or obviously broken images broken_image_campaigns: QuerySet[DropCampaign] = DropCampaign.objects.filter( Q(image_url__isnull=True) | Q(image_url__exact="") | ~Q(image_url__startswith="http"), ).select_related("game") # Benefits with missing images broken_benefit_images: QuerySet[DropBenefit] = DropBenefit.objects.annotate( trimmed_url=Trim("image_asset_url"), ).filter( Q(image_asset_url__isnull=True) | Q(trimmed_url__exact="") | ~Q(image_asset_url__startswith="http"), ) # Time-based drops without any benefits drops_without_benefits: QuerySet[TimeBasedDrop] = TimeBasedDrop.objects.filter( benefits__isnull=True, ).select_related( "campaign__game", ) # Campaigns with invalid dates (start after end or missing either) invalid_date_campaigns: QuerySet[DropCampaign] = DropCampaign.objects.filter( Q(start_at__gt=F("end_at")) | Q(start_at__isnull=True) | Q(end_at__isnull=True), ).select_related("game") # Duplicate campaign names per game. # We retrieve the game's name for user-friendly display. duplicate_name_campaigns = ( DropCampaign.objects .values("game_id", "game__display_name", "name") .annotate(name_count=Count("twitch_id")) .filter(name_count__gt=1) .order_by("game__display_name", "name") ) # Campaigns currently active but image missing active_missing_image: QuerySet[DropCampaign] = ( DropCampaign.objects .filter(start_at__lte=now, end_at__gte=now) .filter( Q(image_url__isnull=True) | Q(image_url__exact="") | ~Q(image_url__startswith="http"), ) .select_related("game") ) context: dict[str, Any] = { "now": now, "games_without_owner": games_without_owner, "broken_image_campaigns": broken_image_campaigns, "broken_benefit_images": broken_benefit_images, "drops_without_benefits": drops_without_benefits, "invalid_date_campaigns": invalid_date_campaigns, "duplicate_name_campaigns": duplicate_name_campaigns, "active_missing_image": active_missing_image, } return render( request, "twitch/debug.html", context, ) # MARK: /games/list/ class GamesListView(GamesGridView): """List view for games in simple list format.""" template_name = "twitch/games_list.html" # MARK: /docs/rss/ def docs_rss_view(request: HttpRequest) -> HttpResponse: """View for /docs/rss that lists all available RSS feeds. Args: request: The HTTP request object. Returns: Rendered HTML response with list of RSS feeds. """ feeds: list[dict[str, str]] = [ { "title": "Organizations", "description": "Latest organizations", "url": "/rss/organizations/", }, { "title": "Games", "description": "Latest games", "url": "/rss/games/", }, { "title": "Drop Campaigns", "description": "Latest drop campaigns", "url": "/rss/campaigns/", }, ] return render(request, "twitch/docs_rss.html", {"feeds": feeds}) # MARK: /channels/ class ChannelListView(ListView): """List view for channels.""" model = Channel template_name = "twitch/channel_list.html" context_object_name = "channels" paginate_by = 200 def get_queryset(self) -> QuerySet[Channel]: """Get queryset of channels. Returns: QuerySet: Filtered channels. """ queryset: QuerySet[Channel] = super().get_queryset() search_query: str | None = self.request.GET.get("search") if search_query: queryset = queryset.filter( Q(name__icontains=search_query) | Q(display_name__icontains=search_query), ) return queryset.annotate( campaign_count=Count("allowed_campaigns", distinct=True), ).order_by("-campaign_count", "name") def get_context_data(self, **kwargs) -> dict[str, Any]: """Add additional context data. Args: **kwargs: Additional arguments. Returns: dict: Context data. """ context: dict[str, Any] = super().get_context_data(**kwargs) context["search_query"] = self.request.GET.get("search", "") return context # MARK: /channels// class ChannelDetailView(DetailView): """Detail view for a channel.""" model = Channel template_name = "twitch/channel_detail.html" context_object_name = "channel" def get_object(self, queryset: QuerySet[Channel] | None = None) -> Channel: """Get the channel object using twitch_id as the primary key lookup. Args: queryset: Optional queryset to use. Returns: Channel: The channel object. Raises: Http404: If the channel is not found. """ if queryset is None: queryset = self.get_queryset() # Use twitch_id as the lookup field since it's the primary key pk = self.kwargs.get(self.pk_url_kwarg) try: channel = queryset.get(twitch_id=pk) except Channel.DoesNotExist as exc: msg = "No channel found matching the query" raise Http404(msg) from exc return channel def get_context_data(self, **kwargs: object) -> dict[str, Any]: """Add additional context data. Args: **kwargs: Additional arguments. Returns: dict: Context data with active, upcoming, and expired campaigns. """ context: dict[str, Any] = super().get_context_data(**kwargs) channel: Channel = self.get_object() # pyright: ignore[reportAssignmentType] now: datetime.datetime = timezone.now() all_campaigns: QuerySet[DropCampaign] = ( DropCampaign.objects .filter(allow_channels=channel) .select_related("game__owner") .prefetch_related( Prefetch( "time_based_drops", queryset=TimeBasedDrop.objects.prefetch_related( Prefetch( "benefits", queryset=DropBenefit.objects.order_by("name"), ), ), ), ) .order_by("-start_at") ) active_campaigns: list[DropCampaign] = [ campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at <= now and campaign.end_at is not None and campaign.end_at >= now ] active_campaigns.sort( key=lambda c: c.end_at if c.end_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC), ) upcoming_campaigns: list[DropCampaign] = [ campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at > now ] upcoming_campaigns.sort( key=lambda c: c.start_at if c.start_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC), ) expired_campaigns: list[DropCampaign] = [ campaign for campaign in all_campaigns if campaign.end_at is not None and campaign.end_at < now ] # Build campaign data with sorted benefits campaigns_with_benefits = [] for campaign in all_campaigns: benefits_dict: dict[int, DropBenefit] = {} for drop in campaign.time_based_drops.all(): # type: ignore[attr-defined] for benefit in drop.benefits.all(): benefits_dict[benefit.id] = benefit sorted_benefits = sorted( benefits_dict.values(), key=lambda b: b.name, ) campaigns_with_benefits.append( { "campaign": campaign, "sorted_benefits": sorted_benefits, }, ) serialized_channel = serialize( "json", [channel], fields=( "name", "display_name", ), ) channel_data = json.loads(serialized_channel) if all_campaigns.exists(): serialized_campaigns = serialize( "json", all_campaigns, fields=( "name", "description", "details_url", "account_link_url", "image_url", "start_at", "end_at", "is_account_connected", ), ) campaigns_data = json.loads(serialized_campaigns) channel_data[0]["fields"]["campaigns"] = campaigns_data context.update( { "active_campaigns": active_campaigns, "upcoming_campaigns": upcoming_campaigns, "expired_campaigns": expired_campaigns, "campaigns_with_benefits": campaigns_with_benefits, "now": now, "channel_data": format_and_color_json(channel_data[0]), }, ) return context