# ttvdrops/twitch/views.py
# (1823 lines, 59 KiB, Python)
from __future__ import annotations
import csv
import datetime
import json
import logging
import operator
from collections import OrderedDict
from collections import defaultdict
from copy import copy
from typing import TYPE_CHECKING
from typing import Any
from typing import Literal
from django.conf import settings
from django.core.paginator import EmptyPage
from django.core.paginator import Page
from django.core.paginator import PageNotAnInteger
from django.core.paginator import Paginator
from django.core.serializers import serialize
from django.db.models import Count
from django.db.models import F
from django.db.models import OuterRef
from django.db.models import Prefetch
from django.db.models import Q
from django.db.models import Subquery
from django.db.models.functions import Trim
from django.http import FileResponse
from django.http import Http404
from django.http import HttpRequest
from django.http import HttpResponse
from django.shortcuts import render
from django.template.defaultfilters import filesizeformat
from django.urls import reverse
from django.utils import timezone
from django.views.generic import DetailView
from django.views.generic import ListView
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexers.data import JsonLexer
from twitch.feeds import DropCampaignFeed
from twitch.feeds import GameCampaignFeed
from twitch.feeds import GameFeed
from twitch.feeds import OrganizationCampaignFeed
from twitch.feeds import OrganizationRSSFeed
from twitch.feeds import RewardCampaignFeed
from twitch.models import Channel
from twitch.models import ChatBadge
from twitch.models import ChatBadgeSet
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import Game
from twitch.models import Organization
from twitch.models import RewardCampaign
from twitch.models import TimeBasedDrop
if TYPE_CHECKING:
from collections.abc import Callable
from os import stat_result
from pathlib import Path
from debug_toolbar.utils import QueryDict
from django.db.models.query import QuerySet
logger: logging.Logger = logging.getLogger("ttvdrops.views")
MIN_QUERY_LENGTH_FOR_FTS = 3
MIN_SEARCH_RANK = 0.05
def emote_gallery_view(request: HttpRequest) -> HttpResponse:
    """View to display all emote images (distribution_type='EMOTE'), clickable to their campaign.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered emote gallery page.
    """
    emote_benefits: QuerySet[DropBenefit, DropBenefit] = (
        DropBenefit.objects.filter(distribution_type="EMOTE")
        .select_related()
        .prefetch_related(
            Prefetch(
                "drops",
                queryset=TimeBasedDrop.objects.select_related("campaign"),
                to_attr="_emote_drops",
            ),
        )
    )
    emotes: list[dict[str, str | DropCampaign]] = []
    for benefit in emote_benefits:
        # Link each emote to the first drop that actually has a campaign attached.
        for candidate in getattr(benefit, "_emote_drops", []):
            if candidate.campaign:
                emotes.append({
                    "image_url": benefit.image_best_url,
                    "campaign": candidate.campaign,
                })
                break
    return render(request, "twitch/emote_gallery.html", {"emotes": emotes})
# MARK: /search/
def search_view(request: HttpRequest) -> HttpResponse:
    """Search view for all models.

    Name-like fields (names, titles, brands, set IDs) are matched by prefix for
    very short queries (below MIN_QUERY_LENGTH_FOR_FTS) to keep results focused,
    and by substring otherwise. Free-text fields (descriptions, summaries) are
    always matched by substring, exactly as before.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered search results.
    """
    query: str = request.GET.get("q", "")
    results: dict[str, QuerySet] = {}
    if query:
        # Single switch between prefix and substring matching; the previous
        # implementation duplicated every queryset in two near-identical branches.
        name_lookup: str = "istartswith" if len(query) < MIN_QUERY_LENGTH_FOR_FTS else "icontains"

        def match(*fields: str) -> Q:
            """OR together `field__<name_lookup>=query` for each given field."""
            condition = Q()
            for field in fields:
                condition |= Q(**{f"{field}__{name_lookup}": query})
            return condition

        results["organizations"] = Organization.objects.filter(match("name"))
        results["games"] = Game.objects.filter(match("name", "display_name"))
        results["campaigns"] = DropCampaign.objects.filter(
            match("name") | Q(description__icontains=query),
        ).select_related("game")
        results["drops"] = TimeBasedDrop.objects.filter(match("name")).select_related("campaign")
        results["benefits"] = DropBenefit.objects.filter(match("name")).prefetch_related(
            "drops__campaign",
        )
        results["reward_campaigns"] = RewardCampaign.objects.filter(
            match("name", "brand") | Q(summary__icontains=query),
        ).select_related("game")
        results["badge_sets"] = ChatBadgeSet.objects.filter(match("set_id"))
        results["badges"] = ChatBadge.objects.filter(
            match("title") | Q(description__icontains=query),
        ).select_related("badge_set")
    return render(
        request,
        "twitch/search_results.html",
        {"query": query, "results": results},
    )
# MARK: /organizations/
def org_list_view(request: HttpRequest) -> HttpResponse:
    """Function-based view for organization list.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered organization list page.
    """
    organizations: QuerySet[Organization] = Organization.objects.all().order_by("name")
    # Serialize every organization so the template can show a raw-JSON panel.
    org_fields: tuple[str, ...] = ("twitch_id", "name", "added_at", "updated_at")
    orgs_json: str = serialize("json", organizations, fields=org_fields)
    context: dict[str, Any] = {
        "orgs": organizations,
        "orgs_data": format_and_color_json(json.loads(orgs_json)),
    }
    return render(request, "twitch/org_list.html", context)
# MARK: /organizations/<twitch_id>/
def organization_detail_view(request: HttpRequest, twitch_id: str) -> HttpResponse:
    """Function-based view for organization detail.

    Args:
        request: The HTTP request.
        twitch_id: The Twitch ID of the organization.

    Returns:
        HttpResponse: The rendered organization detail page.

    Raises:
        Http404: If the organization is not found.
    """
    try:
        organization: Organization = Organization.objects.get(twitch_id=twitch_id)
    except Organization.DoesNotExist as exc:
        msg = "No organization found matching the query"
        raise Http404(msg) from exc

    games: QuerySet[Game] = organization.games.all()  # pyright: ignore[reportAttributeAccessIssue]

    # Build the raw-JSON payload shown on the detail page.
    org_payload: list[dict] = json.loads(
        serialize(
            "json",
            [organization],
            fields=("twitch_id", "name", "added_at", "updated_at"),
        ),
    )
    if games.exists():
        game_fields: tuple[str, ...] = (
            "twitch_id",
            "slug",
            "name",
            "display_name",
            "box_art",
            "added_at",
            "updated_at",
        )
        # Embed the organization's games inside the serialized payload.
        org_payload[0]["fields"]["games"] = json.loads(serialize("json", games, fields=game_fields))

    context: dict[str, Any] = {
        "organization": organization,
        "games": games,
        "org_data": format_and_color_json(org_payload[0]),
    }
    return render(request, "twitch/organization_detail.html", context)
# MARK: /campaigns/
def drop_campaign_list_view(request: HttpRequest) -> HttpResponse:
    """Function-based view for drop campaigns list.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered campaign list page.
    """
    game_filter: str | None = request.GET.get("game")
    status_filter: str | None = request.GET.get("status")
    per_page = 100

    campaigns_qs: QuerySet[DropCampaign] = DropCampaign.objects.all()
    if game_filter:
        campaigns_qs = campaigns_qs.filter(game__twitch_id=game_filter)
    campaigns_qs = campaigns_qs.prefetch_related("game__owners").order_by("-start_at")

    now: datetime.datetime = timezone.now()
    # Map each status keyword to its date-window condition.
    status_conditions: dict[str, Q] = {
        "active": Q(start_at__lte=now, end_at__gte=now),
        "upcoming": Q(start_at__gt=now),
        "expired": Q(end_at__lt=now),
    }
    if status_filter in status_conditions:
        campaigns_qs = campaigns_qs.filter(status_conditions[status_filter])

    paginator: Paginator[DropCampaign] = Paginator(campaigns_qs, per_page)
    try:
        campaigns: Page[DropCampaign] = paginator.page(request.GET.get("page") or 1)
    except PageNotAnInteger:
        campaigns = paginator.page(1)
    except EmptyPage:
        campaigns = paginator.page(paginator.num_pages)

    context: dict[str, Any] = {
        "campaigns": campaigns,
        "page_obj": campaigns,
        "is_paginated": campaigns.has_other_pages(),
        "games": Game.objects.all().order_by("display_name"),
        "status_options": ["active", "upcoming", "expired"],
        "now": now,
        "selected_game": game_filter or "",
        "selected_per_page": per_page,
        "selected_status": status_filter or "",
    }
    return render(request, "twitch/campaign_list.html", context)
def format_and_color_json(data: dict[str, Any] | list[dict] | str) -> str:
    """Format and color a JSON string for HTML display.

    Args:
        data: Either a dictionary, list of dictionaries, or a JSON string to format.

    Returns:
        str: The formatted code with HTML styles.
    """
    # Containers are pretty-printed; anything else is assumed to be JSON text already.
    source: str = json.dumps(data, indent=4) if isinstance(data, (dict, list)) else data
    return highlight(source, JsonLexer(), HtmlFormatter())
# MARK: /datasets/
def dataset_backups_view(request: HttpRequest) -> HttpResponse:
    """View to list database backup datasets on disk.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered dataset backups page.
    """
    datasets_root: Path = settings.DATA_DIR / "datasets"
    datasets: list[dict[str, Any]] = []
    seen: set[str] = set()
    for folder in [datasets_root]:
        if not (folder.exists() and folder.is_dir()):
            continue
        # Only .zst archives are considered backups.
        for path in folder.glob("*.zst"):
            if not path.is_file():
                continue
            resolved = str(path.resolve())
            # Skip duplicates (e.g. the same file reachable via two entries).
            if resolved in seen:
                continue
            seen.add(resolved)
            info: stat_result = path.stat()
            modified: datetime.datetime = datetime.datetime.fromtimestamp(
                info.st_mtime,
                tz=timezone.get_current_timezone(),
            )
            try:
                shown: str = str(path.relative_to(datasets_root))
                download: str | None = shown
            except ValueError:
                # Outside the datasets root: show the bare name, no download link.
                shown = path.name
                download = None
            datasets.append({
                "name": path.name,
                "display_path": shown,
                "download_path": download,
                "size": filesizeformat(info.st_size),
                "updated_at": modified,
            })
    # Newest backups first.
    datasets.sort(key=lambda item: item["updated_at"], reverse=True)
    context: dict[str, Any] = {
        "datasets": datasets,
        "data_dir": str(datasets_root),
        "dataset_count": len(datasets),
    }
    return render(request, "twitch/dataset_backups.html", context)
def dataset_backup_download_view(request: HttpRequest, relative_path: str) -> FileResponse:  # noqa: ARG001
    """Download a dataset backup from the data directory.

    Args:
        request: The HTTP request.
        relative_path: The path relative to the data directory.

    Returns:
        FileResponse: The file response for the requested dataset.

    Raises:
        Http404: When the file is not found or is outside the data directory.
    """
    allowed_endings = (".zst",)
    datasets_root: Path = settings.DATA_DIR / "datasets"
    # Resolve both sides so `..` segments and symlinks cannot escape the root.
    requested_path: Path = (datasets_root / relative_path).resolve()
    data_root: Path = datasets_root.resolve()
    # Path-traversal guard: Path.is_relative_to replaces the old try/except
    # around relative_to().
    if not requested_path.is_relative_to(data_root):
        msg = "File not found"
        raise Http404(msg)
    # is_file() implies exists(), so one check covers both.
    if not requested_path.is_file():
        msg = "File not found"
        raise Http404(msg)
    # Only .zst archives may be served; everything else 404s like a missing file.
    if not requested_path.name.endswith(allowed_endings):
        msg = "File not found"
        raise Http404(msg)
    return FileResponse(
        requested_path.open("rb"),
        as_attachment=True,
        filename=requested_path.name,
    )
def _enhance_drops_with_context(drops: QuerySet[TimeBasedDrop], now: datetime.datetime) -> list[dict[str, Any]]:
"""Helper to enhance drops with countdown and context.
Args:
drops: QuerySet of TimeBasedDrop objects.
now: Current datetime.
Returns:
List of dicts with drop, local_start, local_end, timezone_name, and countdown_text.
"""
enhanced: list[dict[str, Any]] = []
for drop in drops:
if drop.end_at and drop.end_at > now:
time_diff: datetime.timedelta = drop.end_at - now
days: int = time_diff.days
hours, remainder = divmod(time_diff.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if days > 0:
countdown_text: str = f"{days}d {hours}h {minutes}m"
elif hours > 0:
countdown_text = f"{hours}h {minutes}m"
elif minutes > 0:
countdown_text = f"{minutes}m {seconds}s"
else:
countdown_text = f"{seconds}s"
elif drop.start_at and drop.start_at > now:
countdown_text = "Not started"
else:
countdown_text = "Expired"
enhanced.append({
"drop": drop,
"local_start": drop.start_at,
"local_end": drop.end_at,
"timezone_name": "UTC",
"countdown_text": countdown_text,
})
return enhanced
# MARK: /campaigns/<twitch_id>/
def drop_campaign_detail_view(request: HttpRequest, twitch_id: str) -> HttpResponse:  # noqa: PLR0914
    """Function-based view for a drop campaign detail.

    Args:
        request: The HTTP request.
        twitch_id: The Twitch ID of the campaign.

    Returns:
        HttpResponse: The rendered campaign detail page.

    Raises:
        Http404: If the campaign is not found.
    """
    try:
        campaign: DropCampaign = DropCampaign.objects.prefetch_related(
            "game__owners",
            # Pre-sort allowed channels so the template can iterate them as-is.
            Prefetch(
                "allow_channels",
                queryset=Channel.objects.order_by("display_name"),
                to_attr="channels_ordered",
            ),
        ).get(
            twitch_id=twitch_id,
        )
    except DropCampaign.DoesNotExist as exc:
        msg = "No campaign found matching the query"
        raise Http404(msg) from exc
    # Drops for this campaign, ordered by required watch time (cheapest first).
    drops: QuerySet[TimeBasedDrop] = (
        TimeBasedDrop.objects
        .filter(campaign=campaign)
        .select_related("campaign")
        .prefetch_related("benefits")
        .order_by("required_minutes_watched")
    )
    # Serialize the campaign for the raw-JSON panel in the template.
    serialized_campaign = serialize(
        "json",
        [campaign],
        fields=(
            "twitch_id",
            "name",
            "description",
            "details_url",
            "account_link_url",
            "image_url",
            "start_at",
            "end_at",
            "allow_is_enabled",
            "operation_names",
            "game",
            "created_at",
            "updated_at",
        ),
    )
    campaign_data = json.loads(serialized_campaign)
    if drops.exists():
        # Titles of badge-type benefits; used to look up ChatBadge descriptions
        # because DropBenefit itself carries no description field.
        badge_benefit_names: set[str] = {
            benefit.name
            for drop in drops
            for benefit in drop.benefits.all()
            if benefit.distribution_type == "BADGE" and benefit.name
        }
        # One query for all matching badge descriptions, keyed by badge title.
        badge_descriptions_by_title: dict[str, str] = dict(
            ChatBadge.objects.filter(title__in=badge_benefit_names).values_list("title", "description"),
        )
        serialized_drops = serialize(
            "json",
            drops,
            fields=(
                "twitch_id",
                "name",
                "required_minutes_watched",
                "required_subs",
                "start_at",
                "end_at",
                "added_at",
                "updated_at",
            ),
        )
        drops_data: list[dict[str, Any]] = json.loads(serialized_drops)
        # Embed each drop's serialized benefits inside its JSON payload; index i
        # matches because `drops` is iterated in the same order it was serialized.
        for i, drop in enumerate(drops):
            drop_benefits: list[DropBenefit] = list(drop.benefits.all())
            if drop_benefits:
                serialized_benefits = serialize(
                    "json",
                    drop_benefits,
                    fields=(
                        "twitch_id",
                        "name",
                        "image_asset_url",
                        "added_at",
                        "updated_at",
                        "created_at",
                        "entitlement_limit",
                        "is_ios_available",
                        "distribution_type",
                    ),
                )
                benefits_data = json.loads(serialized_benefits)
                for benefit_data in benefits_data:
                    fields: dict[str, Any] = benefit_data.get("fields", {})
                    if fields.get("distribution_type") != "BADGE":
                        continue
                    # DropBenefit doesn't have a description field; fetch it from ChatBadge when possible.
                    if fields.get("description"):
                        continue
                    badge_description: str | None = badge_descriptions_by_title.get(fields.get("name", ""))
                    if badge_description:
                        fields["description"] = badge_description
                drops_data[i]["fields"]["benefits"] = benefits_data
        campaign_data[0]["fields"]["drops"] = drops_data
    now: datetime.datetime = timezone.now()
    enhanced_drops: list[dict[str, Any]] = _enhance_drops_with_context(drops, now)
    # Attach awarded_badge to each drop in enhanced_drops
    for enhanced_drop in enhanced_drops:
        drop = enhanced_drop["drop"]
        awarded_badge = None
        # Only the first badge-type benefit is used for the awarded badge.
        for benefit in drop.benefits.all():
            if benefit.distribution_type == "BADGE":
                awarded_badge: ChatBadge | None = ChatBadge.objects.filter(title=benefit.name).first()
                break
        enhanced_drop["awarded_badge"] = awarded_badge
    context: dict[str, Any] = {
        "campaign": campaign,
        "now": now,
        "drops": enhanced_drops,
        "campaign_data": format_and_color_json(campaign_data[0]),
        "owners": list(campaign.game.owners.all()),
        # channels_ordered is set by the Prefetch above; default to [] if absent.
        "allowed_channels": getattr(campaign, "channels_ordered", []),
    }
    return render(request, "twitch/campaign_detail.html", context)
# MARK: /games/
class GamesGridView(ListView):
    """List view for games grouped by organization."""

    model = Game
    template_name = "twitch/games_grid.html"
    context_object_name = "games"

    def get_queryset(self) -> QuerySet[Game]:
        """Get queryset of all games, annotated with campaign counts.

        Returns:
            QuerySet: Annotated games queryset.
        """
        current_time: datetime.datetime = timezone.now()
        queryset = super().get_queryset().prefetch_related("owners")
        queryset = queryset.annotate(
            campaign_count=Count("drop_campaigns", distinct=True),
            active_count=Count(
                "drop_campaigns",
                filter=Q(
                    drop_campaigns__start_at__lte=current_time,
                    drop_campaigns__end_at__gte=current_time,
                ),
                distinct=True,
            ),
        )
        return queryset.order_by("display_name")

    def get_context_data(self, **kwargs) -> dict[str, Any]:
        """Add additional context data.

        Games are grouped by their owning organization.

        Args:
            **kwargs: Additional arguments.

        Returns:
            dict: Context data with games grouped by organization.
        """
        context: dict[str, Any] = super().get_context_data(**kwargs)
        current_time: datetime.datetime = timezone.now()
        # Only games that have at least one campaign, annotated with counts.
        annotated_games: QuerySet[Game] = (
            Game.objects
            .filter(drop_campaigns__isnull=False)
            .prefetch_related("owners")
            .annotate(
                campaign_count=Count("drop_campaigns", distinct=True),
                active_count=Count(
                    "drop_campaigns",
                    filter=Q(
                        drop_campaigns__start_at__lte=current_time,
                        drop_campaigns__end_at__gte=current_time,
                    ),
                    distinct=True,
                ),
            )
            .order_by("display_name")
        )
        # A game appears under every organization that owns it.
        grouped: defaultdict[Organization, list[dict[str, Game]]] = defaultdict(list)
        for game in annotated_games:
            for owner in game.owners.all():
                grouped[owner].append({"game": game})
        context["games_by_org"] = OrderedDict(
            sorted(grouped.items(), key=lambda entry: entry[0].name),
        )
        return context
# MARK: /games/<twitch_id>/
class GameDetailView(DetailView):
    """Detail view for a game."""

    model = Game
    template_name = "twitch/game_detail.html"
    context_object_name = "game"
    # NOTE(review): `lookup_field` looks like a DRF convention, not a Django
    # DetailView attribute; the actual twitch_id lookup happens in get_object().
    lookup_field = "twitch_id"

    def get_object(self, queryset: QuerySet[Game] | None = None) -> Game:
        """Get the game object using twitch_id as the primary key lookup.

        Args:
            queryset: Optional queryset to use.

        Returns:
            Game: The game object.

        Raises:
            Http404: If the game is not found.
        """
        if queryset is None:
            queryset = self.get_queryset()
        # Use twitch_id as the lookup field since it's the primary key
        twitch_id = self.kwargs.get("twitch_id")
        try:
            game = queryset.get(twitch_id=twitch_id)
        except Game.DoesNotExist as exc:
            msg = "No game found matching the query"
            raise Http404(msg) from exc
        return game

    def get_context_data(self, **kwargs: object) -> dict[str, Any]:
        """Add additional context data.

        Args:
            **kwargs: Additional arguments.

        Returns:
            dict: Context data with active, upcoming, and expired
                campaigns. Expired campaigns are filtered based on
                either end date or status.
        """
        context: dict[str, Any] = super().get_context_data(**kwargs)
        game: Game = self.get_object()  # pyright: ignore[reportAssignmentType]
        now: datetime.datetime = timezone.now()
        # For each drop, find awarded badge (distribution_type BADGE)
        drop_awarded_badges: dict[str, ChatBadge] = {}
        drops: QuerySet[TimeBasedDrop, TimeBasedDrop] = TimeBasedDrop.objects.filter(
            campaign__game=game,
        ).prefetch_related("benefits")
        for drop in drops:
            for benefit in drop.benefits.all():
                if benefit.distribution_type == "BADGE":
                    # Find badge by title
                    badge: ChatBadge | None = ChatBadge.objects.filter(title=benefit.name).first()
                    if badge:
                        drop_awarded_badges[drop.twitch_id] = badge
        # All campaigns for this game with drops and benefits prefetched;
        # fetched once and reused for both partitioning and serialization.
        all_campaigns: QuerySet[DropCampaign] = (
            DropCampaign.objects
            .filter(game=game)
            .prefetch_related("game__owners")
            .prefetch_related(
                Prefetch(
                    "time_based_drops",
                    queryset=TimeBasedDrop.objects.prefetch_related(
                        Prefetch(
                            "benefits",
                            queryset=DropBenefit.objects.order_by("name"),
                        ),
                    ),
                ),
            )
            .order_by("-end_at")
        )
        # Partition campaigns by status in Python (the queryset is already cached).
        active_campaigns: list[DropCampaign] = [
            campaign
            for campaign in all_campaigns
            if campaign.start_at is not None
            and campaign.start_at <= now
            and campaign.end_at is not None
            and campaign.end_at >= now
        ]
        # Campaigns lacking an end/start date sort last via datetime.max sentinel.
        active_campaigns.sort(
            key=lambda c: c.end_at if c.end_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC),
        )
        upcoming_campaigns: list[DropCampaign] = [
            campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at > now
        ]
        upcoming_campaigns.sort(
            key=lambda c: c.start_at if c.start_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC),
        )
        expired_campaigns: list[DropCampaign] = [
            campaign for campaign in all_campaigns if campaign.end_at is not None and campaign.end_at < now
        ]
        # Serialize the game for the raw-JSON panel in the template.
        serialized_game: str = serialize(
            "json",
            [game],
            fields=(
                "twitch_id",
                "slug",
                "name",
                "display_name",
                "box_art",
                "owner",
                "added_at",
                "updated_at",
            ),
        )
        game_data: list[dict[str, Any]] = json.loads(serialized_game)
        if all_campaigns.exists():
            serialized_campaigns = serialize(
                "json",
                all_campaigns,
                fields=(
                    "twitch_id",
                    "name",
                    "description",
                    "details_url",
                    "account_link_url",
                    "image_url",
                    "start_at",
                    "end_at",
                    "allow_is_enabled",
                    "game",
                    "operation_names",
                    "added_at",
                    "updated_at",
                ),
            )
            campaigns_data: list[dict[str, Any]] = json.loads(
                serialized_campaigns,
            )
            # Embed the campaigns inside the serialized game payload.
            game_data[0]["fields"]["campaigns"] = campaigns_data
        owners: list[Organization] = list(game.owners.all())
        context.update(
            {
                "active_campaigns": active_campaigns,
                "upcoming_campaigns": upcoming_campaigns,
                "expired_campaigns": expired_campaigns,
                # First owner is treated as the primary one for display.
                "owner": owners[0] if owners else None,
                "owners": owners,
                "drop_awarded_badges": drop_awarded_badges,
                "now": now,
                "game_data": format_and_color_json(game_data[0]),
            },
        )
        return context
# MARK: /
def dashboard(request: HttpRequest) -> HttpResponse:
    """Dashboard view showing active campaigns and progress.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered dashboard template.
    """
    now: datetime.datetime = timezone.now()
    active_campaigns: QuerySet[DropCampaign] = (
        DropCampaign.objects
        .filter(start_at__lte=now, end_at__gte=now)
        .select_related("game")
        .prefetch_related(
            "game__owners",
            Prefetch(
                "allow_channels",
                queryset=Channel.objects.order_by("display_name"),
                to_attr="channels_ordered",
            ),
        )
        .order_by("-start_at")
    )
    # Group campaigns under their game (insertion order keeps newest first) so a
    # game with multiple owners still renders a single card.
    campaigns_by_game: OrderedDict[str, dict[str, Any]] = OrderedDict()
    for campaign in active_campaigns:
        game: Game = campaign.game
        bucket = campaigns_by_game.get(game.twitch_id)
        if bucket is None:
            bucket = {
                "name": game.display_name,
                "box_art": game.box_art_best_url,
                "owners": list(game.owners.all()),
                "campaigns": [],
            }
            campaigns_by_game[game.twitch_id] = bucket
        bucket["campaigns"].append({
            "campaign": campaign,
            "allowed_channels": getattr(campaign, "channels_ordered", []),
        })
    # Get active reward campaigns (Quest rewards)
    active_reward_campaigns: QuerySet[RewardCampaign] = (
        RewardCampaign.objects
        .filter(starts_at__lte=now, ends_at__gte=now)
        .select_related("game")
        .order_by("-starts_at")
    )
    return render(
        request,
        "twitch/dashboard.html",
        {
            "active_campaigns": active_campaigns,
            "campaigns_by_game": campaigns_by_game,
            "active_reward_campaigns": active_reward_campaigns,
            "now": now,
        },
    )
# MARK: /reward-campaigns/
def reward_campaign_list_view(request: HttpRequest) -> HttpResponse:
    """Function-based view for reward campaigns list.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered reward campaigns list page.
    """
    game_filter: str | None = request.GET.get("game")
    status_filter: str | None = request.GET.get("status")
    per_page = 100

    rewards_qs: QuerySet[RewardCampaign] = RewardCampaign.objects.all()
    if game_filter:
        rewards_qs = rewards_qs.filter(game__twitch_id=game_filter)
    rewards_qs = rewards_qs.select_related("game").order_by("-starts_at")

    now = timezone.now()
    # Map each status keyword to its date-window condition.
    status_conditions: dict[str, Q] = {
        "active": Q(starts_at__lte=now, ends_at__gte=now),
        "upcoming": Q(starts_at__gt=now),
        "expired": Q(ends_at__lt=now),
    }
    if status_filter in status_conditions:
        rewards_qs = rewards_qs.filter(status_conditions[status_filter])

    paginator: Paginator[RewardCampaign] = Paginator(rewards_qs, per_page)
    try:
        reward_campaigns: Page[RewardCampaign] = paginator.page(request.GET.get("page") or 1)
    except PageNotAnInteger:
        reward_campaigns = paginator.page(1)
    except EmptyPage:
        reward_campaigns = paginator.page(paginator.num_pages)

    context: dict[str, Any] = {
        "reward_campaigns": reward_campaigns,
        "games": Game.objects.all().order_by("display_name"),
        "status_options": ["active", "upcoming", "expired"],
        "now": now,
        "selected_game": game_filter or "",
        "selected_per_page": per_page,
        "selected_status": status_filter or "",
    }
    return render(request, "twitch/reward_campaign_list.html", context)
# MARK: /reward-campaigns/<twitch_id>/
def reward_campaign_detail_view(request: HttpRequest, twitch_id: str) -> HttpResponse:
    """Function-based view for a reward campaign detail.

    Args:
        request: The HTTP request.
        twitch_id: The Twitch ID of the reward campaign.

    Returns:
        HttpResponse: The rendered reward campaign detail page.

    Raises:
        Http404: If the reward campaign is not found.
    """
    try:
        reward_campaign: RewardCampaign = RewardCampaign.objects.select_related("game").get(twitch_id=twitch_id)
    except RewardCampaign.DoesNotExist as exc:
        msg = "No reward campaign found matching the query"
        raise Http404(msg) from exc

    # Serialize the campaign for the raw-JSON panel in the template.
    campaign_fields: tuple[str, ...] = (
        "twitch_id",
        "name",
        "brand",
        "summary",
        "instructions",
        "external_url",
        "about_url",
        "reward_value_url_param",
        "starts_at",
        "ends_at",
        "is_sitewide",
        "game",
        "added_at",
        "updated_at",
    )
    payload: list[dict[str, Any]] = json.loads(serialize("json", [reward_campaign], fields=campaign_fields))

    context: dict[str, Any] = {
        "reward_campaign": reward_campaign,
        "now": timezone.now(),
        "campaign_data": format_and_color_json(payload[0]),
        "is_active": reward_campaign.is_active,
    }
    return render(request, "twitch/reward_campaign_detail.html", context)
# MARK: /debug/
def debug_view(request: HttpRequest) -> HttpResponse:
    """Debug view showing potentially broken or inconsistent data.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: Rendered debug template or redirect if unauthorized.
    """
    now: datetime.datetime = timezone.now()
    # Games with no assigned owner organization
    games_without_owner: QuerySet[Game] = Game.objects.filter(
        owners__isnull=True,
    ).order_by("display_name")
    # Campaigns with missing or obviously broken images
    broken_image_campaigns: QuerySet[DropCampaign] = DropCampaign.objects.filter(
        Q(image_url__isnull=True) | Q(image_url__exact="") | ~Q(image_url__startswith="http"),
    ).select_related("game")
    # Benefits with missing images (trimmed so whitespace-only URLs count as empty)
    broken_benefit_images: QuerySet[DropBenefit] = DropBenefit.objects.annotate(
        trimmed_url=Trim("image_asset_url"),
    ).filter(
        Q(image_asset_url__isnull=True) | Q(trimmed_url__exact="") | ~Q(image_asset_url__startswith="http"),
    )
    # Time-based drops without any benefits
    drops_without_benefits: QuerySet[TimeBasedDrop] = TimeBasedDrop.objects.filter(
        benefits__isnull=True,
    ).select_related(
        "campaign__game",
    )
    # Campaigns with invalid dates (start after end or missing either)
    invalid_date_campaigns: QuerySet[DropCampaign] = DropCampaign.objects.filter(
        Q(start_at__gt=F("end_at")) | Q(start_at__isnull=True) | Q(end_at__isnull=True),
    ).select_related("game")
    # Duplicate campaign names per game.
    # We retrieve the game's name for user-friendly display.
    duplicate_name_campaigns = (
        DropCampaign.objects
        .values("game__display_name", "name", "game__twitch_id")
        .annotate(name_count=Count("twitch_id"))
        .filter(name_count__gt=1)
        .order_by("game__display_name", "name")
    )
    # Campaigns currently active but image missing
    active_missing_image: QuerySet[DropCampaign] = (
        DropCampaign.objects
        .filter(start_at__lte=now, end_at__gte=now)
        .filter(
            Q(image_url__isnull=True) | Q(image_url__exact="") | ~Q(image_url__startswith="http"),
        )
        .select_related("game")
    )
    # Distinct GraphQL operation names used to fetch campaigns with counts.
    # operation_names is a JSON list field, so flatten and count in Python;
    # strip each name once instead of three times per iteration.
    operation_names_counter: defaultdict[str, int] = defaultdict(int)
    for campaign in DropCampaign.objects.only("operation_names"):
        for op_name in campaign.operation_names:
            trimmed: str = op_name.strip() if op_name else ""
            if trimmed:
                operation_names_counter[trimmed] += 1
    operation_names_with_counts: list[dict[str, Any]] = [
        {"trimmed_op": op_name, "count": count} for op_name, count in sorted(operation_names_counter.items())
    ]
    # Campaigns never fetched via the DropCampaignDetails GraphQL operation
    campaigns_missing_dropcampaigndetails: QuerySet[DropCampaign] = (
        DropCampaign.objects
        .filter(
            Q(operation_names__isnull=True) | ~Q(operation_names__icontains="DropCampaignDetails"),
        )
        .select_related("game")
        .order_by("game__display_name", "name")
    )
    context: dict[str, Any] = {
        "now": now,
        "games_without_owner": games_without_owner,
        "broken_image_campaigns": broken_image_campaigns,
        "broken_benefit_images": broken_benefit_images,
        "drops_without_benefits": drops_without_benefits,
        "invalid_date_campaigns": invalid_date_campaigns,
        "duplicate_name_campaigns": duplicate_name_campaigns,
        "active_missing_image": active_missing_image,
        "operation_names_with_counts": operation_names_with_counts,
        "campaigns_missing_dropcampaigndetails": campaigns_missing_dropcampaigndetails,
    }
    return render(
        request,
        "twitch/debug.html",
        context,
    )
# MARK: /games/list/
class GamesListView(GamesGridView):
    """List view for games in simple list format."""

    # Inherits GamesGridView's queryset and per-organization grouping;
    # only the template differs.
    template_name = "twitch/games_list.html"
# MARK: /docs/rss/
def docs_rss_view(request: HttpRequest) -> HttpResponse:
    """View for /docs/rss that lists all available RSS feeds.

    Builds a documentation page listing every site-wide feed plus two
    filtered-feed examples (per-game and per-organization). Each feed is
    actually rendered with ?limit=1 and a pretty-printed one-item XML sample
    is embedded in the page.

    Args:
        request: The HTTP request object.

    Returns:
        Rendered HTML response with list of RSS feeds.
    """

    def absolute(path: str) -> str:
        # Best-effort absolute URL; the relative path is still usable as a
        # fallback if building the absolute URI fails.
        try:
            return request.build_absolute_uri(path)
        except Exception:  # pragma: no cover - defensive logging for docs only
            logger.exception("Failed to build absolute URL for %s", path)
            return path

    def _pretty_example(xml_str: str, max_items: int = 1) -> str:
        # Trims the feed XML down to its first <item> (when max_items == 1)
        # and inserts newlines between adjacent tags. This is naive string
        # surgery, not real XML processing, so any failure falls back to the
        # untouched input.
        try:
            trimmed = xml_str.strip()
            first_item = trimmed.find("<item")
            if first_item != -1 and max_items == 1:
                # len("<item") == 5, so searching from first_item + 5 skips
                # past the first occurrence.
                second_item = trimmed.find("<item", first_item + 5)
                if second_item != -1:
                    end_channel = trimmed.find("</channel>", second_item)
                    if end_channel != -1:
                        # Cut items 2..N but keep the closing </channel>.
                        trimmed = trimmed[:second_item] + trimmed[end_channel:]
            formatted = trimmed.replace("><", ">\n<")
            return "\n".join(line for line in formatted.splitlines() if line.strip())
        except Exception:  # pragma: no cover - defensive formatting for docs only
            logger.exception("Failed to pretty-print RSS example")
        return xml_str

    def render_feed(feed_view: Callable[..., HttpResponse], *args: object) -> str:
        # Invokes the feed with limit=1 on a shallow-copied request so the
        # real request's GET parameters stay untouched.
        try:
            limited_request: HttpRequest = copy(request)
            # Add limit=1 to GET parameters
            get_data: QueryDict = request.GET.copy()
            get_data["limit"] = "1"
            limited_request.GET = get_data  # pyright: ignore[reportAttributeAccessIssue]
            response: HttpResponse = feed_view(limited_request, *args)
            return _pretty_example(response.content.decode("utf-8"))
        except Exception:  # pragma: no cover - defensive logging for docs only
            logger.exception("Failed to render %s for RSS docs", feed_view.__class__.__name__)
            return ""

    # Site-wide feeds that need no sample object.
    feeds: list[dict[str, str]] = [
        {
            "title": "All Organizations",
            "description": "Latest organizations added to TTVDrops",
            "url": absolute(reverse("twitch:organization_feed")),
            "example_xml": render_feed(OrganizationRSSFeed()),
        },
        {
            "title": "All Games",
            "description": "Latest games added to TTVDrops",
            "url": absolute(reverse("twitch:game_feed")),
            "example_xml": render_feed(GameFeed()),
        },
        {
            "title": "All Drop Campaigns",
            "description": "Latest drop campaigns across all games",
            "url": absolute(reverse("twitch:campaign_feed")),
            "example_xml": render_feed(DropCampaignFeed()),
        },
        {
            "title": "All Reward Campaigns",
            "description": "Latest reward campaigns (Quest rewards) on Twitch",
            "url": absolute(reverse("twitch:reward_campaign_feed")),
            "example_xml": render_feed(RewardCampaignFeed()),
        },
    ]
    # Pick recent sample objects so the filtered-feed examples link to real
    # data; fall back to one of the sample game's owners when no organization
    # exists on its own.
    sample_game: Game | None = Game.objects.order_by("-added_at").first()
    sample_org: Organization | None = Organization.objects.order_by("-added_at").first()
    if sample_org is None and sample_game is not None:
        sample_org = sample_game.owners.order_by("-pk").first()
    # Filtered feeds show a placeholder URL when no sample object exists.
    filtered_feeds: list[dict[str, str | bool]] = [
        {
            "title": "Campaigns for a Single Game",
            "description": "Latest drop campaigns for one game.",
            "url": (
                absolute(reverse("twitch:game_campaign_feed", args=[sample_game.twitch_id]))
                if sample_game
                else absolute("/rss/games/<game_id>/campaigns/")
            ),
            "has_sample": bool(sample_game),
            "example_xml": render_feed(GameCampaignFeed(), sample_game.twitch_id) if sample_game else "",
        },
        {
            "title": "Campaigns for an Organization",
            "description": "Drop campaigns across games owned by one organization.",
            "url": (
                absolute(reverse("twitch:organization_campaign_feed", args=[sample_org.twitch_id]))
                if sample_org
                else absolute("/rss/organizations/<org_id>/campaigns/")
            ),
            "has_sample": bool(sample_org),
            "example_xml": render_feed(OrganizationCampaignFeed(), sample_org.twitch_id) if sample_org else "",
        },
    ]
    return render(
        request,
        "twitch/docs_rss.html",
        {
            "feeds": feeds,
            "filtered_feeds": filtered_feeds,
            "sample_game": sample_game,
            "sample_org": sample_org,
        },
    )
# MARK: /channels/
class ChannelListView(ListView):
    """List view for channels.

    Supports an optional ``?search=`` filter over channel name/display name
    and annotates each channel with the number of drop campaigns that allow
    it, ordering busiest channels first.
    """

    model = Channel
    template_name = "twitch/channel_list.html"
    context_object_name = "channels"
    paginate_by = 200

    def get_queryset(self) -> QuerySet[Channel]:
        """Get queryset of channels.

        Returns:
            QuerySet: Channels filtered by the optional search term, annotated
            with ``campaign_count``, ordered by count (descending) then name.
        """
        queryset: QuerySet[Channel] = super().get_queryset()
        search_query: str | None = self.request.GET.get("search")
        if search_query:
            queryset = queryset.filter(Q(name__icontains=search_query) | Q(display_name__icontains=search_query))
        # Count rows in the campaign<->channel through table for each channel.
        campaign_count_subquery: QuerySet[DropCampaign, DropCampaign] = (
            DropCampaign.allow_channels.through.objects
            .filter(channel_id=OuterRef("pk"))
            .values("channel_id")
            .annotate(count=Count("id"))
            .values("count")
        )
        # BUGFIX: the subquery yields NULL (not 0) for channels with no
        # campaigns. A plain "-campaign_count" ordering sorts NULLs first on
        # backends like PostgreSQL, floating empty channels to the top, so
        # force NULLs to sort last explicitly.
        return queryset.annotate(campaign_count=Subquery(campaign_count_subquery)).order_by(
            F("campaign_count").desc(nulls_last=True),
            "name",
        )

    def get_context_data(self, **kwargs) -> dict[str, Any]:
        """Add additional context data.

        Args:
            **kwargs: Additional arguments.

        Returns:
            dict: Context data including the current search query (empty
            string when no search was submitted).
        """
        context: dict[str, Any] = super().get_context_data(**kwargs)
        context["search_query"] = self.request.GET.get("search", "")
        return context
# MARK: /channels/<twitch_id>/
class ChannelDetailView(DetailView):
    """Detail view for a channel.

    Looks the channel up by ``twitch_id`` and splits its drop campaigns into
    active, upcoming, and expired buckets for the template, plus a serialized
    JSON representation for display.
    """

    model = Channel
    template_name = "twitch/channel_detail.html"
    context_object_name = "channel"
    lookup_field = "twitch_id"

    def get_object(self, queryset: QuerySet[Channel] | None = None) -> Channel:
        """Get the channel object using twitch_id as the primary key lookup.

        Args:
            queryset: Optional queryset to use.

        Returns:
            Channel: The channel object.

        Raises:
            Http404: If the channel is not found.
        """
        if queryset is None:
            queryset = self.get_queryset()
        twitch_id = self.kwargs.get("twitch_id")
        try:
            channel = queryset.get(twitch_id=twitch_id)
        except Channel.DoesNotExist as exc:
            msg = "No channel found matching the query"
            raise Http404(msg) from exc
        return channel

    def get_context_data(self, **kwargs: object) -> dict[str, Any]:
        """Add additional context data.

        Args:
            **kwargs: Additional arguments.

        Returns:
            dict: Context data with active, upcoming, and expired campaigns.
        """
        context: dict[str, Any] = super().get_context_data(**kwargs)
        # FIX: DetailView.get() has already resolved the object and stored it
        # on self.object; re-calling get_object() issued a redundant,
        # identical database query.
        channel: Channel = self.object  # pyright: ignore[reportAssignmentType]
        now: datetime.datetime = timezone.now()
        all_campaigns: QuerySet[DropCampaign] = (
            DropCampaign.objects
            .filter(allow_channels=channel)
            .prefetch_related("game__owners")
            .prefetch_related(
                Prefetch(
                    "time_based_drops",
                    queryset=TimeBasedDrop.objects.prefetch_related(
                        Prefetch(
                            "benefits",
                            queryset=DropBenefit.objects.order_by("name"),
                        ),
                    ),
                ),
            )
            .order_by("-start_at")
        )
        # Bucket campaigns in Python so the queryset is evaluated only once
        # for all three lists.
        active_campaigns: list[DropCampaign] = [
            campaign
            for campaign in all_campaigns
            if campaign.start_at is not None
            and campaign.start_at <= now
            and campaign.end_at is not None
            and campaign.end_at >= now
        ]
        # Soonest-ending active campaign first; missing end dates sort last.
        active_campaigns.sort(
            key=lambda c: c.end_at if c.end_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC),
        )
        upcoming_campaigns: list[DropCampaign] = [
            campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at > now
        ]
        # Soonest-starting upcoming campaign first.
        upcoming_campaigns.sort(
            key=lambda c: c.start_at if c.start_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC),
        )
        expired_campaigns: list[DropCampaign] = [
            campaign for campaign in all_campaigns if campaign.end_at is not None and campaign.end_at < now
        ]
        # Serialize the channel (and its campaigns, when any) for the JSON
        # display widget on the detail page.
        serialized_channel = serialize(
            "json",
            [channel],
            fields=(
                "twitch_id",
                "name",
                "display_name",
                "added_at",
                "updated_at",
            ),
        )
        channel_data = json.loads(serialized_channel)
        # exists() hits the result cache here since the queryset was already
        # evaluated by the bucketing above.
        if all_campaigns.exists():
            serialized_campaigns = serialize(
                "json",
                all_campaigns,
                fields=(
                    "twitch_id",
                    "name",
                    "description",
                    "details_url",
                    "account_link_url",
                    "image_url",
                    "start_at",
                    "end_at",
                    "added_at",
                    "updated_at",
                ),
            )
            campaigns_data = json.loads(serialized_campaigns)
            channel_data[0]["fields"]["campaigns"] = campaigns_data
        context.update(
            {
                "active_campaigns": active_campaigns,
                "upcoming_campaigns": upcoming_campaigns,
                "expired_campaigns": expired_campaigns,
                "now": now,
                "channel_data": format_and_color_json(channel_data[0]),
            },
        )
        return context
# MARK: /badges/
def badge_list_view(request: HttpRequest) -> HttpResponse:
    """List view for chat badge sets.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered badge list page.
    """
    ordered_badges = Prefetch(
        "badges",
        queryset=ChatBadge.objects.order_by("badge_id"),
    )
    badge_sets: QuerySet[ChatBadgeSet] = (
        ChatBadgeSet.objects
        .all()
        .prefetch_related(ordered_badges)
        .order_by("set_id")
    )

    # Group badges by set for easier display
    badge_data: list[dict[str, Any]] = []
    for current_set in badge_sets:
        badge_data.append(
            {
                "set": current_set,
                "badges": list(current_set.badges.all()),  # pyright: ignore[reportAttributeAccessIssue]
            },
        )

    return render(
        request,
        "twitch/badge_list.html",
        {
            "badge_sets": badge_sets,
            "badge_data": badge_data,
        },
    )
# MARK: /badges/<set_id>/
def badge_set_detail_view(request: HttpRequest, set_id: str) -> HttpResponse:
    """Detail view for a specific badge set.

    Also attaches an ``award_campaigns`` list to each badge (the drop
    campaigns whose BADGE-type benefits share the badge's title) and builds a
    serialized JSON representation of the set for display.

    Args:
        request: The HTTP request.
        set_id: The ID of the badge set.

    Returns:
        HttpResponse: The rendered badge set detail page.

    Raises:
        Http404: If the badge set is not found.
    """
    try:
        badge_set: ChatBadgeSet = ChatBadgeSet.objects.prefetch_related(
            Prefetch(
                "badges",
                queryset=ChatBadge.objects.order_by("badge_id"),
            ),
        ).get(set_id=set_id)
    except ChatBadgeSet.DoesNotExist as exc:
        msg = "No badge set found matching the query"
        raise Http404(msg) from exc
    badges: QuerySet[ChatBadge] = badge_set.badges.all()  # pyright: ignore[reportAttributeAccessIssue]
    # Attach award_campaigns attribute to each badge for template use
    # NOTE(review): this loop runs two queries per badge (N+1); fine for small
    # badge sets but worth batching if sets grow large.
    for badge in badges:
        # Benefits are matched to badges by title; assumes badge titles
        # uniquely identify the awarding benefit — TODO confirm.
        benefits: QuerySet[DropBenefit, DropBenefit] = DropBenefit.objects.filter(
            distribution_type="BADGE",
            name=badge.title,
        )
        campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(
            time_based_drops__benefits__in=benefits,
        ).distinct()
        badge.award_campaigns = list(campaigns)  # pyright: ignore[reportAttributeAccessIssue]
    # Serialize for JSON display
    serialized_set = serialize(
        "json",
        [badge_set],
        fields=(
            "set_id",
            "added_at",
            "updated_at",
        ),
    )
    set_data: list[dict[str, Any]] = json.loads(serialized_set)
    # exists() uses the result cache populated by the loop above.
    if badges.exists():
        serialized_badges = serialize(
            "json",
            badges,
            fields=(
                "badge_id",
                "image_url_1x",
                "image_url_2x",
                "image_url_4x",
                "title",
                "description",
                "click_action",
                "click_url",
                "added_at",
                "updated_at",
            ),
        )
        badges_data: list[dict[str, Any]] = json.loads(serialized_badges)
        set_data[0]["fields"]["badges"] = badges_data
    context: dict[str, Any] = {
        "badge_set": badge_set,
        "badges": badges,
        "set_data": format_and_color_json(set_data[0]),
    }
    return render(request, "twitch/badge_set_detail.html", context)
# MARK: Export Views
def export_campaigns_csv(request: HttpRequest) -> HttpResponse:
    """Export drop campaigns to CSV format.

    Honors optional ``game`` (a game twitch_id) and ``status``
    (active/upcoming/expired) query parameters.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: CSV file response.
    """
    # Get filters from query parameters
    selected_game: str | None = request.GET.get("game")
    selected_status: str | None = request.GET.get("status")

    campaigns: QuerySet[DropCampaign] = DropCampaign.objects.all()
    if selected_game:
        campaigns = campaigns.filter(game__twitch_id=selected_game)
    campaigns = campaigns.prefetch_related("game__owners").order_by("-start_at")

    now: datetime.datetime = timezone.now()
    status_filters: dict[str, Q] = {
        "active": Q(start_at__lte=now, end_at__gte=now),
        "upcoming": Q(start_at__gt=now),
        "expired": Q(end_at__lt=now),
    }
    if selected_status in status_filters:
        campaigns = campaigns.filter(status_filters[selected_status])

    # Create CSV response
    response = HttpResponse(content_type="text/csv")
    response["Content-Disposition"] = "attachment; filename=campaigns.csv"
    writer = csv.writer(response)
    writer.writerow([
        "Twitch ID",
        "Name",
        "Description",
        "Game",
        "Status",
        "Start Date",
        "End Date",
        "Details URL",
        "Created At",
        "Updated At",
    ])
    for campaign in campaigns:
        # Determine campaign status from its date window.
        if campaign.start_at is None or campaign.end_at is None:
            status = "Unknown"
        elif campaign.start_at > now:
            status = "Upcoming"
        elif now <= campaign.end_at:
            status = "Active"
        else:
            status = "Expired"
        writer.writerow([
            campaign.twitch_id,
            campaign.name,
            campaign.description[:100] if campaign.description else "",  # Truncate for CSV
            campaign.game.name if campaign.game else "",
            status,
            campaign.start_at.isoformat() if campaign.start_at else "",
            campaign.end_at.isoformat() if campaign.end_at else "",
            campaign.details_url,
            campaign.added_at.isoformat() if campaign.added_at else "",
            campaign.updated_at.isoformat() if campaign.updated_at else "",
        ])
    return response
def export_campaigns_json(request: HttpRequest) -> HttpResponse:
    """Export drop campaigns to JSON format.

    Honors optional ``game`` (a game twitch_id) and ``status``
    (active/upcoming/expired) query parameters.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: JSON file response.
    """
    # Get filters from query parameters
    selected_game: str | None = request.GET.get("game")
    selected_status: str | None = request.GET.get("status")

    campaigns: QuerySet[DropCampaign] = DropCampaign.objects.all()
    if selected_game:
        campaigns = campaigns.filter(game__twitch_id=selected_game)
    campaigns = campaigns.prefetch_related("game__owners").order_by("-start_at")

    now = timezone.now()
    status_filters: dict[str, Q] = {
        "active": Q(start_at__lte=now, end_at__gte=now),
        "upcoming": Q(start_at__gt=now),
        "expired": Q(end_at__lt=now),
    }
    if selected_status in status_filters:
        campaigns = campaigns.filter(status_filters[selected_status])

    # Build data list
    campaigns_data: list[dict[str, Any]] = []
    for campaign in campaigns:
        # Determine campaign status from its date window.
        if campaign.start_at is None or campaign.end_at is None:
            status = "Unknown"
        elif campaign.start_at > now:
            status = "Upcoming"
        elif now <= campaign.end_at:
            status = "Active"
        else:
            status = "Expired"
        game = campaign.game
        campaigns_data.append({
            "twitch_id": campaign.twitch_id,
            "name": campaign.name,
            "description": campaign.description,
            "game": game.name if game else None,
            "game_twitch_id": game.twitch_id if game else None,
            "status": status,
            "start_at": campaign.start_at.isoformat() if campaign.start_at else None,
            "end_at": campaign.end_at.isoformat() if campaign.end_at else None,
            "details_url": campaign.details_url,
            "account_link_url": campaign.account_link_url,
            "added_at": campaign.added_at.isoformat() if campaign.added_at else None,
            "updated_at": campaign.updated_at.isoformat() if campaign.updated_at else None,
        })

    # Create JSON response
    response = HttpResponse(
        json.dumps(campaigns_data, indent=2),
        content_type="application/json",
    )
    response["Content-Disposition"] = "attachment; filename=campaigns.json"
    return response
def export_games_csv(request: HttpRequest) -> HttpResponse:  # noqa: ARG001
    """Export all games to CSV format.

    The request is required by the URL dispatcher but unused, hence the
    single noqa directive (the previous duplicated noqa was dead text —
    linters only honor the first directive on a line).

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: CSV file response with one row per game.
    """
    queryset: QuerySet[Game] = Game.objects.all().order_by("display_name")
    # Create CSV response
    response = HttpResponse(content_type="text/csv")
    response["Content-Disposition"] = "attachment; filename=games.csv"
    writer = csv.writer(response)
    writer.writerow([
        "Twitch ID",
        "Name",
        "Display Name",
        "Slug",
        "Box Art URL",
        "Added At",
        "Updated At",
    ])
    for game in queryset:
        writer.writerow([
            game.twitch_id,
            game.name,
            game.display_name,
            game.slug,
            game.box_art,
            game.added_at.isoformat() if game.added_at else "",
            game.updated_at.isoformat() if game.updated_at else "",
        ])
    return response
def export_games_json(request: HttpRequest) -> HttpResponse:  # noqa: ARG001
    """Export all games to JSON format.

    The request is required by the URL dispatcher but unused, hence the
    single noqa directive (the previous duplicated noqa was dead text —
    linters only honor the first directive on a line).

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: JSON file response with one object per game.
    """
    queryset: QuerySet[Game] = Game.objects.all().order_by("display_name")
    # Build data list
    games_data: list[dict[str, Any]] = [
        {
            "twitch_id": game.twitch_id,
            "name": game.name,
            "display_name": game.display_name,
            "slug": game.slug,
            "box_art_url": game.box_art,
            "added_at": game.added_at.isoformat() if game.added_at else None,
            "updated_at": game.updated_at.isoformat() if game.updated_at else None,
        }
        for game in queryset
    ]
    # Create JSON response
    response = HttpResponse(
        json.dumps(games_data, indent=2),
        content_type="application/json",
    )
    response["Content-Disposition"] = "attachment; filename=games.json"
    return response
def export_organizations_csv(request: HttpRequest) -> HttpResponse:  # noqa: ARG001
    """Export organizations to CSV format.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: CSV file response.
    """
    organizations: QuerySet[Organization] = Organization.objects.all().order_by("name")

    # Create CSV response
    response = HttpResponse(content_type="text/csv")
    response["Content-Disposition"] = "attachment; filename=organizations.csv"

    writer = csv.writer(response)
    writer.writerow(["Twitch ID", "Name", "Added At", "Updated At"])
    for organization in organizations:
        added = organization.added_at.isoformat() if organization.added_at else ""
        updated = organization.updated_at.isoformat() if organization.updated_at else ""
        writer.writerow([organization.twitch_id, organization.name, added, updated])
    return response
def export_organizations_json(request: HttpRequest) -> HttpResponse:  # noqa: ARG001
    """Export organizations to JSON format.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: JSON file response.
    """
    organizations: QuerySet[Organization] = Organization.objects.all().order_by("name")

    # Build data list
    orgs_data: list[dict[str, Any]] = []
    for organization in organizations:
        orgs_data.append({
            "twitch_id": organization.twitch_id,
            "name": organization.name,
            "added_at": organization.added_at.isoformat() if organization.added_at else None,
            "updated_at": organization.updated_at.isoformat() if organization.updated_at else None,
        })

    # Create JSON response
    response = HttpResponse(
        json.dumps(orgs_data, indent=2),
        content_type="application/json",
    )
    response["Content-Disposition"] = "attachment; filename=organizations.json"
    return response