/campaigns/")
),
"has_sample": bool(sample_game),
"example_xml": render_feed(GameCampaignFeed(), sample_game.twitch_id)
if sample_game
else "",
"example_xml_atom": (
render_feed(GameCampaignAtomFeed(), sample_game.twitch_id)
if sample_game and show_atom
else ""
),
"example_xml_discord": (
render_feed(GameCampaignDiscordFeed(), sample_game.twitch_id)
if sample_game and show_atom
else ""
),
},
]
seo_context: dict[str, Any] = _build_seo_context(
page_title="Twitch RSS Feeds",
page_description="RSS feeds for Twitch drops.",
)
return render(
request,
"twitch/docs_rss.html",
{
"feeds": feeds,
"filtered_feeds": filtered_feeds,
"sample_game": sample_game,
"sample_org": sample_org,
**seo_context,
},
)
# MARK: /debug/
def debug_view(request: HttpRequest) -> HttpResponse:
    """Render the debug page listing potentially broken or inconsistent data.

    Each section of the page is backed by one queryset (or list) built here:
    games without owners, campaigns and benefits with unusable images, drops
    without benefits, campaigns with invalid dates, duplicate campaign names,
    and statistics about the GraphQL operation names stored on campaigns.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered debug template.
    """
    now: datetime.datetime = timezone.now()

    # A campaign's own image is unusable when the URL is NULL, empty, or does
    # not start with "http". Defined once and shared by both campaign queries.
    campaign_image_missing: Q = (
        Q(image_url__isnull=True)
        | Q(image_url__exact="")
        | ~Q(image_url__startswith="http")
    )
    # Correlated subquery: drops of the outer campaign that have at least one
    # benefit with an http(s) image that could serve as a fallback.
    drops_with_benefit_image: QuerySet[TimeBasedDrop] = TimeBasedDrop.objects.filter(
        campaign=OuterRef("pk"),
    ).filter(
        benefits__image_asset_url__startswith="http",
    )

    # Games with no assigned owner organization
    games_without_owner: QuerySet[Game] = Game.objects.filter(
        owners__isnull=True,
    ).order_by("display_name")

    # Campaigns with no images at all (no direct URL and no benefit image fallbacks)
    broken_image_campaigns: QuerySet[DropCampaign] = (
        DropCampaign.objects
        .filter(campaign_image_missing)
        .exclude(Exists(drops_with_benefit_image))
        .select_related("game")
    )

    # Benefits with missing images
    broken_benefit_images: QuerySet[DropBenefit] = DropBenefit.objects.annotate(
        trimmed_url=Trim("image_asset_url"),
    ).filter(
        Q(image_asset_url__isnull=True)
        | Q(trimmed_url__exact="")
        | ~Q(image_asset_url__startswith="http"),
    )

    # Time-based drops without any benefits
    drops_without_benefits: QuerySet[TimeBasedDrop] = TimeBasedDrop.objects.filter(
        benefits__isnull=True,
    ).select_related("campaign__game")

    # Campaigns with invalid dates (start after end or missing either)
    invalid_date_campaigns: QuerySet[DropCampaign] = DropCampaign.objects.filter(
        Q(start_at__gt=F("end_at")) | Q(start_at__isnull=True) | Q(end_at__isnull=True),
    ).select_related("game")

    # Duplicate campaign names per game.
    # We retrieve the game's name for user-friendly display.
    duplicate_name_campaigns: QuerySet[DropCampaign, dict[str, Any]] = (
        DropCampaign.objects
        .values("game__display_name", "name", "game__twitch_id")
        .annotate(name_count=Count("twitch_id"))
        .filter(name_count__gt=1)
        .order_by("game__display_name", "name")
    )

    # Active campaigns with no images at all
    active_missing_image: QuerySet[DropCampaign] = (
        DropCampaign.objects
        .filter(start_at__lte=now, end_at__gte=now)
        .filter(campaign_image_missing)
        .exclude(Exists(drops_with_benefit_image))
        .select_related("game")
    )

    # Distinct GraphQL operation names used to fetch campaigns with counts
    # Since operation_names is now a JSON list field, we need to flatten and count
    operation_names_counter: dict[str, int] = {}
    for campaign in DropCampaign.objects.only("operation_names"):
        # The missing-DropCampaignDetails check below treats operation_names
        # as possibly None, so guard against it here instead of raising
        # TypeError when iterating.
        for op_name in campaign.operation_names or []:
            trimmed_op: str = op_name.strip() if op_name else ""
            if trimmed_op:
                operation_names_counter[trimmed_op] = (
                    operation_names_counter.get(trimmed_op, 0) + 1
                )
    operation_names_with_counts: list[dict[str, Any]] = [
        {"trimmed_op": op_name, "count": count}
        for op_name, count in sorted(operation_names_counter.items())
    ]

    # Campaigns missing DropCampaignDetails operation name
    # Need to handle SQLite separately since it doesn't support JSONField lookups
    # Sqlite is used when testing
    campaigns_missing_dropcampaigndetails: list[DropCampaign]
    if connection.vendor == "sqlite":
        all_campaigns: QuerySet[DropCampaign] = DropCampaign.objects.select_related(
            "game",
        ).order_by("game__display_name", "name")
        campaigns_missing_dropcampaigndetails = [
            c
            for c in all_campaigns
            if c.operation_names is None
            or "DropCampaignDetails" not in c.operation_names
        ]
    else:
        campaigns_missing_dropcampaigndetails = list(
            DropCampaign.objects
            .filter(
                Q(operation_names__isnull=True)
                | ~Q(operation_names__contains=["DropCampaignDetails"]),
            )
            .select_related("game")
            .order_by("game__display_name", "name"),
        )

    context: dict[str, Any] = {
        "now": now,
        "games_without_owner": games_without_owner,
        "broken_image_campaigns": broken_image_campaigns,
        "broken_benefit_images": broken_benefit_images,
        "drops_without_benefits": drops_without_benefits,
        "invalid_date_campaigns": invalid_date_campaigns,
        "duplicate_name_campaigns": duplicate_name_campaigns,
        "active_missing_image": active_missing_image,
        "operation_names_with_counts": operation_names_with_counts,
        "campaigns_missing_dropcampaigndetails": campaigns_missing_dropcampaigndetails,
    }
    # The debug page should never be indexed by search engines.
    seo_context: dict[str, Any] = _build_seo_context(
        page_title="Debug",
        page_description="Debug view showing potentially broken or inconsistent data.",
        robots_directive="noindex, nofollow",
    )
    context.update(seo_context)
    return render(request, "twitch/debug.html", context)
# MARK: /datasets/
def dataset_backups_view(request: HttpRequest) -> HttpResponse:
    """List the ``.zst`` dataset backups found on disk, newest first.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered dataset backups page.
    """
    # TODO(TheLovinator): Instead of only using sql we should also support other formats like parquet, csv, or json.  # noqa: TD003
    # TODO(TheLovinator): Upload to s3 instead.  # noqa: TD003
    # TODO(TheLovinator): https://developers.google.com/search/docs/appearance/structured-data/dataset#json-ld
    datasets_root: Path = settings.DATA_DIR / "datasets"
    directories: list[Path] = [datasets_root]
    visited: set[str] = set()
    entries: list[dict[str, Any]] = []

    for directory in directories:
        if not (directory.exists() and directory.is_dir()):
            continue

        # Only include .zst files
        for candidate in directory.glob("*.zst"):
            if not candidate.is_file():
                continue

            # De-duplicate on the resolved path so the same file is never
            # listed twice.
            resolved_key = str(candidate.resolve())
            if resolved_key in visited:
                continue
            visited.add(resolved_key)

            file_stat: stat_result = candidate.stat()
            modified_at: datetime.datetime = datetime.datetime.fromtimestamp(
                file_stat.st_mtime,
                tz=timezone.get_current_timezone(),
            )

            # A file that cannot be expressed relative to the datasets root
            # is shown by name only and gets no download path.
            download_path: str | None
            try:
                display_path: str = str(candidate.relative_to(datasets_root))
            except ValueError:
                display_path = candidate.name
                download_path = None
            else:
                download_path = display_path

            entry: dict[str, Any] = {
                "name": candidate.name,
                "display_path": display_path,
                "download_path": download_path,
                "size": filesizeformat(file_stat.st_size),
                "updated_at": modified_at,
            }
            entries.append(entry)

    # Most recently modified backups first.
    entries.sort(key=operator.itemgetter("updated_at"), reverse=True)

    context: dict[str, Any] = {
        "datasets": entries,
        "data_dir": str(datasets_root),
        "dataset_count": len(entries),
    }
    context.update(
        _build_seo_context(
            page_title="Twitch Dataset",
            page_description="Database backups and datasets available for download.",
        ),
    )
    return render(request, "twitch/dataset_backups.html", context)
def dataset_backup_download_view(
    request: HttpRequest,  # noqa: ARG001
    relative_path: str,
) -> FileResponse:
    """Download a dataset backup from the datasets directory.

    Args:
        request: The HTTP request.
        relative_path: The path relative to the datasets directory.

    Returns:
        FileResponse: The file response for the requested dataset.

    Raises:
        Http404: When the path cannot be resolved, lies outside the datasets
            directory, is not an existing regular file, or does not end in
            ``.zst``.
    """
    # TODO(TheLovinator): Use s3 instead of local disk.  # noqa: TD003
    msg = "File not found"
    datasets_root: Path = settings.DATA_DIR / "datasets"
    data_root: Path = datasets_root.resolve()

    try:
        # resolve() is inside the try block because the path comes from the
        # URL: a value the OS cannot represent (e.g. one containing a NUL
        # byte) makes it raise ValueError, which must become a 404 rather
        # than an unhandled server error.
        requested_path: Path = (datasets_root / relative_path).resolve()
        # Raises ValueError when the resolved path escapes the datasets
        # directory (``..`` segments, absolute paths, symlinks pointing out).
        requested_path.relative_to(data_root)
    except ValueError as exc:
        raise Http404(msg) from exc

    if not requested_path.exists() or not requested_path.is_file():
        raise Http404(msg)

    # Only compressed backups are downloadable.
    if not requested_path.name.endswith(".zst"):
        raise Http404(msg)

    # The open file handle is handed to FileResponse, which streams it.
    return FileResponse(
        requested_path.open("rb"),
        as_attachment=True,
        filename=requested_path.name,
    )
# MARK: /search/
def search_view(request: HttpRequest) -> HttpResponse:
    """Search view for all models.

    The ``q`` query parameter is stripped of surrounding whitespace; an empty
    result is treated as "no query" and no database search is performed.
    Queries shorter than ``MIN_QUERY_LENGTH_FOR_FTS`` match name-like fields
    by prefix, longer queries match them by substring. Free-text fields
    (descriptions and summaries) are always matched by substring.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered search results.
    """
    # Strip so a whitespace-only query does not run substring searches for
    # " ", which would match (and load) almost every row with a description.
    query: str = request.GET.get("q", "").strip()
    results: dict[str, QuerySet] = {}

    if query:
        # The short-query and long-query searches differ only in this lookup.
        name_lookup: str = (
            "istartswith" if len(query) < MIN_QUERY_LENGTH_FOR_FTS else "icontains"
        )

        def by_name(field: str) -> Q:
            """Match a name-like field using the length-dependent lookup."""
            return Q(**{f"{field}__{name_lookup}": query})

        def by_text(field: str) -> Q:
            """Match a free-text field by case-insensitive substring."""
            return Q(**{f"{field}__icontains": query})

        results["organizations"] = Organization.objects.filter(by_name("name"))
        results["games"] = Game.objects.filter(
            by_name("name") | by_name("display_name"),
        )
        results["campaigns"] = DropCampaign.objects.filter(
            by_name("name") | by_text("description"),
        ).select_related("game")
        results["drops"] = TimeBasedDrop.objects.filter(
            by_name("name"),
        ).select_related("campaign")
        results["benefits"] = DropBenefit.objects.filter(
            by_name("name"),
        ).prefetch_related("drops__campaign")
        results["reward_campaigns"] = RewardCampaign.objects.filter(
            by_name("name") | by_name("brand") | by_text("summary"),
        ).select_related("game")
        results["badge_sets"] = ChatBadgeSet.objects.filter(by_name("set_id"))
        results["badges"] = ChatBadge.objects.filter(
            by_name("title") | by_text("description"),
        ).select_related("badge_set")

    # len() evaluates each queryset; the fetched rows stay cached on the
    # queryset object that is passed to the template.
    total_results_count: int = sum(len(qs) for qs in results.values())

    # TODO(TheLovinator): Make the description more informative by including counts of each result type, e.g. "Found 5 games, 3 campaigns, and 10 drops for 'rust'."  # noqa: TD003
    if query:
        # Titles are capped at 60 characters.
        page_title: str = f"Search Results for '{query}'"[:60]
        page_description: str = f"Found {total_results_count} results for '{query}'."
    else:
        page_title = "Search"
        page_description = "Search for drops, games, channels, and organizations."

    seo_context: dict[str, Any] = _build_seo_context(
        page_title=page_title,
        page_description=page_description,
    )
    return render(
        request,
        "twitch/search_results.html",
        {"query": query, "results": results, **seo_context},
    )
# MARK: /
def dashboard(request: HttpRequest) -> HttpResponse:  # noqa: ARG001
    """Return a static welcome page for the site root.

    Args:
        request: The HTTP request (unused).

    Returns:
        HttpResponse: A short welcome message served as ``text/html``.
    """
    # Return HTML to show that the view is working.
    # The message is written as adjacent literals with explicit "\n" escapes:
    # a plain double-quoted string cannot span physical lines.
    return HttpResponse(
        "Welcome to the Twitch Drops Dashboard\n"
        "Use the navigation to explore campaigns, games, organizations, and more.\n",
        content_type="text/html",
    )