Don't use users

This commit is contained in:
2024-07-05 23:50:37 +02:00
parent 474e2b5bff
commit 19feb42ef7
14 changed files with 397 additions and 432 deletions

View File

@ -1,21 +1,17 @@
import datetime
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING
import httpx
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.db.models.manager import BaseManager
from django.http import (
HttpRequest,
HttpResponse,
)
from django.shortcuts import redirect, render
from django.template.response import TemplateResponse
from django.views.generic import ListView
from django.views.decorators.http import require_POST
from django.views.generic import ListView, TemplateView
from core.discord import send
from core.models import DiscordSetting
from twitch_app.models import (
DropBenefit,
DropCampaign,
@ -25,12 +21,6 @@ from twitch_app.models import (
from .forms import DiscordSettingForm
if TYPE_CHECKING:
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.models import AnonymousUser
from django.db.models.manager import BaseManager
from django.views.decorators.http import require_POST
logger: logging.Logger = logging.getLogger(__name__)
@ -75,81 +65,86 @@ class GameContext:
slug: str | None = None
def index(request: HttpRequest) -> HttpResponse:
"""/ index page.
def fetch_games() -> list[Game]:
"""Fetch all games with necessary fields."""
return list(Game.objects.all().only("id", "image_url", "display_name", "slug"))
Args:
request: The request.
Returns:
HttpResponse: Returns the index page.
"""
list_of_games: list[GameContext] = []
for game in Game.objects.all().only("id", "image_url", "display_name", "slug"):
campaigns: list[CampaignContext] = []
for campaign in DropCampaign.objects.filter(
game=game,
status="ACTIVE",
end_at__gt=datetime.datetime.now(tz=datetime.UTC),
).only(
"id",
"name",
"image_url",
"status",
"account_link_url",
"description",
"details_url",
"start_at",
"end_at",
):
drops: list[DropContext] = []
drop: TimeBasedDrop
for drop in campaign.time_based_drops.all().only(
"id",
"name",
"required_minutes_watched",
"required_subs",
):
benefit: DropBenefit | None = drop.benefits.first()
image_asset_url: str = (
benefit.image_asset_url
if benefit
else "https://static-cdn.jtvnw.net/twitch-quests-assets/CAMPAIGN/default.png"
)
drops.append(
DropContext(
drops_id=drop.id,
image_url=image_asset_url,
name=drop.name,
required_minutes_watched=drop.required_minutes_watched,
required_subs=drop.required_subs,
),
)
if not drops:
logger.info("No drops found for %s", campaign.name)
continue
campaigns.append(
CampaignContext(
drop_id=campaign.id,
name=campaign.name,
image_url=campaign.image_url,
status=campaign.status,
account_link_url=campaign.account_link_url,
description=campaign.description,
details_url=campaign.details_url,
start_at=campaign.start_at,
end_at=campaign.end_at,
drops=drops,
),
)
if not campaigns:
logger.info("No campaigns found for %s", game.display_name)
def fetch_campaigns(game: Game) -> list[CampaignContext]:
"""Fetch active campaigns for a given game."""
campaigns: list[CampaignContext] = []
for campaign in DropCampaign.objects.filter(
game=game,
status="ACTIVE",
end_at__gt=datetime.datetime.now(tz=datetime.UTC),
).only(
"id",
"name",
"image_url",
"status",
"account_link_url",
"description",
"details_url",
"start_at",
"end_at",
):
drops = fetch_drops(campaign)
if not drops:
logger.info("No drops found for %s", campaign.name)
continue
campaigns.append(
CampaignContext(
drop_id=campaign.id,
name=campaign.name,
image_url=campaign.image_url,
status=campaign.status,
account_link_url=campaign.account_link_url,
description=campaign.description,
details_url=campaign.details_url,
start_at=campaign.start_at,
end_at=campaign.end_at,
drops=drops,
),
)
return campaigns
def fetch_drops(campaign: DropCampaign) -> list[DropContext]:
"""Fetch drops for a given campaign."""
drops: list[DropContext] = []
drop: TimeBasedDrop
for drop in campaign.time_based_drops.all().only(
"id",
"name",
"required_minutes_watched",
"required_subs",
):
benefit: DropBenefit | None = drop.benefits.first()
image_asset_url: str = "https://static-cdn.jtvnw.net/twitch-quests-assets/CAMPAIGN/default.png"
if benefit and benefit.image_asset_url:
image_asset_url = benefit.image_asset_url
drops.append(
DropContext(
drops_id=drop.id,
image_url=image_asset_url,
name=drop.name,
required_minutes_watched=drop.required_minutes_watched,
required_subs=drop.required_subs,
),
)
return drops
def prepare_game_contexts() -> list[GameContext]:
"""Prepare game contexts with their respective campaigns and drops."""
list_of_games: list[GameContext] = []
for game in fetch_games():
campaigns: list[CampaignContext] = fetch_campaigns(game)
if not campaigns:
continue
list_of_games.append(
GameContext(
game_id=game.id,
@ -159,131 +154,106 @@ def index(request: HttpRequest) -> HttpResponse:
twitch_url=game.twitch_url,
),
)
return list_of_games
context: dict[str, list[GameContext]] = {"games": list_of_games}
def sort_games_by_campaign_start(list_of_games: list[GameContext]) -> list[GameContext]:
"""Sort games by the start date of the first campaign and reverse the list so the latest games are first."""
if list_of_games and list_of_games[0].campaigns:
list_of_games.sort(
key=lambda x: x.campaigns[0].start_at
if x.campaigns and x.campaigns[0].start_at is not None
else datetime.datetime.min,
)
list_of_games.reverse()
return list_of_games
def index(request: HttpRequest) -> HttpResponse:
"""Render the index page."""
list_of_games: list[GameContext] = prepare_game_contexts()
sorted_list_of_games: list[GameContext] = sort_games_by_campaign_start(list_of_games)
return TemplateResponse(
request=request,
template="index.html",
context=context,
context={"games": sorted_list_of_games},
)
@require_POST
def test_webhook(request: HttpRequest) -> HttpResponse:
"""Test webhook.
Args:
request: The request.
Returns:
HttpResponse: Returns a response.
"""
org_id: str | None = request.POST.get("org_id")
if not org_id:
return HttpResponse(status=400)
campaign: DropCampaign = DropCampaign.objects.get(id=org_id)
msg: str = f"Found new drop for {campaign.game.display_name}:\n{campaign.name}\n{campaign.description}"
send(msg.strip())
return HttpResponse(status=200)
@login_required
def add_discord_webhook(request: HttpRequest) -> HttpResponse:
"""Add Discord webhook."""
if request.method == "POST":
form = DiscordSettingForm(request.POST)
if form.is_valid():
DiscordSetting.objects.create(
user=request.user,
name=form.cleaned_data["name"],
webhook_url=form.cleaned_data["webhook_url"],
disabled=False,
)
messages.success(
request=request,
message=f"Webhook '{form.cleaned_data["name"]}' added ({form.cleaned_data["webhook_url"]})",
)
return redirect("core:add_discord_webhook")
else:
form = DiscordSettingForm()
webhooks: BaseManager[DiscordSetting] = DiscordSetting.objects.filter(
user=request.user,
)
return render(
request,
"add_discord_webhook.html",
{"form": form, "webhooks": webhooks},
)
@login_required
def delete_discord_webhook(request: HttpRequest) -> HttpResponse:
"""Delete Discord webhook."""
if request.method == "POST":
DiscordSetting.objects.filter(
id=request.POST.get("webhook_id"),
name=request.POST.get("webhook_name"),
webhook_url=request.POST.get("webhook_url"),
user=request.user,
).delete()
messages.success(
request=request,
message=f"Webhook '{request.POST.get("webhook_name")}' deleted ({request.POST.get("webhook_url")})",
)
return redirect("core:add_discord_webhook")
@login_required
def subscription_create(request: HttpRequest) -> HttpResponse:
"""Create subscription."""
if request.method == "POST":
game: Game = Game.objects.get(id=request.POST.get("game_id"))
user: AbstractBaseUser | AnonymousUser = request.user
webhook_id: str | None = request.POST.get("webhook_id")
if not webhook_id:
messages.error(request, "No webhook ID provided.")
return redirect("core:index")
if not user.is_authenticated:
messages.error(
request,
"You need to be logged in to create a subscription.",
)
return redirect("core:index")
logger.info(
"Current webhooks: %s",
DiscordSetting.objects.filter(user=user).values_list("id", flat=True),
)
discord_webhook: DiscordSetting = DiscordSetting.objects.get(
id=int(webhook_id),
user=user,
)
messages.success(request, "Subscription created")
send(
message=f"This channel will now receive a notification when a new Twitch drop for **{game}** is available.", # noqa: E501
webhook_url=discord_webhook.webhook_url,
)
return redirect("core:index")
messages.error(request, "Failed to create subscription")
return redirect("core:index")
class GameView(ListView):
model = Game
template_name: str = "games.html"
context_object_name: str = "games"
paginate_by = 100
@dataclass
class WebhookData:
"""The webhook data."""
name: str | None = None
url: str | None = None
status: str | None = None
response: str | None = None
class Webhooks(TemplateView):
model = Game
template_name: str = "webhooks.html"
context_object_name: str = "webhooks"
paginate_by = 100
def get_context_data(self, **kwargs) -> dict[str, list[WebhookData] | DiscordSettingForm]: # noqa: ANN003, ARG002
"""Get the context data for the view."""
cookie: str = self.request.COOKIES.get("webhooks", "")
webhooks: list[str] = cookie.split(",")
webhooks = list(filter(None, webhooks))
webhook_respones: list[WebhookData] = []
# Use httpx to connect to webhook url and get the response
# Use the response to get name of the webhook
with httpx.Client() as client:
for webhook in webhooks:
our_webhook = WebhookData(name="Unknown", url=webhook, status="Failed", response="No response")
response: httpx.Response = client.get(url=webhook)
if response.is_success:
our_webhook.name = response.json()["name"]
our_webhook.status = "Success"
our_webhook.response = response.text
else:
our_webhook.status = "Failed"
our_webhook.response = response.text
# Add to the list of webhooks
webhook_respones.append(our_webhook)
return {"webhooks": webhook_respones, "form": DiscordSettingForm()}
@require_POST
def add_webhook(request: HttpRequest) -> HttpResponse:
"""Add a webhook to the list of webhooks."""
form = DiscordSettingForm(request.POST)
if form.is_valid():
webhook = str(form.cleaned_data["webhook"])
response = HttpResponse()
if "webhooks" in request.COOKIES:
cookie: str = request.COOKIES["webhooks"]
webhooks: list[str] = cookie.split(",")
webhooks = list(filter(None, webhooks))
if webhook in webhooks:
messages.error(request, "Webhook already exists.")
return response
webhooks.append(webhook)
webhook: str = ",".join(webhooks)
response.set_cookie(key="webhooks", value=webhook, max_age=60 * 60 * 24 * 365)
messages.info(request, "Webhook successfully added.")
return response
return HttpResponse(status=400, content="Invalid form data.")