Rewrite models and Twitch scraper

This commit is contained in:
2024-08-12 06:47:08 +02:00
parent 99b48bc3f6
commit a410fac8e8
14 changed files with 578 additions and 1577 deletions

View File

@ -2,30 +2,15 @@ import asyncio
import json
import logging
import typing
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand
from platformdirs import user_data_dir
from playwright.async_api import Playwright, async_playwright
from playwright.async_api._generated import Response
from core.models import (
Allow,
Benefit,
BenefitEdge,
Channel,
DropCampaign,
Game,
Image,
Owner,
Reward,
RewardCampaign,
TimeBasedDrop,
UnlockRequirements,
)
from core.models.twitch import Benefit, Channel, DropCampaign, Game, Owner, Reward, RewardCampaign, TimeBasedDrop
if TYPE_CHECKING:
from playwright.async_api._generated import BrowserContext, Page
@ -50,444 +35,6 @@ def get_data_dir() -> Path:
)
async def add_or_get_game(json_data: dict | None) -> tuple[Game | None, bool]:
"""Add or get Game from JSON data.
Args:
json_data (dict): JSON data to add to the database.
Returns:
tuple[Game | None, bool]: Game instance and whether it was created.
"""
if not json_data:
logger.warning("Couldn't find game data, probably a reward campaign?")
return None, False
game, created = await Game.objects.aupdate_or_create(
id=json_data["id"],
defaults={
"slug": json_data.get("slug"),
"display_name": json_data.get("displayName"),
"typename": json_data.get("__typename"),
"box_art_url": json_data.get("boxArtURL"), # Only for RewardCampaigns
},
)
if created:
logger.info("Found new game: %s", game.display_name or "Unknown Game")
return game, created
async def add_or_get_owner(json_data: dict | None) -> tuple[Owner | None, bool]:
"""Add or get Owner from JSON data.
Args:
json_data (dict): JSON data to add to the database.
Returns:
Owner: Owner instance.
"""
if not json_data:
logger.warning("No owner data provided")
return None, False
owner, created = await Owner.objects.aupdate_or_create(
id=json_data["id"],
defaults={
"display_name": json_data.get("name"),
"typename": json_data.get("__typename"),
},
)
return owner, created
async def add_or_get_allow(json_data: dict | None) -> tuple[Allow | None, bool]:
"""Add or get Allow from JSON data.
Args:
json_data (dict): JSON data to add to the database.
Returns:
Allow: Allow instance.
"""
if not json_data:
logger.warning("No allow data provided")
return None, False
allow, created = await Allow.objects.aupdate_or_create(
is_enabled=json_data.get("isEnabled"),
typename=json_data.get("__typename"),
)
return allow, created
async def add_or_get_time_based_drops(
time_based_drops_data: list[dict] | None,
owner: Owner | None,
game: Game | None,
) -> list[TimeBasedDrop]:
"""Handle TimeBasedDrops from JSON data.
Args:
time_based_drops_data (list[dict]): Time based drops data from JSON.
owner (Owner): Owner instance.
game (Game): Game instance.
Returns:
list[TimeBasedDrop]: TimeBasedDrop instances.
"""
time_based_drops: list[TimeBasedDrop] = []
if not time_based_drops_data:
logger.warning("No time based drops data provided")
return time_based_drops
for time_based_drop_data in time_based_drops_data:
time_based_drop, _ = await TimeBasedDrop.objects.aupdate_or_create(
id=time_based_drop_data["id"],
defaults={
"created_at": time_based_drop_data.get("createdAt"),
"entitlement_limit": time_based_drop_data.get("entitlementLimit"),
"image_asset_url": time_based_drop_data.get("imageAssetURL"),
"is_ios_available": time_based_drop_data.get("isIosAvailable"),
"name": time_based_drop_data.get("name"),
"owner_organization": owner,
"game": game,
"typename": time_based_drop_data.get("__typename"),
},
)
benefit_edges_data: list[dict] = time_based_drop_data.get("benefitEdges", [])
for benefit_edge_data in benefit_edges_data:
benefit_data: dict = benefit_edge_data.get("benefit", {})
benefit, _ = await Benefit.objects.aupdate_or_create(
id=benefit_data["id"],
defaults={
"created_at": benefit_data.get("createdAt"),
"entitlement_limit": benefit_data.get("entitlementLimit"),
"image_asset_url": benefit_data.get("imageAssetURL"),
"is_ios_available": benefit_data.get("isIosAvailable"),
"name": benefit_data.get("name"),
"owner_organization": owner,
"game": game,
"typename": benefit_data.get("__typename"),
},
)
await BenefitEdge.objects.aupdate_or_create(
benefit=benefit,
defaults={
"entitlement_limit": benefit_edge_data.get("entitlementLimit"),
"typename": benefit_edge_data.get("__typename"),
},
)
time_based_drops.append(time_based_drop)
return time_based_drops
async def add_or_get_drop_campaign(
drop_campaign_data: dict | None,
game: Game | None,
owner: Owner | None,
) -> tuple[DropCampaign | None, bool]:
"""Handle DropCampaign from JSON data.
Args:
drop_campaign_data (dict): Drop campaign data from JSON.
game (Game): Game instance.
owner (Owner): Owner instance.
Returns:
tuple[DropCampaign, bool]: DropCampaign instance and whether it was created.
"""
if not drop_campaign_data:
logger.warning("No drop campaign data provided")
return None, False
if drop_campaign_data.get("__typename") != "Game":
logger.error("__typename is not 'Game' for %s", drop_campaign_data.get("name", "Unknown Drop Campaign"))
drop_campaign, _ = await DropCampaign.objects.aupdate_or_create(
id=drop_campaign_data["id"],
defaults={
# "allow": allow, # We add this later
"account_link_url": drop_campaign_data.get("accountLinkURL"),
"description": drop_campaign_data.get("description"),
"details_url": drop_campaign_data.get("detailsURL"),
"ends_at": drop_campaign_data.get("endAt"),
# event_based_drops = ???? # TODO(TheLovinator): Find out what this is # noqa: TD003
"game": game,
"image_url": drop_campaign_data.get("imageURL"),
"name": drop_campaign_data.get("name"),
"owner": owner,
"starts_at": drop_campaign_data.get("startAt"),
"status": drop_campaign_data.get("status"),
# "time_based_drops": time_based_drops, # We add this later
"typename": drop_campaign_data.get("__typename"),
},
)
return drop_campaign, True
async def add_or_get_channel(json_data: dict | None) -> tuple[Channel | None, bool]:
"""Add or get Channel from JSON data.
Args:
json_data (dict): JSON data to add to the database.
Returns:
tuple[Channel | None, bool]: Channel instance and whether it was created.
"""
if not json_data:
logger.warning("No channel data provided")
return None, False
channel, created = await Channel.objects.aupdate_or_create(
id=json_data["id"],
defaults={
"display_name": json_data.get("displayName"),
"name": json_data.get("name"),
"typename": json_data.get("__typename"),
},
)
return channel, created
async def add_drop_campaign(json_data: dict | None) -> None:
"""Add data from JSON to the database.
Args:
json_data (dict): JSON data to add to the database.
"""
if not json_data:
logger.warning("No JSON data provided")
return
# Get the data from the JSON
user_data: dict = json_data.get("data", {}).get("user", {})
drop_campaign_data: dict = user_data.get("dropCampaign", {})
# Add or get Game
game_data: dict = drop_campaign_data.get("game", {})
game, _ = await add_or_get_game(json_data=game_data)
# Add or get Owner
owner_data: dict = drop_campaign_data.get("owner", {})
owner, _ = await add_or_get_owner(json_data=owner_data)
# Add or get Allow
allow_data: dict = drop_campaign_data.get("allow", {})
allow, _ = await add_or_get_allow(json_data=allow_data)
# Add channels to Allow
if allow:
channel_data: list[dict] = allow_data.get("channels", [])
if channel_data:
for json_channel in channel_data:
channel, _ = await add_or_get_channel(json_channel)
if channel:
await allow.channels.aadd(channel)
# Add or get TimeBasedDrops
time_based_drops_data = drop_campaign_data.get("timeBasedDrops", [])
time_based_drops: list[TimeBasedDrop] = await add_or_get_time_based_drops(time_based_drops_data, owner, game)
# Add or get DropCampaign
drop_campaign, _ = await add_or_get_drop_campaign(
drop_campaign_data=drop_campaign_data,
game=game,
owner=owner,
)
if drop_campaign:
drop_campaign.allow = allow
await drop_campaign.time_based_drops.aset(time_based_drops)
await drop_campaign.asave()
logger.info("Added Drop Campaign: %s", drop_campaign.name or "Unknown Drop Campaign")
async def add_or_get_image(json_data: dict | None) -> tuple[Image | None, bool]:
"""Add or get Image from JSON data.
Args:
json_data (dict): JSON data to add to the database.
Returns:
tuple[Image | None, bool]: Image instance and whether it was created.
"""
# TODO(TheLovinator): We should download the image and store it locally # noqa: TD003
if not json_data:
logger.warning("Image data is missing")
return None, False
if not json_data.get("image1xURL"):
logger.warning("Image URL is missing")
return None, False
image, created = await Image.objects.aupdate_or_create(
image1_x_url=json_data.get("image1xURL"),
defaults={
"typename": json_data.get("__typename"),
},
)
return image, created
async def add_or_get_rewards(json_data: dict | None) -> list[Reward]:
"""Add or get Rewards from JSON data.
Args:
json_data (dict): JSON data to add to the database.
Returns:
list[Reward]: Reward instances
"""
rewards: list[Reward] = []
if not json_data:
logger.warning("No rewards found")
return []
if "rewards" not in json_data:
logger.warning("No rewards found")
return []
rewards_json: list[dict] = json_data.get("rewards", [])
for reward_data in rewards_json:
# Add or get bannerImage
banner_image_data: dict = reward_data.get("bannerImage", {})
if banner_image_data:
banner_image, _ = await sync_to_async(Image.objects.get_or_create)(
image1_x_url=banner_image_data["image1xURL"],
defaults={"typename": banner_image_data["__typename"]},
)
# Add or get thumbnailImage
thumbnail_image_data = reward_data.get("thumbnailImage", {})
if thumbnail_image_data:
thumbnail_image, _ = await sync_to_async(Image.objects.get_or_create)(
image1_x_url=thumbnail_image_data["image1xURL"],
defaults={"typename": thumbnail_image_data["__typename"]},
)
# Convert earnableUntil to a datetime object
earnable_until: str | None = reward_data.get("earnableUntil")
earnable_until_date: datetime | None = None
if earnable_until:
earnable_until_date = datetime.fromisoformat(earnable_until.replace("Z", "+00:00"))
reward, _ = await sync_to_async(Reward.objects.get_or_create)(
id=reward_data["id"],
defaults={
"name": reward_data.get("name"),
"banner_image": banner_image,
"thumbnail_image": thumbnail_image,
"earnable_until": earnable_until_date,
"redemption_instructions": reward_data.get("redemptionInstructions"),
"redemption_url": reward_data.get("redemptionURL"),
"typename": reward_data.get("__typename"),
},
)
rewards.append(reward)
return rewards
async def add_or_get_unlock_requirements(json_data: dict | None) -> tuple[UnlockRequirements | None, bool]:
"""Add or get UnlockRequirements from JSON data.
Args:
json_data (dict): JSON data to add to the database.
Returns:
tuple[UnlockRequirements | None, bool]: UnlockRequirements instance and whether it was created.
"""
if not json_data:
logger.warning("No unlock requirements data provided")
return None, False
unlock_requirements, created = await UnlockRequirements.objects.aget_or_create(
subs_goal=json_data["subsGoal"],
defaults={
"minute_watched_goal": json_data["minuteWatchedGoal"],
"typename": json_data["__typename"],
},
)
return unlock_requirements, created
async def add_reward_campaign(json_data: dict | None) -> None:
"""Add data from JSON to the database.
Args:
json_data (dict): JSON data to add to the database.
"""
if not json_data:
logger.warning("No JSON data provided")
return
campaign_data: list[dict] = json_data["data"]["rewardCampaignsAvailableToUser"]
for campaign in campaign_data:
# Add or get Game
game_data: dict = campaign.get("game", {})
game, _ = await add_or_get_game(json_data=game_data)
# Add or get Image
image_data: dict = campaign.get("image", {})
image, _ = await add_or_get_image(json_data=image_data)
# Add or get Rewards
rewards: list[Reward] = await add_or_get_rewards(campaign)
# Add or get Unlock Requirements
unlock_requirements_data: dict = campaign["unlockRequirements"]
unlock_requirements, _ = await add_or_get_unlock_requirements(unlock_requirements_data)
# Create Reward Campaign
reward_campaign, _ = await RewardCampaign.objects.aget_or_create(
id=campaign["id"],
defaults={
"name": campaign.get("name"),
"brand": campaign.get("brand"),
"starts_at": campaign.get("startsAt"),
"ends_at": campaign.get("endsAt"),
"status": campaign.get("status"),
"summary": campaign.get("summary"),
"instructions": campaign.get("instructions"),
"external_url": campaign.get("externalURL"),
"reward_value_url_param": campaign.get("rewardValueURLParam"),
"about_url": campaign.get("aboutURL"),
"is_sitewide": campaign.get("isSitewide"),
"game": game,
"unlock_requirements": unlock_requirements,
"image": image,
# "rewards": rewards, # We add this later
"typename": campaign.get("__typename"),
},
)
# Add Rewards to the Campaign
for reward in rewards:
await reward_campaign.rewards.aadd(reward)
await reward_campaign.asave()
def get_profile_dir() -> Path:
"""Get the profile directory for the browser.
@ -517,23 +64,253 @@ def save_json(campaign: dict, dir_name: str) -> None:
json.dump(campaign, f, indent=4)
async def process_json_data(num: int, campaign: dict | None, json_data: list[dict] | None) -> None:
async def add_reward_campaign(campaign: dict) -> None:
"""Add a reward campaign to the database.
Args:
campaign (dict): The reward campaign to add.
"""
logger.info("Adding reward campaign to database")
for reward_campaign in campaign["data"]["rewardCampaignsAvailableToUser"]:
our_reward_campaign, created = await RewardCampaign.objects.aget_or_create(
id=reward_campaign["id"],
defaults={
"name": reward_campaign["name"],
"brand": reward_campaign["brand"],
"starts_at": reward_campaign["startsAt"],
"ends_at": reward_campaign["endsAt"],
"status": reward_campaign["status"],
"summary": reward_campaign["summary"],
"instructions": reward_campaign["instructions"],
"reward_value_url_param": reward_campaign["rewardValueURLParam"],
"external_url": reward_campaign["externalURL"],
"about_url": reward_campaign["aboutURL"],
"is_site_wide": reward_campaign["isSitewide"],
"sub_goal": reward_campaign["unlockRequirements"]["subsGoal"],
"minute_watched_goal": reward_campaign["unlockRequirements"]["minuteWatchedGoal"],
"image_url": reward_campaign["image"]["image1xURL"],
# "game" # To be implemented
},
)
if created:
logger.info("Added reward campaign %s", our_reward_campaign.id)
else:
logger.info("Updated reward campaign %s", our_reward_campaign.id)
if reward_campaign["game"]:
# TODO(TheLovinator): Add game to reward campaign # noqa: TD003
# TODO(TheLovinator): Send JSON to Discord # noqa: TD003
logger.error("Not implemented: Add game to reward campaign, JSON: %s", reward_campaign["game"])
# Add rewards
for reward in reward_campaign["rewards"]:
our_reward, created = await Reward.objects.aget_or_create(
id=reward["id"],
defaults={
"name": reward["name"],
"banner_image_url": reward["bannerImage"]["image1xURL"],
"thumbnail_image_url": reward["thumbnailImage"]["image1xURL"],
"earnable_until": reward["earnableUntil"],
"redemption_instructions": reward["redemptionInstructions"],
"redemption_url": reward["redemptionURL"],
"campaign": our_reward_campaign,
},
)
if created:
logger.info("Added reward %s", our_reward.id)
else:
logger.info("Updated reward %s", our_reward.id)
async def add_or_update_game(game_json: dict, owner: Owner | None) -> Game | None:
"""Add or update a game in the database.
Args:
game_json (dict): The game to add or update.
owner (Owner): The owner of the game.
Returns:
Game: The game that was added or updated.
"""
if game_json:
game_url: str | None = (
f"https://www.twitch.tv/directory/game/{game_json["slug"]}" if game_json["slug"] else None
)
our_game, created = await Game.objects.aget_or_create(
twitch_id=game_json["id"],
defaults={
"slug": game_json["slug"],
"name": game_json["displayName"],
"game_url": game_url,
"org": owner,
# TODO(TheLovinator): Add box_art_url to game # noqa: TD003
},
)
if created:
logger.info("Added game %s", our_game.twitch_id)
else:
logger.info("Updated game %s", our_game.twitch_id)
return our_game
return None
async def add_or_update_owner(owner_json: dict) -> Owner | None:
"""Add or update an owner in the database.
Args:
owner_json (dict): The owner to add or update.
Returns:
Owner: The owner that was added or updated.
"""
if owner_json:
our_owner, created = await Owner.objects.aget_or_create(
id=owner_json["id"],
defaults={"name": owner_json["name"]},
)
if created:
logger.info("Added owner %s", our_owner.id)
else:
logger.info("Updated owner %s", our_owner.id)
return our_owner
return None
async def add_or_update_channels(channels_json: list[dict]) -> list[Channel] | None:
"""Add or update channels in the database.
Args:
channels_json (list[dict]): The channels to add or update.
Returns:
list[Channel]: The channels that were added or updated.
"""
if not channels_json:
return None
channels: list[Channel] = []
for channel_json in channels_json:
twitch_url: str | None = f"https://www.twitch.tv/{channel_json["name"]}" if channel_json["name"] else None
our_channel, created = await Channel.objects.aget_or_create(
twitch_id=channel_json["id"],
defaults={
"name": channel_json["name"],
"display_name": channel_json["displayName"],
"twitch_url": twitch_url,
"live": False, # Toggle this later
},
)
if created:
logger.info("Added channel %s", our_channel.twitch_id)
else:
logger.info("Updated channel %s", our_channel.twitch_id)
channels.append(our_channel)
return channels
async def add_benefit(benefit: dict, time_based_drop: TimeBasedDrop) -> None:
"""Add a benefit to the database.
Args:
benefit (dict): The benefit to add.
time_based_drop (TimeBasedDrop): The time-based drop the benefit belongs to.
"""
our_benefit, created = await Benefit.objects.aget_or_create(
id=benefit["id"],
defaults={
"twitch_created_at": benefit["createdAt"],
"entitlement_limit": benefit["entitlementLimit"],
"image_url": benefit["imageAssetURL"],
"is_ios_available": benefit["isIosAvailable"],
"name": benefit["name"],
"time_based_drop": time_based_drop,
},
)
if created:
logger.info("Added benefit %s", our_benefit.id)
else:
logger.info("Updated benefit %s", our_benefit.id)
async def add_drop_campaign(drop_campaign: dict) -> None:
"""Add a drop campaign to the database.
Args:
drop_campaign (dict): The drop campaign to add.
"""
logger.info("Adding drop campaign to database")
owner: Owner | None = await add_or_update_owner(drop_campaign["owner"])
game: Game | None = await add_or_update_game(drop_campaign["game"], owner)
channels: list[Channel] | None = await add_or_update_channels(drop_campaign["allow"]["channels"])
our_drop_campaign, created = await DropCampaign.objects.aget_or_create(
id=drop_campaign["id"],
defaults={
"account_link_url": drop_campaign["accountLinkURL"],
"description": drop_campaign["description"],
"details_url": drop_campaign["detailsURL"],
"ends_at": drop_campaign["endAt"],
"starts_at": drop_campaign["startAt"],
"image_url": drop_campaign["imageURL"],
"name": drop_campaign["name"],
"status": drop_campaign["status"],
"game": game,
},
)
if created:
logger.info("Added drop campaign %s", our_drop_campaign.id)
else:
logger.info("Updated drop campaign %s", our_drop_campaign.id)
if channels:
our_drop_campaign.channels.aset(channels) # type: ignore # noqa: PGH003
# Add time-based drops
for time_based_drop in drop_campaign["timeBasedDrops"]:
if time_based_drop["preconditionDrops"]:
# TODO(TheLovinator): Add precondition drops to time-based drop # noqa: TD003
# TODO(TheLovinator): Send JSON to Discord # noqa: TD003
logger.error("Not implemented: Add precondition drops to time-based drop, JSON: %s", time_based_drop)
our_time_based_drop, created = await TimeBasedDrop.objects.aget_or_create(
id=time_based_drop["id"],
defaults={
"required_subs": time_based_drop["requiredSubs"],
"ends_at": time_based_drop["endAt"],
"name": time_based_drop["name"],
"required_minutes_watched": time_based_drop["requiredMinutesWatched"],
"starts_at": time_based_drop["startAt"],
"drop_campaign": our_drop_campaign,
},
)
if created:
logger.info("Added time-based drop %s", our_time_based_drop.id)
else:
logger.info("Updated time-based drop %s", our_time_based_drop.id)
# Add benefits
for benefit_edge in time_based_drop["benefitEdges"]:
await add_benefit(benefit_edge["benefit"], our_time_based_drop)
async def process_json_data(num: int, campaign: dict | None) -> None:
"""Process JSON data.
Args:
num (int): The number of the JSON data.
campaign (dict): The JSON data to process.
json_data (list[dict]): The list of JSON
"""
if not json_data:
logger.warning("No JSON data provided")
logger.info("Processing JSON %d", num)
if not campaign:
logger.warning("No campaign found for JSON %d", num)
return
logger.info("Processing JSON %d of %d", num, len(json_data))
if not isinstance(campaign, dict):
logger.warning("Campaign is not a dictionary")
return
# This is a Reward Campaign
if "rewardCampaignsAvailableToUser" in campaign["data"]:
save_json(campaign, "reward_campaigns")
await add_reward_campaign(campaign)
@ -544,10 +321,10 @@ async def process_json_data(num: int, campaign: dict | None, json_data: list[dic
return
save_json(campaign, "drop_campaign")
await add_drop_campaign(campaign)
await add_drop_campaign(campaign["data"]["user"]["dropCampaign"])
if "dropCampaigns" in campaign.get("data", {}).get("user", {}):
for drop_campaign in campaign["data"]["user"]["dropCampaigns"]:
for drop_campaign in campaign["data"]["currentUser"]["dropCampaigns"]:
save_json(campaign, "drop_campaigns")
await add_drop_campaign(drop_campaign)
@ -605,7 +382,7 @@ class Command(BaseCommand):
await browser.close()
for num, campaign in enumerate(json_data, start=1):
await process_json_data(num, campaign, json_data)
await process_json_data(num=num, campaign=campaign)
return json_data