"""Django management command that scrapes Twitch drop and reward campaigns.

A persistent Chrome profile is launched through Playwright, the GraphQL
responses received while the drops page loads are collected, dumped to JSON
files and stored in the database through the ``twitch_app`` models.
"""

import asyncio
import json
import logging
import typing
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING

from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand
from platformdirs import user_data_dir
from playwright.async_api import Playwright, async_playwright
from playwright.async_api._generated import Response

from twitch_app.models import (
    Allow,
    Benefit,
    BenefitEdge,
    Channel,
    DropCampaign,
    Game,
    Image,
    Owner,
    Reward,
    RewardCampaign,
    TimeBasedDrop,
    UnlockRequirements,
)

if TYPE_CHECKING:
    from playwright.async_api._generated import BrowserContext, Page

# Where to store the Chrome profile
data_dir = Path(
    user_data_dir(
        appname="TTVDrops",
        appauthor="TheLovinator",
        roaming=True,
        ensure_exists=True,
    ),
)

# NOTE(review): a Path instance is always truthy, so this guard can never fire.
if not data_dir:
    msg = "DATA_DIR is not set in settings.py"
    raise ValueError(msg)

logger: logging.Logger = logging.getLogger(__name__)


def _save_json(directory: str, filename: str, payload: typing.Any) -> None:  # noqa: ANN401
    """Dump ``payload`` as indented JSON to ``directory/filename``, creating the directory if needed."""
    dir_path = Path(directory)
    dir_path.mkdir(parents=True, exist_ok=True)
    with (dir_path / filename).open(mode="w", encoding="utf-8") as f:
        json.dump(payload, f, indent=4)


async def add_or_get_game(json_data: dict, name: str) -> tuple[Game | None, bool]:
    """Add or get Game from JSON data.

    Args:
        json_data (dict): JSON data to add to the database.
        name (str): Name of the drop campaign. Only used in the log message.

    Returns:
        tuple[Game | None, bool]: Game instance and whether it was created.
            ``(None, False)`` when ``json_data`` is empty.
    """
    if not json_data:
        logger.warning("%s is not for a game?", name)
        return None, False

    game, created = await Game.objects.aupdate_or_create(
        id=json_data["id"],
        defaults={
            "slug": json_data.get("slug"),
            "display_name": json_data.get("displayName"),
            "typename": json_data.get("__typename"),
            "box_art_url": json_data.get("boxArtURL"),  # Only for RewardCampaigns
        },
    )
    return game, created


async def add_or_get_owner(json_data: dict, name: str) -> tuple[Owner | None, bool]:
    """Add or get Owner from JSON data.

    Args:
        json_data (dict): JSON data to add to the database.
        name (str): Name of the drop campaign. Only used in the log message.

    Returns:
        tuple[Owner | None, bool]: Owner instance and whether it was created.
            ``(None, False)`` when ``json_data`` is empty.
    """
    if not json_data:
        logger.warning("Owner data is missing for %s", name)
        return None, False

    owner, created = await Owner.objects.aupdate_or_create(
        id=json_data["id"],
        defaults={
            "display_name": json_data.get("name"),
            "typename": json_data.get("__typename"),
        },
    )
    return owner, created


async def add_or_get_allow(json_data: dict, name: str) -> tuple[Allow | None, bool]:
    """Add or get Allow from JSON data.

    Args:
        json_data (dict): JSON data to add to the database.
        name (str): Name of the drop campaign. Only used in the log message.

    Returns:
        tuple[Allow | None, bool]: Allow instance and whether it was created.
            ``(None, False)`` when ``json_data`` is empty.
    """
    if not json_data:
        logger.warning("Allow data is missing for %s", name)
        return None, False

    # No ``defaults`` are passed, so the row is looked up by both fields.
    allow, created = await Allow.objects.aupdate_or_create(
        is_enabled=json_data.get("isEnabled"),
        typename=json_data.get("__typename"),
    )
    return allow, created


async def add_or_get_time_based_drops(
    time_based_drops_data: list[dict] | None,
    owner: Owner | None,
    game: Game | None,
) -> list[TimeBasedDrop]:
    """Handle TimeBasedDrops from JSON data.

    Each drop is upserted together with the Benefit and BenefitEdge rows
    listed under its ``benefitEdges`` key.

    Args:
        time_based_drops_data (list[dict]): Time based drops data from JSON.
        owner (Owner): Owner instance.
        game (Game): Game instance.

    Returns:
        list[TimeBasedDrop]: TimeBasedDrop instances. Empty when no data was given.
    """
    if not time_based_drops_data:
        logger.warning("No time based drops found")
        return []

    time_based_drops: list[TimeBasedDrop] = []
    for time_based_drop_data in time_based_drops_data:
        time_based_drop, _ = await TimeBasedDrop.objects.aupdate_or_create(
            id=time_based_drop_data["id"],
            defaults={
                "created_at": time_based_drop_data.get("createdAt"),
                "entitlement_limit": time_based_drop_data.get("entitlementLimit"),
                "image_asset_url": time_based_drop_data.get("imageAssetURL"),
                "is_ios_available": time_based_drop_data.get("isIosAvailable"),
                "name": time_based_drop_data.get("name"),
                "owner_organization": owner,
                "game": game,
                "typename": time_based_drop_data.get("__typename"),
            },
        )

        # ``or []`` also covers an explicit JSON null.
        benefit_edges_data: list[dict] = time_based_drop_data.get("benefitEdges") or []
        for benefit_edge_data in benefit_edges_data:
            benefit_data: dict = benefit_edge_data.get("benefit") or {}
            if not benefit_data:
                # Without a benefit there is no id to key the rows on.
                logger.warning("Benefit data is missing for a benefit edge")
                continue

            benefit, _ = await Benefit.objects.aupdate_or_create(
                id=benefit_data["id"],
                defaults={
                    "created_at": benefit_data.get("createdAt"),
                    "entitlement_limit": benefit_data.get("entitlementLimit"),
                    "image_asset_url": benefit_data.get("imageAssetURL"),
                    "is_ios_available": benefit_data.get("isIosAvailable"),
                    "name": benefit_data.get("name"),
                    "owner_organization": owner,
                    "game": game,
                    "typename": benefit_data.get("__typename"),
                },
            )
            await BenefitEdge.objects.aupdate_or_create(
                benefit=benefit,
                defaults={
                    "entitlement_limit": benefit_edge_data.get("entitlementLimit"),
                    "typename": benefit_edge_data.get("__typename"),
                },
            )

        time_based_drops.append(time_based_drop)

    return time_based_drops


async def add_or_get_drop_campaign(
    drop_campaign_data: dict,
    game: Game | None,
    owner: Owner | None,
) -> tuple[DropCampaign | None, bool]:
    """Handle DropCampaign from JSON data.

    Args:
        drop_campaign_data (dict): Drop campaign data from JSON.
        game (Game): Game instance.
        owner (Owner): Owner instance.

    Returns:
        tuple[DropCampaign | None, bool]: DropCampaign instance and whether it was created.
            ``(None, False)`` when ``drop_campaign_data`` is empty.
    """
    if not drop_campaign_data:
        logger.warning("No drop campaign data found")
        return None, False

    # NOTE(review): this compares the campaign's own __typename with "Game";
    # presumably a different value was intended - confirm against real payloads.
    if drop_campaign_data.get("__typename") != "Game":
        logger.error("__typename is not 'Game' for %s", drop_campaign_data.get("name", "Unknown Drop Campaign"))

    drop_campaign, created = await DropCampaign.objects.aupdate_or_create(
        id=drop_campaign_data["id"],
        defaults={
            # "allow": allow,  # We add this later
            "account_link_url": drop_campaign_data.get("accountLinkURL"),
            "description": drop_campaign_data.get("description"),
            "details_url": drop_campaign_data.get("detailsURL"),
            "ends_at": drop_campaign_data.get("endAt"),
            # event_based_drops = ????  # TODO(TheLovinator): Find out what this is  # noqa: TD003
            "game": game,
            "image_url": drop_campaign_data.get("imageURL"),
            "name": drop_campaign_data.get("name"),
            "owner": owner,
            "starts_at": drop_campaign_data.get("startAt"),
            "status": drop_campaign_data.get("status"),
            # "time_based_drops": time_based_drops,  # We add this later
            "typename": drop_campaign_data.get("__typename"),
        },
    )
    # Report the real creation flag instead of a hard-coded True.
    return drop_campaign, created


async def add_or_get_channel(json_data: dict) -> tuple[Channel | None, bool]:
    """Add or get Channel from JSON data.

    Args:
        json_data (dict): JSON data to add to the database.

    Returns:
        tuple[Channel | None, bool]: Channel instance and whether it was created.
            ``(None, False)`` when ``json_data`` is empty.
    """
    if not json_data:
        logger.warning("Channel data is missing")
        return None, False

    channel, created = await Channel.objects.aupdate_or_create(
        id=json_data["id"],
        defaults={
            "display_name": json_data.get("displayName"),
            "name": json_data.get("name"),
            "typename": json_data.get("__typename"),
        },
    )
    return channel, created


async def add_drop_campaign(json_data: dict) -> None:
    """Add data from JSON to the database.

    The campaign is read from ``json_data["data"]["user"]["dropCampaign"]``.
    Missing or null levels are treated as empty, in which case the helpers
    log a warning and nothing is stored.

    Args:
        json_data (dict): JSON data to add to the database.
    """
    # ``or {}`` guards against keys that are present but null.
    user_data: dict = (json_data.get("data") or {}).get("user") or {}
    drop_campaign_data: dict = user_data.get("dropCampaign") or {}
    campaign_name: str = drop_campaign_data.get("name", "Unknown Drop Campaign")

    # Add or get Game
    game, _ = await add_or_get_game(json_data=drop_campaign_data.get("game") or {}, name=campaign_name)

    # Add or get Owner
    owner, _ = await add_or_get_owner(json_data=drop_campaign_data.get("owner") or {}, name=campaign_name)

    # Add or get Allow
    allow_data: dict = drop_campaign_data.get("allow") or {}
    allow, _ = await add_or_get_allow(json_data=allow_data, name=campaign_name)

    # Add channels to Allow
    if allow:
        for json_channel in allow_data.get("channels") or []:
            channel, _ = await add_or_get_channel(json_channel)
            if channel:
                await allow.channels.aadd(channel)

    # Add or get TimeBasedDrops
    time_based_drops: list[TimeBasedDrop] = await add_or_get_time_based_drops(
        drop_campaign_data.get("timeBasedDrops") or [],
        owner,
        game,
    )

    # Add or get DropCampaign
    drop_campaign, _ = await add_or_get_drop_campaign(
        drop_campaign_data=drop_campaign_data,
        game=game,
        owner=owner,
    )
    if drop_campaign:
        drop_campaign.allow = allow
        await drop_campaign.time_based_drops.aset(time_based_drops)
        await drop_campaign.asave()

        logger.info("Added Drop Campaign: %s", drop_campaign.name or "Unknown Drop Campaign")


async def add_or_get_image(json_data: dict) -> tuple[Image | None, bool]:
    """Add or get Image from JSON data.

    Args:
        json_data (dict): JSON data to add to the database.

    Returns:
        tuple[Image | None, bool]: Image instance and whether it was created.
            ``(None, False)`` when the data or its ``image1xURL`` is missing.
    """
    # TODO(TheLovinator): We should download the image and store it locally  # noqa: TD003
    if not json_data:
        logger.warning("Image data is missing")
        return None, False

    if not json_data.get("image1xURL"):
        logger.warning("Image URL is missing")
        return None, False

    image, created = await Image.objects.aupdate_or_create(
        image1_x_url=json_data.get("image1xURL"),
        defaults={
            "typename": json_data.get("__typename"),
        },
    )
    return image, created


async def add_or_get_rewards(json_data: dict) -> list[Reward]:
    """Add or get Rewards from JSON data.

    Args:
        json_data (dict): JSON data to add to the database. The rewards are
            read from its ``rewards`` key.

    Returns:
        list[Reward]: Reward instances. Empty when there are no rewards.
    """
    if not json_data or "rewards" not in json_data:
        logger.warning("No rewards found")
        return []

    rewards: list[Reward] = []
    for reward_data in json_data.get("rewards") or []:
        # Reset per reward so a missing image is stored as None rather than
        # raising UnboundLocalError or reusing the previous reward's image.
        banner_image: Image | None = None
        thumbnail_image: Image | None = None

        # Add or get bannerImage
        banner_image_data: dict = reward_data.get("bannerImage", {})
        if banner_image_data:
            banner_image, _ = await sync_to_async(Image.objects.get_or_create)(
                image1_x_url=banner_image_data["image1xURL"],
                defaults={"typename": banner_image_data["__typename"]},
            )

        # Add or get thumbnailImage
        thumbnail_image_data: dict = reward_data.get("thumbnailImage", {})
        if thumbnail_image_data:
            thumbnail_image, _ = await sync_to_async(Image.objects.get_or_create)(
                image1_x_url=thumbnail_image_data["image1xURL"],
                defaults={"typename": thumbnail_image_data["__typename"]},
            )

        # Convert earnableUntil to a datetime object
        earnable_until: str | None = reward_data.get("earnableUntil")
        earnable_until_date: datetime | None = None
        if earnable_until:
            # fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
            earnable_until_date = datetime.fromisoformat(earnable_until.replace("Z", "+00:00"))

        reward, _ = await sync_to_async(Reward.objects.get_or_create)(
            id=reward_data["id"],
            defaults={
                "name": reward_data.get("name"),
                "banner_image": banner_image,
                "thumbnail_image": thumbnail_image,
                "earnable_until": earnable_until_date,
                "redemption_instructions": reward_data.get("redemptionInstructions"),
                "redemption_url": reward_data.get("redemptionURL"),
                "typename": reward_data.get("__typename"),
            },
        )
        rewards.append(reward)

    return rewards


async def add_or_get_unlock_requirements(json_data: dict) -> tuple[UnlockRequirements | None, bool]:
    """Add or get UnlockRequirements from JSON data.

    Args:
        json_data (dict): JSON data to add to the database.

    Returns:
        tuple[UnlockRequirements | None, bool]: UnlockRequirements instance and whether it was created.
            ``(None, False)`` when ``json_data`` is empty.
    """
    if not json_data:
        logger.warning("Unlock Requirements data is missing")
        return None, False

    unlock_requirements, created = await UnlockRequirements.objects.aget_or_create(
        subs_goal=json_data["subsGoal"],
        defaults={
            "minute_watched_goal": json_data["minuteWatchedGoal"],
            "typename": json_data["__typename"],
        },
    )
    return unlock_requirements, created


async def add_reward_campaign(json_data: dict) -> None:
    """Add data from JSON to the database.

    Every entry of ``json_data["data"]["rewardCampaignsAvailableToUser"]`` is
    stored together with its game, image, rewards and unlock requirements.

    Args:
        json_data (dict): JSON data to add to the database.

    Returns:
        None: No return value.
    """
    # ``or []`` covers a null campaign list.
    campaign_data: list[dict] = json_data["data"]["rewardCampaignsAvailableToUser"] or []
    for campaign in campaign_data:
        # Add or get Game
        game_data: dict = campaign.get("game", {})
        game, _ = await add_or_get_game(json_data=game_data, name=campaign.get("name", "Unknown Reward Campaign"))

        # Add or get Image
        image_data: dict = campaign.get("image", {})
        image, _ = await add_or_get_image(json_data=image_data)

        # Add or get Rewards
        rewards: list[Reward] = await add_or_get_rewards(campaign)

        # Add or get Unlock Requirements (the helper handles missing data)
        unlock_requirements_data: dict = campaign.get("unlockRequirements") or {}
        unlock_requirements, _ = await add_or_get_unlock_requirements(unlock_requirements_data)

        # Create Reward Campaign
        reward_campaign, _ = await RewardCampaign.objects.aget_or_create(
            id=campaign["id"],
            defaults={
                "name": campaign.get("name"),
                "brand": campaign.get("brand"),
                "starts_at": campaign.get("startsAt"),
                "ends_at": campaign.get("endsAt"),
                "status": campaign.get("status"),
                "summary": campaign.get("summary"),
                "instructions": campaign.get("instructions"),
                "external_url": campaign.get("externalURL"),
                "reward_value_url_param": campaign.get("rewardValueURLParam"),
                "about_url": campaign.get("aboutURL"),
                "is_sitewide": campaign.get("isSitewide"),
                "game": game,
                "unlock_requirements": unlock_requirements,
                "image": image,
                # "rewards": rewards,  # We add this later
                "typename": campaign.get("__typename"),
            },
        )

        # Add Rewards to the Campaign
        for reward in rewards:
            await reward_campaign.rewards.aadd(reward)

        await reward_campaign.asave()


class Command(BaseCommand):
    """Management command that scrapes Twitch campaigns through a logged-in Chrome session."""

    help = "Scrape Twitch Drops Campaigns with login using Chrome"

    async def run(  # noqa: PLR6301, C901
        self,
        playwright: Playwright,
    ) -> list[dict[str, typing.Any]]:
        """Collect GraphQL responses from the Twitch drops page and store them.

        Args:
            playwright (Playwright): Running Playwright instance.

        Returns:
            list[dict[str, typing.Any]]: The captured GraphQL payloads.
        """
        # disable navigator.webdriver:true flag
        args: list[str] = ["--disable-blink-features=AutomationControlled"]

        profile_dir: Path = data_dir / "chrome-profile"
        profile_dir.mkdir(parents=True, exist_ok=True)
        logger.debug(
            "Launching Chrome browser with user data directory: %s",
            profile_dir,
        )

        browser: BrowserContext = await playwright.chromium.launch_persistent_context(
            channel="chrome",
            user_data_dir=profile_dir,
            headless=False,
            args=args,
        )
        logger.debug("Launched Chrome browser")

        json_data: list[dict] = []

        async def handle_response(response: Response) -> None:
            """Store the JSON body of every Twitch GraphQL response."""
            if "https://gql.twitch.tv/gql" not in response.url:
                return
            try:
                body: typing.Any = await response.json()
            except Exception:
                logger.exception(
                    "Failed to parse JSON from %s",
                    response.url,
                )
                return

            if isinstance(body, list):
                json_data.extend(body)
            elif isinstance(body, dict):
                # extend() on a dict would add its keys and lose the payload.
                json_data.append(body)

        try:
            page: Page = await browser.new_page()
            page.on("response", handle_response)
            await page.goto("https://www.twitch.tv/drops/campaigns")
            logger.debug("Navigated to Twitch drops campaigns page")

            logged_in = False
            while not logged_in:
                # KeyboardInterrupt is not an Exception subclass, so it
                # propagates without a dedicated handler.
                try:
                    await page.wait_for_selector(
                        'div[data-a-target="top-nav-avatar"]',
                        timeout=300000,
                    )
                    logged_in = True
                    logger.info("Logged in to Twitch")
                except Exception:  # noqa: BLE001
                    await asyncio.sleep(5)
                    logger.info("Waiting for login")

            await page.wait_for_load_state("networkidle")
            logger.debug("Page loaded. Scraping data...")
        finally:
            # Close the browser even when navigation or login fails.
            await browser.close()

        for num, campaign in enumerate(json_data, start=1):
            logger.info("Processing JSON %d of %d", num, len(json_data))
            if not isinstance(campaign, dict):
                continue

            # ``or {}`` guards against missing keys as well as null values.
            data: dict = campaign.get("data") or {}
            user: dict = data.get("user") or {}

            if "rewardCampaignsAvailableToUser" in data:
                # Save to folder named "reward_campaigns"
                _save_json("reward_campaigns", f"reward_campaign_{num}.json", campaign)
                await add_reward_campaign(campaign)

            if "dropCampaign" in user:
                if not user["dropCampaign"]:
                    logger.warning("No drop campaign found")
                    continue

                # Save to folder named "drop_campaign"
                _save_json("drop_campaign", f"drop_campaign_{num}.json", campaign)
                await add_drop_campaign(campaign)

            if "dropCampaigns" in user:
                for index, drop_campaign in enumerate(user["dropCampaigns"] or [], start=1):
                    # Save to folder named "drop_campaigns"; the index keeps
                    # entries of one response from overwriting each other.
                    _save_json("drop_campaigns", f"drop_campaign_{num}_{index}.json", drop_campaign)

                    # NOTE(review): add_drop_campaign() reads
                    # data -> user -> dropCampaign, which a bare list entry
                    # does not have, so this call appears to store nothing.
                    # Confirm the intended payload shape before changing it.
                    await add_drop_campaign(drop_campaign)

        return json_data

    def handle(self, *args, **kwargs) -> None:  # noqa: ANN002, ARG002, ANN003
        """Run the scraper; entry point called by Django."""
        asyncio.run(self.run_with_playwright())

    async def run_with_playwright(self) -> None:
        """Start Playwright and run the scraper inside its context."""
        async with async_playwright() as playwright:
            await self.run(playwright)


if __name__ == "__main__":
    Command().handle()