Files
twitch-drop-notifier/core/models.py
2024-09-22 11:42:20 +02:00

1028 lines
36 KiB
Python

from __future__ import annotations
import logging
from datetime import datetime
from io import BytesIO
from typing import TYPE_CHECKING, Any, ClassVar, Literal, Self, cast
import requests
from asgiref.sync import sync_to_async
from django.contrib.auth.models import AbstractUser
from django.core.files import File
from django.core.files.base import ContentFile
from django.db import models
from PIL import Image
if TYPE_CHECKING:
from django.db.models.fields.files import ImageFieldFile
logger: logging.Logger = logging.getLogger(__name__)
# The image file format to save images as.
image_file_format: Literal["webp"] = "webp"
async def update_scraped_json(model_instance: models.Model, new_json_data: dict) -> None:
"""Update the JSON data for the drop campaign.
Args:
model_instance (models.Model): The Django model instance which must have a 'scraped_json' attribute.
new_json_data (dict): The new JSON data to be updated.
"""
if await sync_to_async(hasattr)(model_instance, "scraped_json"):
if model_instance.scraped_json: # type: ignore[attr-defined]
model_instance.scraped_json.json_data = new_json_data # type: ignore[attr-defined]
await model_instance.scraped_json.asave() # type: ignore[attr-defined]
else:
scraped_json_instance: ScrapedJson = await ScrapedJson.objects.acreate(json_data=new_json_data)
model_instance.scraped_json = scraped_json_instance # type: ignore[attr-defined]
await model_instance.asave()
else:
logger.error("The model instance does not have a 'scraped_json' attribute.")
model_instance.save()
def wrong_typename(data: dict, expected: str) -> bool:
"""Check if the data is the expected type.
# TODO(TheLovinator): Double check this. # noqa: TD003
Type name examples:
- Game
- DropCampaign
- TimeBasedDrop
- DropBenefit
- RewardCampaign
- Reward
Args:
data (dict): The data to check.
expected (str): The expected type.
Returns:
bool: True if the data is not the expected type.
"""
is_unexpected_type: bool = data.get("__typename", "") != expected
if is_unexpected_type:
logger.error("Not a %s? %s", expected, data)
return is_unexpected_type
def update_field(instance: models.Model, django_field_name: str, new_value: str | datetime | None) -> int:
"""Update a field on an instance if the new value is different from the current value.
Args:
instance (models.Model): The Django model instance.
django_field_name (str): The name of the field to update.
new_value (str | datetime | None): The new value to update the field with.
Returns:
int: If the field was updated, returns 1. Otherwise, returns 0.
"""
# Get the current value of the field.
try:
current_value = getattr(instance, django_field_name)
except AttributeError:
logger.exception("Field %s does not exist on %s", django_field_name, instance)
return 0
# Only update the field if the new value is different from the current value.
if new_value and new_value != current_value:
setattr(instance, django_field_name, new_value)
return 1
# 0 fields updated.
return 0
def get_value(data: dict, key: str) -> datetime | str | None:
"""Get a value from a dictionary.
We have this function so we can handle values that we need to convert to a different type. For example, we might
need to convert a string to a datetime object.
Args:
data (dict): The dictionary to get the value from.
key (str): The key to get the value for.
Returns:
datetime | str | None: The value from the dictionary
"""
data_key: Any | None = data.get(key)
if not data_key:
return None
# Dates are in the format "2024-08-12T05:59:59.999Z"
dates: list[str] = ["endAt", "endsAt,", "startAt", "startsAt", "createdAt", "earnableUntil"]
if key in dates:
return datetime.fromisoformat(data_key.replace("Z", "+00:00"))
return data_key
async def update_fields(instance: models.Model, data: dict, field_mapping: dict[str, str]) -> int:
"""Update multiple fields on an instance using a mapping from external field names to model field names.
Args:
instance (models.Model): The Django model instance.
data (dict): The new data to update the fields with.
field_mapping (dict[str, str]): A dictionary mapping external field names to model field names.
Returns:
int: The number of fields updated. Used for only saving the instance if there were changes.
"""
dirty = 0
for json_field, django_field_name in field_mapping.items():
data_key: datetime | str | None = get_value(data, json_field)
dirty += update_field(instance=instance, django_field_name=django_field_name, new_value=data_key)
if dirty > 0:
await instance.asave()
return dirty
def convert_image_to_webp(data: bytes | None) -> File | None:
"""Convert an image to a webp format.
Args:
data (bytes | None): The image data to convert.
Returns:
ImageFile | None: The image converted to a webp format.
"""
if not data:
return None
try:
with BytesIO(data) as input_buffer, Image.open(input_buffer) as image:
output_buffer = BytesIO()
image.save(output_buffer, format=image_file_format)
output_buffer.seek(0)
return File(file=Image.open(output_buffer))
except Exception:
logger.exception("Failed to convert image to webp.")
return File(file=Image.open(fp=ContentFile(data)).convert("RGB"))
def fetch_image(image_url: str) -> bytes | None:
"""Fetch an image from a URL and return the response.
Args:
image_url (str): The URL of the image to fetch.
Returns:
requests.Response | None: The response if the image was fetched, otherwise None.
"""
response: requests.Response = requests.get(image_url, timeout=5, stream=True)
response.raise_for_status()
if response.ok:
if response.raw:
logging.debug("Fetched image from %s", image_url)
return response.raw.read()
logging.error("Response raw is None for %s", image_url)
return None
logging.error("Failed to retrieve content. Status code: %s", response.status_code)
return None
class User(AbstractUser): ...
class ScrapedJson(models.Model):
json_data = models.JSONField(unique=True, help_text="The JSON data from the Twitch API.")
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
ordering: ClassVar[list[str]] = ["-created_at"]
def __str__(self) -> str:
return f"{self.pk}"
class Owner(models.Model):
"""The company or person that owns the game.
Drops will be grouped by the owner. Users can also subscribe to owners.
"""
# "ad299ac0-f1a5-417d-881d-952c9aed00e9"
twitch_id = models.TextField(primary_key=True)
# When the owner was first added to the database.
created_at = models.DateTimeField(null=True, auto_created=True)
# When the owner was last modified.
modified_at = models.DateTimeField(null=True, auto_now=True)
# "Microsoft"
name = models.TextField(null=True)
class Meta:
ordering: ClassVar[list[str]] = ["name"]
def __str__(self) -> str:
return self.name or self.twitch_id
async def aimport_json(self, data: dict) -> Self:
if wrong_typename(data, "Organization"):
return self
field_mapping: dict[str, str] = {"name": "name"}
updated: int = await update_fields(instance=self, data=data, field_mapping=field_mapping)
if updated > 0:
logger.info("Updated %s fields for %s", updated, self)
return self
def get_game_image_path(instance: models.Model, filename: str) -> str:
"""Get the path for the game image.
Args:
instance (models.Model): The instance of the model. Is a Game.
filename (str): The filename of the image.
Returns:
str: The path to the image.
"""
instance = cast(Game, instance)
# Example: game/509658.png
image_path: str = f"game/{filename}"
logger.debug("Saved image to %s", image_path)
return image_path
class Game(models.Model):
"""The game the drop campaign is for. Note that some reward campaigns are not tied to a game."""
# "509658"
twitch_id = models.TextField(primary_key=True)
# When the game was first added to the database.
created_at = models.DateTimeField(null=True, auto_created=True)
# When the game was last modified.
modified_at = models.DateTimeField(null=True, auto_now=True)
# "https://www.twitch.tv/directory/category/halo-infinite"
game_url = models.URLField(null=True)
# "Halo Infinite"
name = models.TextField(null=True)
# "https://static-cdn.jtvnw.net/ttv-boxart/Halo%20Infinite.jpg"
box_art_url = models.URLField(null=True)
image = models.ImageField(null=True, upload_to=get_game_image_path)
# "halo-infinite"
slug = models.TextField(null=True, unique=True)
org = models.ForeignKey(Owner, on_delete=models.CASCADE, related_name="games", null=True)
class Meta:
ordering: ClassVar[list[str]] = ["name"]
def __str__(self) -> str:
return self.name or self.twitch_id
def download_image(self) -> ImageFieldFile | None:
"""Download the image for the game.
Returns:
ImageFieldFile | None: The image file or None if it doesn't exist.
"""
# We don't want to re-download the image if it already exists.
# TODO(TheLovinator): Check if there is a different image available. # noqa: TD003
if self.image:
return self.image
if not self.box_art_url:
return None
response: bytes | None = fetch_image(image_url=self.box_art_url)
image: File | None = convert_image_to_webp(response)
if image:
self.image.save(name=f"{self.twitch_id}.{image_file_format}", content=image, save=True)
logger.info("Downloaded image for %s to %s", self, self.image.url)
return None
async def aimport_json(self, data: dict, owner: Owner | None) -> Self:
if wrong_typename(data, "Game"):
return self
# Map the fields from the JSON data to the Django model fields.
field_mapping: dict[str, str] = {
"displayName": "name",
"boxArtURL": "box_art_url", # TODO(TheLovinator): Should download the image. # noqa: TD003
"slug": "slug",
}
updated: int = await update_fields(instance=self, data=data, field_mapping=field_mapping)
if updated > 0:
logger.info("Updated %s fields for %s", updated, self)
# Handle the owner of the game.
if owner and await sync_to_async(lambda: self not in owner.games.all())(): # type: ignore[attr-defined]
await owner.games.aadd(self) # type: ignore # noqa: PGH003
await self.asave()
logger.info("Added game %s for %s", self, owner)
return self
def get_drop_campaign_image_path(instance: models.Model, filename: str) -> str:
"""Get the path for the drop campaign image.
Args:
instance (models.Model): The instance of the model. Is a DropCampaign.
filename (str): The filename of the image.
Returns:
str: The path to the image.
"""
instance = cast(DropCampaign, instance)
# Example: drop_campaigns/509658/509658.png
image_path: str = f"drop_campaign/{filename}"
logger.debug("Saved image to %s", image_path)
return image_path
class DropCampaign(models.Model):
"""This is the drop campaign we will see on the front end."""
# "f257ce6e-502a-11ef-816e-0a58a9feac02"
twitch_id = models.TextField(primary_key=True)
# When the drop campaign was first added to the database.
created_at = models.DateTimeField(null=True, auto_created=True)
# When the drop campaign was last modified.
modified_at = models.DateTimeField(null=True, auto_now=True)
# "https://www.halowaypoint.com/settings/linked-accounts"
account_link_url = models.URLField(null=True)
# "Tune into this HCS Grassroots event to earn Halo Infinite in-game content!"
description = models.TextField(null=True)
# "https://www.halowaypoint.com"
details_url = models.URLField(null=True)
# "2024-08-12T05:59:59.999Z"
ends_at = models.DateTimeField(null=True)
# "2024-08-11T11:00:00Z""
starts_at = models.DateTimeField(null=True)
# "https://static-cdn.jtvnw.net/twitch-quests-assets/CAMPAIGN/c8e02666-8b86-471f-bf38-7ece29a758e4.png"
image_url = models.URLField(null=True)
image = models.ImageField(null=True, upload_to=get_drop_campaign_image_path)
# "HCS Open Series - Week 1 - DAY 2 - AUG11"
name = models.TextField(null=True)
# "ACTIVE"
status = models.TextField(null=True)
game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="drop_campaigns", null=True)
scraped_json = models.ForeignKey(
ScrapedJson,
null=True,
on_delete=models.SET_NULL,
help_text="Reference to the JSON data from the Twitch API.",
)
class Meta:
ordering: ClassVar[list[str]] = ["ends_at"]
def __str__(self) -> str:
return self.name or self.twitch_id
def download_image(self) -> ImageFieldFile | None:
"""Download the image for the drop campaign.
Returns:
ImageFieldFile | None: The image file or None if it doesn't exist.
"""
# We don't want to re-download the image if it already exists.
# TODO(TheLovinator): Check if there is a different image available. # noqa: TD003
if self.image:
return self.image
if not self.image_url:
return None
response: bytes | None = fetch_image(image_url=self.image_url)
image: File | None = convert_image_to_webp(response)
if image:
self.image.save(name=f"{self.twitch_id}.{image_file_format}", content=image, save=True)
logger.info("Downloaded image for %s to %s", self, self.image.url)
return None
async def aimport_json(self, data: dict, game: Game | None, *, scraping_local_files: bool = False) -> Self:
"""Import the data from the Twitch API.
Args:
data (dict | None): The data from the Twitch API.
game (Game | None): The game this drop campaign is for.
scraping_local_files (bool, optional): If this was scraped from local data. Defaults to True.
Returns:
Self: The updated drop campaign.
"""
if wrong_typename(data, "DropCampaign"):
return self
# Save the JSON data for debugging purposes.
await update_scraped_json(model_instance=self, new_json_data=data)
field_mapping: dict[str, str] = {
"name": "name",
"accountLinkURL": "account_link_url", # TODO(TheLovinator): Should archive site. # noqa: TD003
"description": "description",
"endAt": "ends_at",
"startAt": "starts_at",
"detailsURL": "details_url", # TODO(TheLovinator): Should archive site. # noqa: TD003
"imageURL": "image_url", # TODO(TheLovinator): Should download the image. # noqa: TD003
}
updated: int = await update_fields(instance=self, data=data, field_mapping=field_mapping)
if updated > 0:
logger.info("Updated %s fields for %s", updated, self)
# Update the drop campaign's status if the new status is different.
# When scraping local files:
# - Only update if the status changes from "ACTIVE" to "EXPIRED".
# When scraping from the Twitch API:
# - Always update the status regardless of its value.
status = data.get("status")
if status and status != self.status:
# Check if scraping local files and status changes from ACTIVE to EXPIRED
should_update = scraping_local_files and status == "EXPIRED" and self.status == "ACTIVE"
# Always update if not scraping local files
if not scraping_local_files or should_update:
self.status = status
await self.asave()
# Update the game if the game is different or not set.
if game and await sync_to_async(lambda: game != self.game)():
self.game = game
logger.info("Updated game %s for %s", game, self)
await self.asave()
return self
class TimeBasedDrop(models.Model):
"""This is the drop we will see on the front end."""
# "d5cdf372-502b-11ef-bafd-0a58a9feac02"
twitch_id = models.TextField(primary_key=True)
# When the drop was first added to the database.
created_at = models.DateTimeField(null=True, auto_created=True)
# When the drop was last modified.
modified_at = models.DateTimeField(null=True, auto_now=True)
# "1"
required_subs = models.PositiveBigIntegerField(null=True)
# "2024-08-12T05:59:59.999Z"
ends_at = models.DateTimeField(null=True)
# "Cosmic Nexus Chimera"
name = models.TextField(null=True)
# "120"
required_minutes_watched = models.PositiveBigIntegerField(null=True)
# "2024-08-11T11:00:00Z"
starts_at = models.DateTimeField(null=True)
drop_campaign = models.ForeignKey(DropCampaign, on_delete=models.CASCADE, related_name="drops", null=True)
class Meta:
ordering: ClassVar[list[str]] = ["required_minutes_watched"]
def __str__(self) -> str:
return self.name or self.twitch_id
async def aimport_json(self, data: dict, drop_campaign: DropCampaign | None) -> Self:
if wrong_typename(data, "TimeBasedDrop"):
return self
field_mapping: dict[str, str] = {
"name": "name",
"requiredSubs": "required_subs",
"requiredMinutesWatched": "required_minutes_watched",
"startAt": "starts_at",
"endAt": "ends_at",
}
updated: int = await update_fields(instance=self, data=data, field_mapping=field_mapping)
if updated > 0:
logger.info("Updated %s fields for %s", updated, self)
if drop_campaign and await sync_to_async(lambda: drop_campaign != self.drop_campaign)():
self.drop_campaign = drop_campaign
logger.info("Updated drop campaign %s for %s", drop_campaign, self)
await self.asave()
return self
def get_benefit_image_path(instance: models.Model, filename: str) -> str:
"""Get the path for the benefit image.
Args:
instance (models.Model): The instance of the model. Is a Benefit.
filename (str): The filename of the image.
Returns:
str: The path to the image.
"""
instance = cast(Benefit, instance)
# Example: benefit_images/509658.png
image_path: str = f"benefit/{filename}"
logger.debug("Saved image to %s", image_path)
return image_path
class Benefit(models.Model):
"""Benefits are the rewards for the drops."""
# "d5cdf372-502b-11ef-bafd-0a58a9feac02"
twitch_id = models.TextField(primary_key=True)
# When the benefit was first added to the database.
created_at = models.DateTimeField(null=True, auto_created=True)
# When the benefit was last modified.
modified_at = models.DateTimeField(null=True, auto_now=True)
# Note: This is Twitch's created_at from the API.
# "2023-11-09T01:18:00.126Z"
twitch_created_at = models.DateTimeField(null=True)
# "1"
entitlement_limit = models.PositiveBigIntegerField(null=True)
# "https://static-cdn.jtvnw.net/twitch-quests-assets/REWARD/e58ad175-73f6-4392-80b8-fb0223163733.png"
image_url = models.URLField(null=True)
image = models.ImageField(null=True, upload_to=get_benefit_image_path)
# "True" or "False". None if unknown.
is_ios_available = models.BooleanField(null=True)
# "Cosmic Nexus Chimera"
name = models.TextField(null=True)
time_based_drop = models.ForeignKey(
TimeBasedDrop,
on_delete=models.CASCADE,
related_name="benefits",
null=True,
)
class Meta:
ordering: ClassVar[list[str]] = ["-twitch_created_at"]
def __str__(self) -> str:
return self.name or self.twitch_id
def download_image(self) -> ImageFieldFile | None:
"""Download the image for the benefit.
Returns:
ImageFieldFile | None: The image file or None if it doesn't exist.
"""
# TODO(TheLovinator): Check if the image on Twitch is different. # noqa: TD003
if self.image:
logger.debug("Image already exists for %s", self)
return self.image
if not self.image_url:
logger.error("No image URL for %s", self)
return None
response: bytes | None = fetch_image(image_url=self.image_url)
image: File | None = convert_image_to_webp(response)
if image:
self.image.save(name=f"{self.twitch_id}.{image_file_format}", content=image, save=True)
logger.info("Downloaded image for %s to %s", self, self.image.url)
return None
async def aimport_json(self, data: dict, time_based_drop: TimeBasedDrop | None) -> Self:
if wrong_typename(data, "DropBenefit"):
return self
field_mapping: dict[str, str] = {
"name": "name",
"imageAssetURL": "image_url", # TODO(TheLovinator): Should download the image. # noqa: TD003
"entitlementLimit": "entitlement_limit",
"isIOSAvailable": "is_ios_available",
"createdAt": "twitch_created_at",
}
updated: int = await update_fields(instance=self, data=data, field_mapping=field_mapping)
if updated > 0:
logger.info("Updated %s fields for %s", updated, self)
if time_based_drop and await sync_to_async(lambda: self not in time_based_drop.benefits.all())(): # type: ignore[attr-defined]
await time_based_drop.benefits.aadd(self) # type: ignore # noqa: PGH003
await time_based_drop.asave()
logger.info("Added benefit %s for %s", self, time_based_drop)
return self
def get_reward_image_path(instance: models.Model, filename: str) -> str:
"""Get the path for the reward image.
Args:
instance (models.Model): The instance of the model. Is a Reward.
filename (str): The filename of the image.
Returns:
str: The path to the image.
"""
instance = cast(Reward, instance)
# Example: reward/509658.png
image_path: str = f"reward/{filename}"
logger.debug("Saved image to %s", image_path)
return image_path
class RewardCampaign(models.Model):
"""Buy subscriptions to earn rewards."""
# "dc4ff0b4-4de0-11ef-9ec3-621fb0811846"
twitch_id = models.TextField(primary_key=True)
# When the reward campaign was first added to the database.
created_at = models.DateTimeField(null=True, auto_created=True)
# When the reward campaign was last modified.
modified_at = models.DateTimeField(null=True, auto_now=True)
# "Buy 1 new sub, get 3 months of Apple TV+"
name = models.TextField(null=True)
# "Apple TV+"
brand = models.TextField(null=True)
# "2024-08-11T11:00:00Z"
starts_at = models.DateTimeField(null=True)
# "2024-08-12T05:59:59.999Z"
ends_at = models.DateTimeField(null=True)
# "UNKNOWN"
status = models.TextField(null=True)
# "Get 3 months of Apple TV+ with the purchase of a new sub"
summary = models.TextField(null=True)
# "Buy a new sub to get 3 months of Apple TV+"
instructions = models.TextField(null=True)
# ""
reward_value_url_param = models.TextField(null=True)
# "https://tv.apple.com/includes/commerce/redeem/code-entry"
external_url = models.URLField(null=True)
# "https://blog.twitch.tv/2024/07/26/sub-and-get-apple-tv/"
about_url = models.URLField(null=True)
# "True" or "False". None if unknown.
is_site_wide = models.BooleanField(null=True)
# "1"
subs_goal = models.PositiveBigIntegerField(null=True)
# "0"
minute_watched_goal = models.PositiveBigIntegerField(null=True)
# "https://static-cdn.jtvnw.net/twitch-quests-assets/CAMPAIGN/quests_appletv_q3_2024/apple_150x200.png"
image_url = models.URLField(null=True)
image = models.ImageField(null=True, upload_to=get_reward_image_path)
game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="reward_campaigns", null=True)
scraped_json = models.ForeignKey(
ScrapedJson,
null=True,
on_delete=models.SET_NULL,
help_text="Reference to the JSON data from the Twitch API.",
)
class Meta:
ordering: ClassVar[list[str]] = ["-starts_at"]
def __str__(self) -> str:
return self.name or self.twitch_id
def download_image(self) -> ImageFieldFile | None:
"""Download the image for the drop campaign.
Returns:
ImageFieldFile | None: The image file or None if it doesn't exist.
"""
# We don't want to re-download the image if it already exists.
# TODO(TheLovinator): Check if there is a different image available. # noqa: TD003
if self.image:
return self.image
if not self.image_url:
return None
response: bytes | None = fetch_image(image_url=self.image_url)
image: File | None = convert_image_to_webp(response)
if image:
file_name: str = f"{self.twitch_id}.{image_file_format}"
self.image.save(name=file_name, content=image, save=True)
logger.info("Downloaded image for %s to %s", self, self.image.url)
return self.image
return None
async def aimport_json(self, data: dict) -> Self:
if wrong_typename(data, "RewardCampaign"):
return self
# Save the JSON data for debugging purposes.
await update_scraped_json(model_instance=self, new_json_data=data)
field_mapping: dict[str, str] = {
"name": "name",
"brand": "brand",
"startsAt": "starts_at",
"endsAt": "ends_at",
"status": "status",
"summary": "summary",
"instructions": "instructions",
"rewardValueURLParam": "reward_value_url_param", # wtf is this?
"externalURL": "external_url",
"aboutURL": "about_url",
"isSitewide": "is_site_wide",
}
updated: int = await update_fields(instance=self, data=data, field_mapping=field_mapping)
if updated > 0:
logger.info("Updated %s fields for %s", updated, self)
if data.get("unlockRequirements", {}):
subs_goal = data["unlockRequirements"].get("subsGoal")
if subs_goal and subs_goal != self.subs_goal:
self.subs_goal = subs_goal
await self.asave()
minutes_watched_goal = data["unlockRequirements"].get("minuteWatchedGoal")
if minutes_watched_goal and minutes_watched_goal != self.minute_watched_goal:
self.minute_watched_goal = minutes_watched_goal
await self.asave()
image_url = data.get("image", {}).get("image1xURL")
if image_url and image_url != self.image_url:
# await sync_to_async(self.download_image)()
# TODO(TheLovinator): Download the image. # noqa: TD003
self.image_url = image_url
await self.asave()
if data.get("game") and data["game"].get("id"):
game, _ = await Game.objects.aget_or_create(twitch_id=data["game"]["id"])
await game.reward_campaigns.aadd(self) # type: ignore # noqa: PGH003
await self.asave()
if "rewards" in data:
for reward in data["rewards"]:
reward_instance, created = await Reward.objects.aupdate_or_create(twitch_id=reward["id"])
await reward_instance.aimport_json(reward, self)
if created:
logger.info("Added reward %s to %s", reward_instance, self)
return self
def get_reward_banner_image_path(instance: models.Model, filename: str) -> str:
"""Get the path for the reward banner image.
Args:
instance (models.Model): The instance of the model. Is a Reward.
filename (str): The filename of the image.
Returns:
str: The path to the image.
"""
instance = cast(Reward, instance)
# Example: reward/banner_509658.png
image_path: str = f"reward/banner_{filename}"
logger.debug("Saved image to %s", image_path)
return image_path
def get_reward_thumbnail_image_path(instance: models.Model, filename: str) -> str:
"""Get the path for the reward thumbnail image.
Args:
instance (models.Model): The instance of the model. Is a Reward.
filename (str): The filename of the image.
Returns:
str: The path to the image.
"""
instance = cast(Reward, instance)
# Example: reward/thumb_509658.png
image_path: str = f"reward/thumb_{filename}"
logger.debug("Saved image to %s", image_path)
return image_path
class Reward(models.Model):
"""This from the RewardCampaign."""
# "dc2e9810-4de0-11ef-9ec3-621fb0811846"
twitch_id = models.TextField(primary_key=True)
# When the reward was first added to the database.
created_at = models.DateTimeField(null=True, auto_created=True)
# When the reward was last modified.
modified_at = models.DateTimeField(null=True, auto_now=True)
# "3 months of Apple TV+"
name = models.TextField(null=True)
# "https://static-cdn.jtvnw.net/twitch-quests-assets/CAMPAIGN/quests_appletv_q3_2024/apple_200x200.png"
banner_image_url = models.URLField(null=True)
banner_image = models.ImageField(null=True, upload_to=get_reward_banner_image_path)
# "https://static-cdn.jtvnw.net/twitch-quests-assets/CAMPAIGN/quests_appletv_q3_2024/apple_200x200.png"
thumbnail_image_url = models.URLField(null=True)
thumbnail_image = models.ImageField(null=True, upload_to=get_reward_thumbnail_image_path)
# "2024-08-19T19:00:00Z"
earnable_until = models.DateTimeField(null=True)
# ""
redemption_instructions = models.TextField(null=True)
# "https://tv.apple.com/includes/commerce/redeem/code-entry"
redemption_url = models.URLField(null=True)
campaign = models.ForeignKey(RewardCampaign, on_delete=models.CASCADE, related_name="rewards", null=True)
class Meta:
ordering: ClassVar[list[str]] = ["-earnable_until"]
def __str__(self) -> str:
return self.name or "Reward name unknown"
def download_banner_image(self) -> ImageFieldFile | None:
"""Download the banner image for the reward.
Returns:
ImageFieldFile | None: The image file or None if it doesn't exist.
"""
# We don't want to re-download the image if it already exists.
# TODO(TheLovinator): Check if there is a different image available. # noqa: TD003
if self.banner_image:
return self.banner_image
if not self.banner_image_url:
return None
if not self.banner_image and self.banner_image_url:
response: bytes | None = fetch_image(image_url=self.banner_image_url)
image: File | None = convert_image_to_webp(response)
if image:
self.banner_image.save(name=f"{self.twitch_id}.{image_file_format}", content=image, save=True)
logger.info("Downloaded image for %s to %s", self, self.banner_image.url)
return None
def download_thumbnail_image(self) -> ImageFieldFile | None:
"""Download the thumbnail image for the reward.
Returns:
ImageFieldFile | None: The image file or None if it doesn't exist.
"""
# We don't want to re-download the image if it already exists.
# TODO(TheLovinator): Check if there is a different image available. # noqa: TD003
if self.thumbnail_image:
return self.thumbnail_image
if not self.thumbnail_image_url:
return None
if not self.thumbnail_image and self.thumbnail_image_url:
response: bytes | None = fetch_image(image_url=self.thumbnail_image_url)
image: File | None = convert_image_to_webp(response)
if image:
self.thumbnail_image.save(name=f"{self.twitch_id}.{image_file_format}", content=image, save=True)
logger.info("Downloaded image for %s to %s", self, self.thumbnail_image.url)
return None
async def aimport_json(self, data: dict, reward_campaign: RewardCampaign | None) -> Self:
if wrong_typename(data, "Reward"):
return self
field_mapping: dict[str, str] = {
"name": "name",
"earnableUntil": "earnable_until",
"redemptionInstructions": "redemption_instructions",
"redemptionURL": "redemption_url",
}
updated: int = await update_fields(instance=self, data=data, field_mapping=field_mapping)
if updated > 0:
logger.info("Updated %s fields for %s", updated, self)
banner_image_url = data.get("bannerImage", {}).get("image1xURL")
if banner_image_url and banner_image_url != self.banner_image_url:
# await sync_to_async(self.download_banner_image)()
# TODO(TheLovinator): Download the image. # noqa: TD003
self.banner_image_url = banner_image_url
await self.asave()
thumbnail_image_url = data.get("thumbnailImage", {}).get("image1xURL")
if thumbnail_image_url and thumbnail_image_url != self.thumbnail_image_url:
# await sync_to_async(self.download_thumbnail_image)()
# TODO(TheLovinator): Download the image. # noqa: TD003
self.thumbnail_image_url = thumbnail_image_url
await self.asave()
if reward_campaign and await sync_to_async(lambda: reward_campaign != self.campaign)():
self.campaign = reward_campaign
await self.asave()
return self
class Webhook(models.Model):
"""Discord webhook."""
id = models.TextField(primary_key=True)
avatar = models.TextField(null=True)
channel_id = models.TextField(null=True)
guild_id = models.TextField(null=True)
name = models.TextField(null=True)
type = models.TextField(null=True)
token = models.TextField()
url = models.TextField()
# When the webhook was first added to the database.
created_at = models.DateTimeField(null=True, auto_created=True)
# When the webhook was last modified.
modified_at = models.DateTimeField(null=True, auto_now=True)
# Get notified when the site finds a new game.
subscribed_new_games = models.ManyToManyField(Game, related_name="subscribed_new_games")
# Get notified when a drop goes live.
subscribed_live_games = models.ManyToManyField(Game, related_name="subscribed_live_games")
# Get notified when the site finds a new drop campaign for a specific organization.
subscribed_new_owners = models.ManyToManyField(Owner, related_name="subscribed_new_owners")
# Get notified when a drop goes live for a specific organization.
subscribed_live_owners = models.ManyToManyField(Owner, related_name="subscribed_live_owners")
# So we don't spam the same drop campaign over and over.
seen_drops = models.ManyToManyField(DropCampaign, related_name="seen_drops")
class Meta:
unique_together: ClassVar[list[str]] = ["id", "token"]
def __str__(self) -> str:
return f"{self.name} - {self.get_webhook_url()}"
def get_webhook_url(self) -> str:
try:
return f"https://discord.com/api/webhooks/{self.id}/{self.token}"
except AttributeError:
return ""