Refactor views

This commit is contained in:
Joakim Hellsén 2025-08-01 20:24:44 +02:00
commit 245896aa06

View file

@ -1,7 +1,8 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import TYPE_CHECKING, Any from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, cast
from django.db.models import Count, Prefetch, Q from django.db.models import Count, Prefetch, Q
from django.db.models.query import QuerySet from django.db.models.query import QuerySet
@ -12,6 +13,8 @@ from django.views.generic import DetailView, ListView
from twitch.models import DropCampaign, Game, Organization, TimeBasedDrop from twitch.models import DropCampaign, Game, Organization, TimeBasedDrop
if TYPE_CHECKING: if TYPE_CHECKING:
import datetime
from django.db.models import QuerySet from django.db.models import QuerySet
from django.http import HttpRequest, HttpResponse from django.http import HttpRequest, HttpResponse
@ -24,24 +27,7 @@ class DropCampaignListView(ListView):
model = DropCampaign model = DropCampaign
template_name = "twitch/campaign_list.html" template_name = "twitch/campaign_list.html"
context_object_name = "campaigns" context_object_name = "campaigns"
paginate_by = 96 # Default pagination size paginate_by = 100
def get_paginate_by(self, queryset) -> int: # noqa: ANN001, ARG002
"""Get the pagination size, allowing override via URL parameter.
Args:
queryset: The queryset being paginated.
Returns:
int: Number of items per page.
"""
per_page: str | None = self.request.GET.get("per_page")
if per_page and per_page.isdigit():
per_page_int = int(per_page)
# Limit to reasonable values to prevent performance issues
if per_page_int in {12, 24, 48, 96}:
return per_page_int
return self.paginate_by
def get_queryset(self) -> QuerySet[DropCampaign]: def get_queryset(self) -> QuerySet[DropCampaign]:
"""Get queryset of drop campaigns. """Get queryset of drop campaigns.
@ -55,10 +41,9 @@ class DropCampaignListView(ListView):
if game_filter: if game_filter:
queryset = queryset.filter(game__id=game_filter) queryset = queryset.filter(game__id=game_filter)
# Prefetch related objects to reduce queries
return queryset.select_related("game", "owner").order_by("-start_at") return queryset.select_related("game", "owner").order_by("-start_at")
def get_context_data(self, **kwargs: object) -> dict[str, Any]: def get_context_data(self, **kwargs) -> dict[str, Any]:
"""Add additional context data. """Add additional context data.
Args: Args:
@ -67,29 +52,15 @@ class DropCampaignListView(ListView):
Returns: Returns:
dict: Context data. dict: Context data.
""" """
context = super().get_context_data(**kwargs) kwargs = cast("dict[str, Any]", kwargs)
context: dict[str, datetime.datetime | str | int | QuerySet[Game, Game] | None] = super().get_context_data(**kwargs)
# Load all games in a single query instead of multiple queries per game
context["games"] = Game.objects.all().order_by("display_name") context["games"] = Game.objects.all().order_by("display_name")
# Add status options for filtering
context["status_options"] = []
# Add selected filters
context["selected_status"] = self.request.GET.get("status", "")
context["selected_game"] = self.request.GET.get("game", "")
# Add per_page options and current selection
context["per_page_options"] = [96, 192, 384, 768]
per_page: str = self.request.GET.get("per_page", str(self.paginate_by))
if per_page.isdigit():
per_page_int = int(per_page)
context["selected_per_page"] = per_page_int
else:
logger.warning("Invalid per_page value: %s. Using default %d.", per_page, self.paginate_by)
context["selected_per_page"] = self.paginate_by
# Current time for active campaign highlighting
context["now"] = timezone.now() context["now"] = timezone.now()
context["selected_game"] = str(self.request.GET.get(key="game", default=""))
context["selected_per_page"] = self.paginate_by
context["selected_status"] = self.request.GET.get(key="status", default="")
return context return context
@ -110,16 +81,12 @@ class DropCampaignDetailView(DetailView):
Returns: Returns:
DropCampaign: The campaign object with prefetched relations. DropCampaign: The campaign object with prefetched relations.
""" """
# Prefetch all needed related objects in a single query
if queryset is None: if queryset is None:
queryset = self.get_queryset() queryset = self.get_queryset()
queryset = queryset.select_related("game", "owner") queryset = queryset.select_related("game", "owner")
# We don't need to prefetch time_based_drops here since we're fetching them separately in get_context_data return super().get_object(queryset=queryset)
# with proper ordering and prefetching of benefits
return super().get_object(queryset=queryset) # type: ignore[return-value]
def get_context_data(self, **kwargs: object) -> dict[str, Any]: def get_context_data(self, **kwargs: object) -> dict[str, Any]:
"""Add additional context data. """Add additional context data.
@ -130,10 +97,10 @@ class DropCampaignDetailView(DetailView):
Returns: Returns:
dict: Context data. dict: Context data.
""" """
context = super().get_context_data(**kwargs) context: dict[str, Any] = super().get_context_data(**kwargs)
campaign = context["campaign"] # Get the campaign from context campaign = context["campaign"]
# Add drops for this campaign with benefits preloaded in a single efficient query context["now"] = timezone.now()
context["drops"] = ( context["drops"] = (
TimeBasedDrop.objects.filter(campaign=campaign) TimeBasedDrop.objects.filter(campaign=campaign)
.select_related("campaign") .select_related("campaign")
@ -141,9 +108,6 @@ class DropCampaignDetailView(DetailView):
.order_by("required_minutes_watched") .order_by("required_minutes_watched")
) )
# Current time for active campaign highlighting
context["now"] = timezone.now()
return context return context
@ -178,7 +142,7 @@ class GameListView(ListView):
.order_by("display_name") .order_by("display_name")
) )
def get_context_data(self, **kwargs: object) -> dict[str, Any]: def get_context_data(self, **kwargs) -> dict[str, Any]:
"""Add additional context data with games grouped by organization. """Add additional context data with games grouped by organization.
Args: Args:
@ -187,40 +151,41 @@ class GameListView(ListView):
Returns: Returns:
dict: Context data with games grouped by organization. dict: Context data with games grouped by organization.
""" """
context = super().get_context_data(**kwargs)
# Create a dictionary to hold games by organization @dataclass(frozen=True)
games_by_org = {} class OrganizationData:
now = timezone.now() id: str
name: str
# Step 1: Get all organizations with games in a single query context: dict[str, Any] = super().get_context_data(**kwargs)
# We'll prefetch the games related to each organization through drop_campaigns
organizations_with_games = Organization.objects.filter(drop_campaigns__isnull=False).distinct().order_by("name")
# Step 2: Get all game-organization relationships in a single efficient query games_by_org: dict[OrganizationData, list[dict[str, Game | dict[str, int]]]] = {}
# This query gets all games with their campaign counts and organization info now: datetime.datetime = timezone.now()
game_org_relations = DropCampaign.objects.values("game_id", "owner_id", "owner__name").annotate(
organizations_with_games: QuerySet[Organization, Organization] = (
Organization.objects.filter(drop_campaigns__isnull=False).distinct().order_by("name")
)
game_org_relations: QuerySet[DropCampaign, dict[str, Any]] = DropCampaign.objects.values(
"game_id", "owner_id", "owner__name"
).annotate(
campaign_count=Count("id", distinct=True), campaign_count=Count("id", distinct=True),
active_count=Count("id", filter=Q(start_at__lte=now, end_at__gte=now), distinct=True), active_count=Count("id", filter=Q(start_at__lte=now, end_at__gte=now), distinct=True),
) )
# Step 3: Get all games in a single query with their display names all_games: dict[str, Game] = {game.id: game for game in Game.objects.all()}
all_games = {game.id: game for game in Game.objects.all()} org_names: dict[str, str] = {org.id: org.name for org in organizations_with_games}
# Step 4: Create a mapping of organization_id to organization_name game_org_map: dict[str, dict[str, Any]] = {}
org_names = {org.id: org.name for org in organizations_with_games}
# Step 5: Group games by organization
game_org_map = {}
for relation in game_org_relations: for relation in game_org_relations:
org_id = relation["owner_id"] org_id: str = relation["owner_id"]
game_id = relation["game_id"] game_id: str = relation["game_id"]
if org_id not in game_org_map: if org_id not in game_org_map:
game_org_map[org_id] = {} game_org_map[org_id] = {}
if game_id not in game_org_map[org_id]: if game_id not in game_org_map[org_id]:
game = all_games.get(game_id) game: Game | None = all_games.get(game_id)
if game: if game:
game_org_map[org_id][game_id] = { game_org_map[org_id][game_id] = {
"game": game, "game": game,
@ -228,21 +193,18 @@ class GameListView(ListView):
"active_count": relation["active_count"], "active_count": relation["active_count"],
} }
# Step 6: Convert the nested dictionary to the format expected by the template
for org_id, games in game_org_map.items(): for org_id, games in game_org_map.items():
if org_id in org_names: if org_id in org_names:
# Create an Organization-like object with id and name org_obj = OrganizationData(id=org_id, name=org_names[org_id])
org_obj = type("Organization", (), {"id": org_id, "name": org_names[org_id]})
games_by_org[org_obj] = list(games.values()) games_by_org[org_obj] = list(games.values())
# Create the flattened games_with_counts for backward compatibility games_with_counts: list[dict[str, Game | dict[str, int]]] = []
games_with_counts = []
for org_games in games_by_org.values(): for org_games in games_by_org.values():
games_with_counts.extend(org_games) games_with_counts.extend(org_games)
# Add to context context["games_with_counts"] = games_with_counts
context["games_with_counts"] = games_with_counts # Keep the original list for backward compatibility context["games_by_org"] = games_by_org
context["games_by_org"] = games_by_org # Add the new organized structure
return context return context
@ -264,27 +226,28 @@ class GameDetailView(DetailView):
dict: Context data with active, upcoming, and expired campaigns. dict: Context data with active, upcoming, and expired campaigns.
Expired campaigns are filtered based on either end date or status. Expired campaigns are filtered based on either end date or status.
""" """
context = super().get_context_data(**kwargs) context: dict[str, Any] = super().get_context_data(**kwargs)
game = self.get_object() game: Game = self.get_object()
# Get all campaigns for this game in a single query with prefetching now: datetime.datetime = timezone.now()
now = timezone.now() all_campaigns: QuerySet[DropCampaign, DropCampaign] = (
all_campaigns = DropCampaign.objects.filter(game=game).select_related("owner").order_by("-end_at") DropCampaign.objects.filter(game=game).select_related("owner").order_by("-end_at")
)
# Filter the campaigns in Python instead of making multiple queries active_campaigns: list[DropCampaign] = [
active_campaigns = [campaign for campaign in all_campaigns if campaign.start_at <= now and campaign.end_at >= now] campaign for campaign in all_campaigns if campaign.start_at <= now and campaign.end_at >= now
active_campaigns.sort(key=lambda c: c.end_at) # Sort by end_at ascending ]
active_campaigns.sort(key=lambda c: c.end_at)
upcoming_campaigns = [campaign for campaign in all_campaigns if campaign.start_at > now] upcoming_campaigns: list[DropCampaign] = [campaign for campaign in all_campaigns if campaign.start_at > now]
upcoming_campaigns.sort(key=lambda c: c.start_at) # Sort by start_at ascending upcoming_campaigns.sort(key=lambda c: c.start_at)
# Filter for expired campaigns expired_campaigns: list[DropCampaign] = [campaign for campaign in all_campaigns if campaign.end_at < now]
expired_campaigns = [campaign for campaign in all_campaigns if campaign.end_at < now]
context.update({ context.update({
"active_campaigns": active_campaigns, "active_campaigns": active_campaigns,
"upcoming_campaigns": upcoming_campaigns, "upcoming_campaigns": upcoming_campaigns,
"expired_campaigns": expired_campaigns, # Only include expired campaigns "expired_campaigns": expired_campaigns,
"now": now, "now": now,
}) })
@ -300,42 +263,34 @@ def dashboard(request: HttpRequest) -> HttpResponse:
Returns: Returns:
HttpResponse: The rendered dashboard template. HttpResponse: The rendered dashboard template.
""" """
# Get active campaigns with prefetching to reduce queries now: datetime.datetime = timezone.now()
now = timezone.now() active_campaigns: QuerySet[DropCampaign, DropCampaign] = (
active_campaigns = (
DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now) DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now)
.select_related("game", "owner") .select_related("game", "owner")
# Prefetch the time-based drops with their benefits to avoid N+1 queries
.prefetch_related(Prefetch("time_based_drops", queryset=TimeBasedDrop.objects.prefetch_related("benefits"))) .prefetch_related(Prefetch("time_based_drops", queryset=TimeBasedDrop.objects.prefetch_related("benefits")))
) )
# Group active campaigns by organization and then by game campaigns_by_org_game: dict[str, Any] = {}
campaigns_by_org_game = {}
for campaign in active_campaigns: for campaign in active_campaigns:
org_id = campaign.owner.id org_id: str = campaign.owner.id
org_name = campaign.owner.name org_name: str = campaign.owner.name
game_id = campaign.game.id game_id: str = campaign.game.id
game_name = campaign.game.display_name game_name: str = campaign.game.display_name
# Initialize org dict if not exists
if org_id not in campaigns_by_org_game: if org_id not in campaigns_by_org_game:
campaigns_by_org_game[org_id] = {"name": org_name, "games": {}} campaigns_by_org_game[org_id] = {"name": org_name, "games": {}}
# Initialize game list if not exists
if game_id not in campaigns_by_org_game[org_id]["games"]: if game_id not in campaigns_by_org_game[org_id]["games"]:
campaigns_by_org_game[org_id]["games"][game_id] = {"name": game_name, "campaigns": []} campaigns_by_org_game[org_id]["games"][game_id] = {"name": game_name, "campaigns": []}
# Add campaign to the game list
campaigns_by_org_game[org_id]["games"][game_id]["campaigns"].append(campaign) campaigns_by_org_game[org_id]["games"][game_id]["campaigns"].append(campaign)
# Sort organizations alphabetically sorted_campaigns_by_org_game: dict[str, Any] = {
sorted_campaigns_by_org_game = {
org_id: campaigns_by_org_game[org_id] org_id: campaigns_by_org_game[org_id]
for org_id in sorted(campaigns_by_org_game.keys(), key=lambda k: campaigns_by_org_game[k]["name"]) for org_id in sorted(campaigns_by_org_game.keys(), key=lambda k: campaigns_by_org_game[k]["name"])
} }
# Sort games alphabetically within each organization
for org_data in sorted_campaigns_by_org_game.values(): for org_data in sorted_campaigns_by_org_game.values():
org_data["games"] = { org_data["games"] = {
game_id: org_data["games"][game_id] for game_id in sorted(org_data["games"].keys(), key=lambda k: org_data["games"][k]["name"]) game_id: org_data["games"][game_id] for game_id in sorted(org_data["games"].keys(), key=lambda k: org_data["games"][k]["name"])
@ -345,8 +300,8 @@ def dashboard(request: HttpRequest) -> HttpResponse:
request, request,
"twitch/dashboard.html", "twitch/dashboard.html",
{ {
"active_campaigns": active_campaigns, # Keep the original list for backward compatibility "active_campaigns": active_campaigns,
"campaigns_by_org_game": sorted_campaigns_by_org_game, # Add the new organized structure "campaigns_by_org_game": sorted_campaigns_by_org_game,
"now": now, "now": now,
}, },
) )