import asyncio import json import logging import typing from pathlib import Path from typing import TYPE_CHECKING from django.core.management.base import BaseCommand from platformdirs import user_data_dir from playwright.async_api import Playwright, async_playwright from playwright.async_api._generated import Response from core.models.twitch import Benefit, Channel, DropCampaign, Game, Owner, Reward, RewardCampaign, TimeBasedDrop if TYPE_CHECKING: from playwright.async_api._generated import BrowserContext, Page logger: logging.Logger = logging.getLogger(__name__) def get_data_dir() -> Path: """Get the data directory. Returns: Path: The data directory. """ return Path( user_data_dir( appname="TTVDrops", appauthor="TheLovinator", roaming=True, ensure_exists=True, ), ) def get_profile_dir() -> Path: """Get the profile directory for the browser. Returns: Path: The profile directory. """ profile_dir: Path = Path(get_data_dir() / "chrome-profile") profile_dir.mkdir(parents=True, exist_ok=True) logger.debug("Launching Chrome browser with user data directory: %s", profile_dir) return profile_dir def save_json(campaign: dict | None, dir_name: str) -> None: """Save JSON data to a file. Args: campaign (dict): The JSON data to save. dir_name (Path): The directory to save the JSON data to. """ if not campaign: return save_dir: Path = Path(dir_name) save_dir.mkdir(parents=True, exist_ok=True) # File name is the hash of the JSON data file_name: str = f"{hash(json.dumps(campaign))}.json" with Path(save_dir / file_name).open(mode="w", encoding="utf-8") as f: json.dump(campaign, f, indent=4) async def add_reward_campaign(campaign: dict | None) -> None: # noqa: C901 """Add a reward campaign to the database. Args: campaign (dict): The reward campaign to add. """ if not campaign: return logger.info("Adding reward campaign to database") for reward_campaign in campaign["data"]["rewardCampaignsAvailableToUser"]: mappings: dict[str, str] = { "brand": "brand", "createdAt": "created_at", "startsAt": "starts_at", "endsAt": "ends_at", "status": "status", "summary": "summary", "instructions": "instructions", "rewardValueURLParam": "reward_value_url_param", "externalURL": "external_url", "aboutURL": "about_url", "isSitewide": "is_site_wide", } defaults = {new_key: reward_campaign[key] for key, new_key in mappings.items() if reward_campaign.get(key)} if reward_campaign.get("unlockRequirements", {}).get("subsGoal"): defaults["sub_goal"] = reward_campaign["unlockRequirements"]["subsGoal"] if reward_campaign.get("unlockRequirements", {}).get("minuteWatchedGoal"): defaults["minute_watched_goal"] = reward_campaign["unlockRequirements"]["minuteWatchedGoal"] if reward_campaign.get("image"): defaults["image_url"] = reward_campaign["image"]["image1xURL"] our_reward_campaign, created = await RewardCampaign.objects.aupdate_or_create( id=reward_campaign["id"], defaults=defaults, ) if created: logger.info("Added reward campaign %s", our_reward_campaign.id) if reward_campaign["game"]: # TODO(TheLovinator): Add game to reward campaign # noqa: TD003 # TODO(TheLovinator): Send JSON to Discord # noqa: TD003 logger.error("Not implemented: Add game to reward campaign, JSON: %s", reward_campaign["game"]) # Add rewards for reward in reward_campaign["rewards"]: mappings: dict[str, str] = { "name": "name", "bannerImage": "banner_image_url", "thumbnailImage": "thumbnail_image_url", "earnableUntil": "earnable_until", "redemptionInstructions": "redemption_instructions", "redemptionURL": "redemption_url", } defaults = {new_key: reward[key] for key, new_key in mappings.items() if reward.get(key)} if our_reward_campaign: defaults["campaign"] = our_reward_campaign our_reward, created = await Reward.objects.aupdate_or_create( id=reward["id"], defaults=defaults, ) if created: logger.info("Added reward %s", our_reward.id) else: logger.info("Updated reward %s", our_reward.id) async def add_or_update_game(game_json: dict | None) -> Game | None: """Add or update a game in the database. Args: game_json (dict): The game to add or update. Returns: Game: The game that was added or updated. """ if not game_json: return None mappings: dict[str, str] = { "slug": "slug", "displayName": "name", "boxArtURL": "box_art_url", } defaults = {new_key: game_json[key] for key, new_key in mappings.items() if game_json.get(key)} if game_json.get("slug"): defaults["game_url"] = f"https://www.twitch.tv/directory/game/{game_json["slug"]}" if game_json.get("owner"): owner: Owner | None = await add_or_update_owner(game_json["owner"]) if owner: defaults["org"] = owner our_game, created = await Game.objects.aupdate_or_create(twitch_id=game_json["id"], defaults=defaults) if created: logger.info("Added game %s", our_game.twitch_id) return our_game async def add_or_update_owner(owner_json: dict | None) -> Owner | None: """Add or update an owner in the database. Args: owner_json (dict): The owner to add or update. Returns: Owner: The owner that was added or updated. """ if not owner_json: return None defaults = {} if owner_json.get("name"): defaults["name"] = owner_json["name"] our_owner, created = await Owner.objects.aupdate_or_create(id=owner_json["id"], defaults=defaults) if created: logger.info("Added owner %s", our_owner.id) return our_owner async def add_or_update_channels(channels_json: list[dict]) -> list[Channel] | None: """Add or update channels in the database. Args: channels_json (list[dict]): The channels to add or update. Returns: list[Channel]: The channels that were added or updated. """ if not channels_json: return None channels: list[Channel] = [] for channel_json in channels_json: defaults = {} if channel_json.get("displayName"): defaults["display_name"] = channel_json["displayName"] if channel_json.get("name"): defaults["name"] = channel_json["name"] defaults["twitch_url"] = f"https://www.twitch.tv/{channel_json["name"]}" our_channel, created = await Channel.objects.aupdate_or_create(twitch_id=channel_json["id"], defaults=defaults) if created: logger.info("Added channel %s", our_channel.twitch_id) channels.append(our_channel) return channels async def add_benefit(benefit: dict, time_based_drop: TimeBasedDrop) -> None: """Add a benefit to the database. Args: benefit (dict): The benefit to add. time_based_drop (TimeBasedDrop): The time-based drop the benefit belongs to. """ logger.info("Adding benefit to database") mappings = { "createdAt": "twitch_created_at", "entitlementLimit": "entitlement_limit", "imageAssetURL": "image_url", "isIosAvailable": "is_ios_available", "name": "name", } defaults = {new_key: benefit[key] for key, new_key in mappings.items() if benefit.get(key)} our_benefit, created = await Benefit.objects.aupdate_or_create(id=benefit["id"], defaults=defaults) if created: logger.info("Added benefit %s", our_benefit.id) if time_based_drop: await time_based_drop.benefits.aadd(our_benefit) # type: ignore # noqa: PGH003 async def add_drop_campaign(drop_campaign: dict | None) -> None: """Add a drop campaign to the database. Args: drop_campaign (dict): The drop campaign to add. """ if not drop_campaign: return defaults = {} if drop_campaign.get("game"): game: Game | None = await add_or_update_game(drop_campaign["game"]) defaults["game"] = game mappings: dict[str, str] = { "accountLinkURL": "account_link_url", "description": "description", "detailsURL": "details_url", "endAt": "ends_at", "startAt": "starts_at", "imageURL": "image_url", "name": "name", "status": "status", } for key, new_key in mappings.items(): if drop_campaign.get(key): defaults[new_key] = drop_campaign[key] our_drop_campaign, created = await DropCampaign.objects.aupdate_or_create( id=drop_campaign["id"], defaults=defaults, ) if created: logger.info("Added drop campaign %s", our_drop_campaign.id) if drop_campaign.get("allow") and drop_campaign["allow"].get("channels"): channels: list[Channel] | None = await add_or_update_channels(drop_campaign["allow"]["channels"]) if channels: for channel in channels: await channel.drop_campaigns.aadd(our_drop_campaign) await add_time_based_drops(drop_campaign, our_drop_campaign) async def add_time_based_drops(drop_campaign: dict, our_drop_campaign: DropCampaign) -> None: """Add time-based drops to the database. Args: drop_campaign (dict): The drop campaign containing time-based drops. our_drop_campaign (DropCampaign): The drop campaign object in the database. """ for time_based_drop in drop_campaign.get("timeBasedDrops", []): if time_based_drop.get("preconditionDrops"): # TODO(TheLovinator): Add precondition drops to time-based drop # noqa: TD003 # TODO(TheLovinator): Send JSON to Discord # noqa: TD003 logger.error("Not implemented: Add precondition drops to time-based drop, JSON: %s", time_based_drop) mappings = { "requiredSubs": "required_subs", "endAt": "ends_at", "name": "name", "requiredMinutesWatched": "required_minutes_watched", "startAt": "starts_at", } defaults = {new_key: time_based_drop[key] for key, new_key in mappings.items() if time_based_drop.get(key)} if our_drop_campaign: defaults["drop_campaign"] = our_drop_campaign our_time_based_drop, created = await TimeBasedDrop.objects.aupdate_or_create( id=time_based_drop["id"], defaults=defaults, ) if created: logger.info("Added time-based drop %s", our_time_based_drop.id) if time_based_drop.get("benefitEdges") and our_time_based_drop: for benefit_edge in time_based_drop["benefitEdges"]: await add_benefit(benefit_edge["benefit"], our_time_based_drop) async def process_json_data(num: int, campaign: dict | None) -> None: """Process JSON data. Args: num (int): The number of the JSON data. campaign (dict): The JSON data to process. """ logger.info("Processing JSON %d", num) if not campaign: logger.warning("No campaign found for JSON %d", num) return if not isinstance(campaign, dict): logger.warning("Campaign is not a dictionary") return # This is a Reward Campaign if "rewardCampaignsAvailableToUser" in campaign.get("data", {}): save_json(campaign, "reward_campaigns") await add_reward_campaign(campaign) if "dropCampaign" in campaign.get("data", {}).get("user", {}): if not campaign["data"]["user"]["dropCampaign"]: logger.warning("No drop campaign found") return save_json(campaign, "drop_campaign") if campaign.get("data", {}).get("user", {}).get("dropCampaign"): await add_drop_campaign(campaign["data"]["user"]["dropCampaign"]) if "dropCampaigns" in campaign.get("data", {}).get("currentUser", {}): for drop_campaign in campaign["data"]["currentUser"]["dropCampaigns"]: save_json(campaign, "drop_campaigns") await add_drop_campaign(drop_campaign) class Command(BaseCommand): help = "Scrape Twitch Drops Campaigns with login using Firefox" @staticmethod async def run(playwright: Playwright) -> list[dict[str, typing.Any]]: profile_dir: Path = get_profile_dir() browser: BrowserContext = await playwright.chromium.launch_persistent_context( channel="chrome", user_data_dir=profile_dir, headless=False, args=["--disable-blink-features=AutomationControlled"], ) logger.debug("Launched Chrome browser") page: Page = await browser.new_page() json_data: list[dict] = [] async def handle_response(response: Response) -> None: if "https://gql.twitch.tv/gql" in response.url: try: body: typing.Any = await response.json() json_data.extend(body) except Exception: logger.exception( "Failed to parse JSON from %s", response.url, ) page.on("response", handle_response) await page.goto("https://www.twitch.tv/drops/campaigns") logger.debug("Navigated to Twitch drops campaigns page") logged_in = False while not logged_in: try: await page.wait_for_selector( 'div[data-a-target="top-nav-avatar"]', timeout=300000, ) logged_in = True logger.info("Logged in to Twitch") except KeyboardInterrupt as e: raise KeyboardInterrupt from e except Exception: # noqa: BLE001 await asyncio.sleep(5) logger.info("Waiting for login") await page.wait_for_load_state("networkidle") logger.debug("Page loaded. Scraping data...") await browser.close() for num, campaign in enumerate(json_data, start=1): await process_json_data(num=num, campaign=campaign) return json_data def handle(self, *args, **kwargs) -> None: # noqa: ANN002, ARG002, ANN003 asyncio.run(self.run_with_playwright()) async def run_with_playwright(self) -> None: async with async_playwright() as playwright: await self.run(playwright) if __name__ == "__main__": Command().handle()