import asyncio import json import logging import os import shutil import subprocess # noqa: S404 import time from datetime import UTC, datetime from pathlib import Path from typing import TYPE_CHECKING, Any, Literal import aiofiles import httpx from markdownify import MarkdownConverter from markupsafe import escape if TYPE_CHECKING: from collections.abc import Coroutine logging.basicConfig( level=logging.INFO, format="%(message)s", ) logger: logging.Logger = logging.getLogger("wutheringwaves") async def fetch_json(url: str, client: httpx.AsyncClient) -> dict[Any, Any] | None: """Fetch JSON data from a URL. Args: url (str): The URL to fetch data from. client (httpx.AsyncClient): The HTTP client to use for the request. Returns: dict[Any, Any] | None: The parsed JSON data if successful, None otherwise. """ try: response: httpx.Response = await client.get(url) response.raise_for_status() return response.json() except (httpx.RequestError, json.JSONDecodeError): logger.exception("Error fetching %s:", url) return None async def save_prettified_json(data: dict[Any, Any], filepath: Path) -> bool: """Save JSON data to a file with pretty formatting. Args: data (dict[Any, Any]): The JSON data to save. filepath (Path): The path to the file where the data will be saved. Returns: bool: True if the data was saved successfully, False otherwise. """ try: async with aiofiles.open(filepath, "w", encoding="utf-8") as f: await f.write(json.dumps(data, indent=2, ensure_ascii=False)) except Exception: logger.exception("Error saving %s:", filepath) return False else: return True def set_file_timestamp(filepath: Path, timestamp_str: str) -> bool: """Set file's modification time based on ISO timestamp string. Args: filepath (Path): The path to the file. timestamp_str (str): The ISO timestamp string. Returns: bool: True if the timestamp was set successfully, False otherwise. """ try: # Parse the timestamp string dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC) # Convert to Unix timestamp timestamp: float = dt.timestamp() # Set the file's modification time os.utime(filepath, (timestamp, timestamp)) except ValueError: logger.info("Error setting timestamp for %s", filepath) return False else: logger.info("Timestamp for %s set to %s", filepath, dt.isoformat()) return True def get_file_timestamp(timestamp_str: str) -> float: """Convert ISO timestamp string to Unix timestamp. Args: timestamp_str (str): The ISO timestamp string. Returns: float: The Unix timestamp, or 0 if conversion failed. """ try: # Parse the timestamp string dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC) # Convert to Unix timestamp return dt.timestamp() except ValueError: logger.info("Error converting timestamp %s", timestamp_str) return 0.0 def commit_file_with_timestamp(filepath: Path) -> bool: # noqa: PLR0911 """Commit a file to Git with its modification time as the commit time. Args: filepath (Path): The path to the file to commit. Returns: bool: True if the commit was successful, False otherwise. """ # Check in Git history if we already have this file git_log_cmd: list[str] = ["git", "log", "--pretty=format:%H", "--follow", str(filepath)] try: git_log_output: str = subprocess.check_output(git_log_cmd, text=True).strip() # noqa: S603 if git_log_output: logger.info("File %s already exists in Git history.", filepath) return True except subprocess.CalledProcessError: logger.exception("Error checking Git history for %s", filepath) return False try: # Get the full path to the Git executable git_executable: str | None = shutil.which("git") if not git_executable: logger.error("Git executable not found.") return False # Validate the filepath if not filepath.is_file(): logger.error("Invalid file path: %s", filepath) return False # Get the file's modification time timestamp: float = filepath.stat().st_mtime git_time: str = datetime.fromtimestamp(timestamp, tz=UTC).strftime("%Y-%m-%dT%H:%M:%S") # Stage the file subprocess.run([git_executable, "add", str(filepath)], check=True, text=True) # noqa: S603 # Commit the file with the modification time as the commit time env: dict[str, str] = { **os.environ, "GIT_AUTHOR_DATE": git_time, "GIT_COMMITTER_DATE": git_time, } subprocess.run( # noqa: S603 [git_executable, "commit", "-m", f"Add {filepath.name}"], check=True, env=env, text=True, ) except subprocess.CalledProcessError: logger.exception("Subprocess error occurred while committing the file.") return False except Exception: logger.exception("Error committing %s to Git", filepath) return False else: logger.info("Successfully committed %s to Git", filepath) return True def add_articles_to_readme(articles: dict[Any, Any] | None = None) -> None: """Add the list of articles to the README.md file.""" if articles is None: logger.warning("No articles to add to README.md") return readme_file: Path = Path("README.md") if not readme_file.is_file(): logger.error("README.md file not found.") return with readme_file.open("r+", encoding="utf-8") as f: # Read existing content lines: list[str] = f.readlines() # Find "## Articles" section or add it articles_section_index = -1 for i, line in enumerate(lines): if line.strip() == "## Articles": articles_section_index: int = i break # Create new content new_lines: list[str] = [] if articles_section_index >= 0: new_lines = lines[: articles_section_index + 1] # Keep everything up to "## Articles" else: new_lines = lines if new_lines and not new_lines[-1].endswith("\n"): new_lines.append("\n") new_lines.append("## Articles\n") # Add articles new_lines.append("\n") # Add a blank line after the heading for article in sorted(articles, key=lambda x: x.get("createTime", ""), reverse=True): article_id: str = str(article.get("articleId", "")) article_title: str = article.get("articleTitle", "No Title") article_url: str = f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}" new_lines.append( f"- [{article_title}]({article_url}) [[json]](articles/{article_id}.json)\n", ) # Add articles directory section new_lines.append("\n## Articles Directory\n\n") new_lines.append("The articles are saved in the `articles` directory.\n") new_lines.append("You can view them [here](articles).\n") # Write the updated content f.seek(0) f.truncate() f.writelines(new_lines) logger.info("Articles added to README.md") def batch_process_timestamps(menu_data: dict[Any, Any], output_dir: Path) -> None: """Process all timestamps in batch for better performance. Args: menu_data (list[dict[str, Any]]): The article menu data containing timestamps. output_dir (Path): Directory containing the article files. """ # Extract article IDs and timestamps timestamp_map: dict[str, str] = {} for item in menu_data: article_id = str(item.get("articleId", "")) create_time = item.get("createTime") if article_id and create_time: timestamp_map[article_id] = create_time logger.info("Collected %s timestamps from menu data", len(timestamp_map)) # Check which files need timestamp updates files_to_update: list[tuple[Path, str]] = [] for article_id, create_time in timestamp_map.items(): file_path: Path = output_dir / f"{article_id}.json" if not file_path.exists(): continue expected_timestamp: float = get_file_timestamp(create_time) if expected_timestamp == 0.0: continue actual_timestamp: float = file_path.stat().st_mtime # Only update if timestamps don't match (with a small tolerance) if abs(actual_timestamp - expected_timestamp) > 1.0: files_to_update.append((file_path, create_time)) logger.info("Found %s files that need timestamp updates", len(files_to_update)) # Update timestamps and commit files for file_path, create_time in files_to_update: logger.info("Setting %s timestamp to %s", file_path, create_time) if set_file_timestamp(file_path, create_time): if not commit_file_with_timestamp(file_path): logger.error("Failed to commit file %s to Git", file_path) else: logger.error("Failed to update timestamp for %s", file_path) class CustomLinkMarkdownConverter(MarkdownConverter): """Custom Markdown converter to handle links. This class is a subclass of MarkdownConverter and overrides the convert_a method to customize the conversion of tags to Markdown links. """ def convert_a(self, el: Any, text: str, **kwargs) -> str: # type: ignore # noqa: ANN003, ANN401, ARG002, PGH003, PLR6301 """Convert tags. Args: el (Any): The element to convert. text (str): The text content of the element. kwargs (Any): Additional arguments. Returns: str: The converted text. """ href: str | None = el.get("href") if not href: return text return f"[{text}](<{href}>)" def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str: # noqa: PLR0914 """Generate an Atom feed from a list of articles. Args: articles (list[dict[Any, Any]]): The list of articles to include in the feed. file_name (str): The name of the file to save the feed to. Returns: str: The generated Atom feed as a string. """ atom_entries: list[str] = [] latest_entry: str = datetime.now(UTC).isoformat() # Get the latest entry date if articles: latest_entry = articles[0].get("createTime", "") if latest_entry: latest_entry = datetime.strptime(str(latest_entry), "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC).isoformat() for article in articles: article_id: str = str(article.get("articleId", "")) # Use stable identifier based on article ID entry_id: str = ( f"urn:article:{article_id}" if article_id else f"urn:wutheringwaves:unknown-article-{hash(article.get('articleTitle', '') + article.get('createTime', ''))}" ) article_title: str = article.get("articleTitle", "No Title") article_content: str = article.get("articleContent", article_title) if not article_content: article_content = article_title converter: CustomLinkMarkdownConverter = CustomLinkMarkdownConverter( heading_style="ATX", bullets="-", strip=["img"], ) article_content = article_content.replace(" ", " ") # Replace non-breaking spaces with regular spaces # noqa: RUF001 article_content: str = converter.convert(article_content).strip() # type: ignore # noqa: PGH003 article_content = escape(article_content) article_url: str = f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}" article_create_time: str = article.get("createTime", "") published: str = "" updated: str = latest_entry if article_create_time: timestamp: datetime = datetime.strptime(str(article_create_time), "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC) iso_time: str = timestamp.isoformat() published = f"{iso_time}" updated = iso_time article_category: str = article.get("articleTypeName", "Wuthering Waves") category: str = f'' if article_category else "" atom_entries.append( f""" {entry_id} {escape(article_title)} {article_content} {published} {updated} {category} Wuthering Waves wutheringwaves_ensupport@kurogames.com https://wutheringwaves.kurogames.com """, ) # Create the complete Atom feed atom_feed: str = f""" Wuthering Waves Articles urn:wutheringwaves:feed {latest_entry} Latest articles from Wuthering Waves https://git.lovinator.space/TheLovinator/wutheringwaves/raw/branch/master/logo.png https://git.lovinator.space/TheLovinator/wutheringwaves/raw/branch/master/logo.png Copyright © {datetime.now(tz=UTC).year} Wuthering Waves Python Script Wuthering Waves wutheringwaves_ensupport@kurogames.com https://wutheringwaves.kurogames.com {"".join(atom_entries)} """ # noqa: E501 return atom_feed def create_atom_feeds(output_dir: Path) -> None: """Create Atom feeds for the articles. Current feeds are: - Last 10 articles - All articles Args: output_dir (Path): The directory to save the RSS feed files. """ menu_data: list[dict[Any, Any]] = [] # Load data from all the articles for file in output_dir.glob("*.json"): if file.stem == "ArticleMenu": continue with file.open("r", encoding="utf-8") as f: try: article_data: dict[Any, Any] = json.load(f) menu_data.append(article_data) except json.JSONDecodeError: logger.exception("Error decoding JSON from %s", file) continue if not menu_data: logger.error("Can't create Atom feeds, no articles found in %s", output_dir) return # Create the Atom feed for the latest articles amount_of_articles: int = 20 atom_feed_path: Path = Path("articles_latest.xml") latest_articles: list[dict[Any, Any]] = sorted( menu_data, key=lambda x: get_file_timestamp(x.get("createTime", "")), reverse=True, )[:amount_of_articles] logger.info("Dates of the last %s articles:", len(latest_articles)) for article in latest_articles: article_id: str = str(article.get("articleId", "")) article_create_time: str = article.get("createTime", "") logger.info("\tArticle ID: %s, Date: %s", article_id, article_create_time) atom_feed: str = generate_atom_feed(articles=latest_articles, file_name=atom_feed_path.name) with atom_feed_path.open("w", encoding="utf-8") as f: f.write(atom_feed) logger.info("Created Atom feed for the last %s articles: %s", len(latest_articles), atom_feed_path) # Create the Atom feed for all articles atom_feed_path_all: Path = Path("articles_all.xml") atom_feed_all_articles: str = generate_atom_feed(articles=menu_data, file_name=atom_feed_path_all.name) with atom_feed_path_all.open("w", encoding="utf-8") as f: f.write(atom_feed_all_articles) logger.info("Created Atom feed for all articles: %s", atom_feed_path_all) def add_data_to_articles(menu_data: dict[Any, Any], output_dir: Path) -> None: """ArticleMenu.json contains data that should be added to the articles. Fields not in the article JSON: - articleDesc (Currently empty in ArticleMenu.json) - createTime - sortingMark - suggestCover - top Args: menu_data (dict[Any, Any]): The article menu data. output_dir (Path): Directory containing the article files. """ for item in menu_data: article_id: str = str(item.get("articleId", "")) if not article_id: continue # Check if the article file exists article_file: Path = output_dir / f"{article_id}.json" if not article_file.is_file(): logger.warning("Article file %s does not exist, skipping...", article_file) continue # Read the existing article data with article_file.open("r", encoding="utf-8") as f: try: article_data: dict[Any, Any] = json.load(f) except json.JSONDecodeError: logger.exception("Error decoding JSON from %s", article_file) continue old_article_data = article_data # Add missing fields from ArticleMenu.json for key in ["articleDesc", "createTime", "sortingMark", "suggestCover", "top"]: if key in item and key not in article_data: article_data[key] = item[key] # Save the updated article data if any changes were made if old_article_data != article_data: with article_file.open("w", encoding="utf-8") as f: json.dump(article_data, f, indent=2, ensure_ascii=False) logger.info("Updated %s with data from ArticleMenu.json", article_file) async def main() -> Literal[1, 0]: """Fetch and save articles from the Wuthering Waves website. Returns: Literal[1, 0]: 1 if an error occurred, 0 otherwise. """ # Setup current_time = int(time.time() * 1000) # Current time in milliseconds base_url = "https://hw-media-cdn-mingchao.kurogame.com/akiwebsite/website2.0/json/G152/en" article_menu_url: str = f"{base_url}/ArticleMenu.json?t={current_time}" article_base_url: str = f"{base_url}/article/" output_dir = Path("articles") output_dir.mkdir(exist_ok=True) logger.info("Fetching article menu from %s", article_menu_url) async with httpx.AsyncClient(timeout=30.0) as client: # Fetch the article menu menu_data: dict[Any, Any] | None = await fetch_json(article_menu_url, client) if not menu_data: logger.error("Error: Fetched ArticleMenu.json is empty") return 1 # Save and prettify the menu JSON menu_file: Path = output_dir / "ArticleMenu.json" if await save_prettified_json(menu_data, menu_file): logger.info("Menu JSON saved and prettified to %s", menu_file) # Extract article IDs logger.info("Extracting article IDs...") article_ids: list[str] = [str(item["articleId"]) for item in menu_data if item.get("articleId")] if not article_ids: logger.warning("No article IDs found. Please check the JSON structure of ArticleMenu.json.") logger.warning("Full menu response for debugging:") logger.warning(json.dumps(menu_data, indent=2)) return 1 # Get list of already downloaded article IDs existing_files: list[str] = [file.stem for file in output_dir.glob("*.json") if file.stem != "ArticleMenu"] # Filter out already downloaded articles new_article_ids: list[str] = [article_id for article_id in article_ids if article_id not in existing_files] if new_article_ids: logger.info("Found %s new articles to download", len(new_article_ids)) # Download each new article download_tasks: list[Coroutine[Any, Any, dict[Any, Any] | None]] = [] for article_id in new_article_ids: article_url: str = f"{article_base_url}{article_id}.json?t={current_time}" output_file: Path = output_dir / f"{article_id}.json" logger.info("Downloading article %s from %s", article_id, article_url) download_tasks.append(fetch_json(article_url, client)) # Wait for all downloads to complete results: list[dict[Any, Any] | BaseException | None] = await asyncio.gather(*download_tasks, return_exceptions=True) # Process the downloaded articles for i, result in enumerate(results): article_id: str = new_article_ids[i] output_file = output_dir / f"{article_id}.json" if isinstance(result, Exception): logger.error("Error downloading article %s: %s", article_id, result) continue if not result: logger.warning("Downloaded article %s is empty or invalid", article_id) continue # Save the article JSON if isinstance(result, dict) and await save_prettified_json(result, output_file): logger.info("Successfully downloaded and prettified %s", output_file) else: logger.info("No new articles to download") add_data_to_articles(menu_data, output_dir) add_articles_to_readme(menu_data) create_atom_feeds(output_dir) batch_process_timestamps(menu_data, output_dir) logger.info("Script finished. Articles are in the '%s' directory.", output_dir) return 0 if __name__ == "__main__": asyncio.run(main())