From a09ea0bd9abc1a7893641ffccb9d378ab0d5482e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Tue, 4 Nov 2025 22:11:59 +0100 Subject: [PATCH] Use anyio.Path for async filesystem ops, tighten formatting, and harden mdformat - Switch to anyio.Path for non-blocking filesystem operations (mkdir, glob) so article directory creation and listing are async-friendly. - Replace blocking sync glob with an async comprehension to build existing_files. - Harden mdformat usage: call formatter inside try/except and fall back to unformatted markdown on error to avoid crashes from unsupported options. - Set logging to DEBUG for more verbose output during runs. - Miscellaneous cleanups: reformat imports/long lists, collapse multi-line constructs, and simplify timestamp parsing/formatting. --- scrape.py | 209 +++++++++++++++--------------------------------------- 1 file changed, 59 insertions(+), 150 deletions(-) diff --git a/scrape.py b/scrape.py index 08b7e24..df13c22 100644 --- a/scrape.py +++ b/scrape.py @@ -1,4 +1,4 @@ -import asyncio # noqa: CPY001, D100 +import asyncio import json import logging import os @@ -6,25 +6,27 @@ import re import shutil import subprocess # noqa: S404 import time -from datetime import UTC, datetime +from datetime import UTC +from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING +from typing import Any +from typing import Literal import aiofiles +import anyio import httpx import markdown import mdformat from bs4 import BeautifulSoup -from markdownify import MarkdownConverter # pyright: ignore[reportMissingTypeStubs] -from markupsafe import Markup, escape +from markdownify import MarkdownConverter +from markupsafe import Markup +from markupsafe import escape if TYPE_CHECKING: from collections.abc import Coroutine -logging.basicConfig( - level=logging.INFO, - format="%(message)s", -) +logging.basicConfig(level=logging.DEBUG, format="%(message)s") logger: logging.Logger = logging.getLogger("wutheringwaves") @@ -83,9 +85,7 @@ def set_file_timestamp(filepath: Path, timestamp_str: str) -> bool: """ try: # Parse the timestamp string - dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace( - tzinfo=UTC - ) + dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC) # Convert to Unix timestamp timestamp: float = dt.timestamp() @@ -116,9 +116,7 @@ def get_file_timestamp(timestamp_str: str) -> float: try: # Parse the timestamp string - dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace( - tzinfo=UTC - ) + dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC) # Convert to Unix timestamp return dt.timestamp() except ValueError: @@ -137,13 +135,7 @@ def commit_file_with_timestamp(filepath: Path) -> bool: # noqa: PLR0911 """ # Check in Git history if we already have this file - git_log_cmd: list[str] = [ - "git", - "log", - "--pretty=format:%H", - "--follow", - str(filepath), - ] + git_log_cmd: list[str] = ["git", "log", "--pretty=format:%H", "--follow", str(filepath)] try: git_log_output: str = subprocess.check_output(git_log_cmd, text=True).strip() # noqa: S603 if git_log_output: @@ -167,25 +159,14 @@ def commit_file_with_timestamp(filepath: Path) -> bool: # noqa: PLR0911 # Get the file's modification time timestamp: float = filepath.stat().st_mtime - git_time: str = datetime.fromtimestamp(timestamp, tz=UTC).strftime( - "%Y-%m-%dT%H:%M:%S" - ) + git_time: str = 
datetime.fromtimestamp(timestamp, tz=UTC).strftime("%Y-%m-%dT%H:%M:%S") # Stage the file subprocess.run([git_executable, "add", str(filepath)], check=True, text=True) # noqa: S603 # Commit the file with the modification time as the commit time - env: dict[str, str] = { - **os.environ, - "GIT_AUTHOR_DATE": git_time, - "GIT_COMMITTER_DATE": git_time, - } - subprocess.run( # noqa: S603 - [git_executable, "commit", "-m", f"Add {filepath.name}"], - check=True, - env=env, - text=True, - ) + env: dict[str, str] = {**os.environ, "GIT_AUTHOR_DATE": git_time, "GIT_COMMITTER_DATE": git_time} + subprocess.run([git_executable, "commit", "-m", f"Add {filepath.name}"], check=True, env=env, text=True) # noqa: S603 except subprocess.CalledProcessError: logger.exception("Subprocess error occurred while committing the file.") return False @@ -222,9 +203,7 @@ def add_articles_to_readme(articles: dict[Any, Any] | None = None) -> None: # Create new content new_lines: list[str] = [] if articles_section_index >= 0: - new_lines = lines[ - : articles_section_index + 1 - ] # Keep everything up to "## Articles" + new_lines = lines[: articles_section_index + 1] # Keep everything up to "## Articles" else: new_lines = lines if new_lines and not new_lines[-1].endswith("\n"): @@ -233,17 +212,11 @@ def add_articles_to_readme(articles: dict[Any, Any] | None = None) -> None: # Add articles new_lines.append("\n") # Add a blank line after the heading - for article in sorted( - articles, key=lambda x: x.get("createTime", ""), reverse=True - ): + for article in sorted(articles, key=lambda x: x.get("createTime", ""), reverse=True): article_id: str = str(article.get("articleId", "")) article_title: str = article.get("articleTitle", "No Title") - article_url: str = ( - f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}" - ) - new_lines.append( - f"- [{article_title}]({article_url}) [[json]](articles/{article_id}.json)\n", - ) + article_url: str = f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}" + new_lines.append(f"- [{article_title}]({article_url}) [[json]](articles/{article_id}.json)\n") # Add articles directory section new_lines.append("\n## Articles Directory\n\n") @@ -325,11 +298,7 @@ def format_discord_links(md: str) -> str: # Before: [Link](https://example.com "Link") # After: [Link](https://example.com) - formatted_links_md: str = re.sub( - pattern=r'\[([^\]]+)\]\((https?://[^\s)]+) "\2"\)', - repl=repl, - string=md, - ) + formatted_links_md: str = re.sub(pattern=r'\[([^\]]+)\]\((https?://[^\s)]+) "\2"\)', repl=repl, string=md) return formatted_links_md @@ -372,7 +341,7 @@ def handle_stars(text: str) -> str: return "\n\n".join(output) -def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str: # noqa: PLR0914, PLR0915 +def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str: # noqa: C901, PLR0914, PLR0915 """Generate an Atom feed from a list of articles. 
Args: @@ -390,11 +359,7 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str: if articles: latest_entry = articles[0].get("createTime", "") if latest_entry: - latest_entry = ( - datetime.strptime(str(latest_entry), "%Y-%m-%d %H:%M:%S") - .replace(tzinfo=UTC) - .isoformat() - ) + latest_entry = datetime.strptime(str(latest_entry), "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC).isoformat() for article in articles: article_id: str = str(article.get("articleId", "")) @@ -411,11 +376,8 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str: if not article_content: article_content = article_title - converter: MarkdownConverter = MarkdownConverter( - heading_style="ATX", - strip=["pre", "code"], - ) - article_content_converted = str(converter.convert(article_content).strip()) # type: ignore # noqa: PGH003 + converter: MarkdownConverter = MarkdownConverter(heading_style="ATX", strip=["pre", "code"]) + article_content_converted = str(converter.convert(article_content).strip()) if not article_content_converted: msg: str = f"Article content is empty for article ID: {article_id}" @@ -423,48 +385,24 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str: article_content_converted = "No content available" # Remove non-breaking spaces - xa0_removed: str = re.sub( - r"\xa0", " ", article_content_converted - ) # Replace non-breaking spaces with regular spaces + xa0_removed: str = re.sub(r"\xa0", " ", article_content_converted) # Replace non-breaking spaces with regular spaces # Replace non-breaking spaces with regular spaces - non_breaking_space_removed: str = xa0_removed.replace( - " ", # noqa: RUF001 - " ", - ) + non_breaking_space_removed: str = xa0_removed.replace(" ", " ") # noqa: RUF001 # Remove code blocks that has only spaces and newlines inside them - empty_code_block_removed: str = re.sub( - pattern=r"```[ \t]*\n[ \t]*\n```", - repl="", - string=non_breaking_space_removed, # type: ignore # noqa: PGH003 - ) + empty_code_block_removed: str = re.sub(pattern=r"```[ \t]*\n[ \t]*\n```", repl="", string=non_breaking_space_removed) # [How to Update] should be # How to Update - square_brackets_converted: str = re.sub( - pattern=r"^\s*\[([^\]]+)\]\s*$", - repl=r"# \1", - string=empty_code_block_removed, # type: ignore # noqa: PGH003 - flags=re.MULTILINE, - ) + square_brackets_converted: str = re.sub(pattern=r"^\s*\[([^\]]+)\]\s*$", repl=r"# \1", string=empty_code_block_removed, flags=re.MULTILINE) stars_converted: str = handle_stars(square_brackets_converted) # If `● Word` is in the content, replace it `## Word` instead with regex - ball_converted: str = re.sub( - pattern=r"●\s*(.*?)\n", - repl=r"\n\n## \1\n\n", - string=stars_converted, - flags=re.MULTILINE, - ) + ball_converted: str = re.sub(pattern=r"●\s*(.*?)\n", repl=r"\n\n## \1\n\n", string=stars_converted, flags=re.MULTILINE) # If `※ Word` is in the content, replace it `* word * ` instead with regex - reference_mark_converted: str = re.sub( - pattern=r"^\s*※\s*(\S.*?)\s*$", - repl=r"\n\n*\1*\n\n", - string=ball_converted, - flags=re.MULTILINE, - ) + reference_mark_converted: str = re.sub(pattern=r"^\s*※\s*(\S.*?)\s*$", repl=r"\n\n*\1*\n\n", string=ball_converted, flags=re.MULTILINE) # Replace circled Unicode numbers (①-⑳) with plain numbered text (e.g., "1. ", "2. ", ..., "20. 
") number_symbol: dict[str, str] = { @@ -494,35 +432,32 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str: flags=re.MULTILINE, ) - markdown_formatted: str = mdformat.text( # type: ignore # noqa: PGH003 - space_before_star_added, - options={ - "number": True, # Allow 1., 2., 3. numbering - }, - ) + # Format Markdown safely. mdformat doesn't support a "number" option here, + # and unknown options can raise at runtime. We avoid passing invalid options + # and fall back to the raw text if formatting fails for any reason. + try: + formatter: Any = mdformat # Help the type checker by treating mdformat as Any here + markdown_formatted: str = str(formatter.text(space_before_star_added)) + except Exception: + logger.exception("mdformat failed; using unformatted markdown text") + markdown_formatted = space_before_star_added links_fixed: str = format_discord_links(markdown_formatted) article_escaped: Markup = escape(links_fixed) - article_url: str = ( - f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}" - ) + article_url: str = f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}" article_create_time: str = article.get("createTime", "") published: str = "" updated: str = latest_entry if article_create_time: - timestamp: datetime = datetime.strptime( - str(article_create_time), "%Y-%m-%d %H:%M:%S" - ).replace(tzinfo=UTC) + timestamp: datetime = datetime.strptime(str(article_create_time), "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC) iso_time: str = timestamp.isoformat() published = f"{iso_time}" updated = iso_time article_category: str = article.get("articleTypeName", "Wuthering Waves") - category: str = ( - f'' if article_category else "" - ) + category: str = f'' if article_category else "" html: str = markdown.markdown( text=article_escaped, @@ -583,7 +518,7 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str: {"".join(atom_entries)} -""" # noqa: E501 +""" return atom_feed @@ -633,9 +568,7 @@ def create_atom_feeds(output_dir: Path) -> None: article_create_time: str = article.get("createTime", "") logger.info("\tArticle ID: %s, Date: %s", article_id, article_create_time) - atom_feed: str = generate_atom_feed( - articles=latest_articles, file_name=atom_feed_path.name - ) + atom_feed: str = generate_atom_feed(articles=latest_articles, file_name=atom_feed_path.name) with atom_feed_path.open("w", encoding="utf-8") as f: f.write(atom_feed) logger.info( @@ -646,9 +579,7 @@ def create_atom_feeds(output_dir: Path) -> None: # Create the Atom feed for all articles atom_feed_path_all: Path = Path("articles_all.xml") - atom_feed_all_articles: str = generate_atom_feed( - articles=articles_sorted, file_name=atom_feed_path_all.name - ) + atom_feed_all_articles: str = generate_atom_feed(articles=articles_sorted, file_name=atom_feed_path_all.name) with atom_feed_path_all.open("w", encoding="utf-8") as f: f.write(atom_feed_all_articles) logger.info("Created Atom feed for all articles: %s", atom_feed_path_all) @@ -711,13 +642,11 @@ async def main() -> Literal[1, 0]: """ # Setup current_time = int(time.time() * 1000) # Current time in milliseconds - base_url = ( - "https://hw-media-cdn-mingchao.kurogame.com/akiwebsite/website2.0/json/G152/en" - ) + base_url = "https://hw-media-cdn-mingchao.kurogame.com/akiwebsite/website2.0/json/G152/en" article_menu_url: str = f"{base_url}/ArticleMenu.json?t={current_time}" article_base_url: str = f"{base_url}/article/" output_dir = Path("articles") - output_dir.mkdir(exist_ok=True) + await 
anyio.Path(output_dir).mkdir(exist_ok=True) logger.info("Fetching article menu from %s", article_menu_url) @@ -735,29 +664,19 @@ async def main() -> Literal[1, 0]: # Extract article IDs logger.info("Extracting article IDs...") - article_ids: list[str] = [ - str(item["articleId"]) for item in menu_data if item.get("articleId") - ] + article_ids: list[str] = [str(item["articleId"]) for item in menu_data if item.get("articleId")] if not article_ids: - logger.warning( - "No article IDs found. Please check the JSON structure of ArticleMenu.json." - ) + logger.warning("No article IDs found. Please check the JSON structure of ArticleMenu.json.") logger.warning("Full menu response for debugging:") logger.warning(json.dumps(menu_data, indent=2)) return 1 - # Get list of already downloaded article IDs - existing_files: list[str] = [ - file.stem - for file in output_dir.glob("*.json") - if file.stem != "ArticleMenu" - ] + existing_files: list[str] = [file.stem async for file in anyio.Path(output_dir).glob("*.json") if file.stem != "ArticleMenu"] # Filter out already downloaded articles - new_article_ids: list[str] = [ - article_id for article_id in article_ids if article_id not in existing_files - ] + # Filter out already downloaded articles + new_article_ids: list[str] = [article_id for article_id in article_ids if article_id not in existing_files] if new_article_ids: logger.info("Found %s new articles to download", len(new_article_ids)) @@ -765,18 +684,14 @@ async def main() -> Literal[1, 0]: # Download each new article download_tasks: list[Coroutine[Any, Any, dict[Any, Any] | None]] = [] for article_id in new_article_ids: - article_url: str = ( - f"{article_base_url}{article_id}.json?t={current_time}" - ) + article_url: str = f"{article_base_url}{article_id}.json?t={current_time}" output_file: Path = output_dir / f"{article_id}.json" logger.info("Downloading article %s from %s", article_id, article_url) download_tasks.append(fetch_json(article_url, client)) # Wait for all downloads to complete - results: list[dict[Any, Any] | BaseException | None] = await asyncio.gather( - *download_tasks, return_exceptions=True - ) + results: list[dict[Any, Any] | BaseException | None] = await asyncio.gather(*download_tasks, return_exceptions=True) # Process the downloaded articles for i, result in enumerate(results): @@ -788,18 +703,12 @@ async def main() -> Literal[1, 0]: continue if not result: - logger.warning( - "Downloaded article %s is empty or invalid", article_id - ) + logger.warning("Downloaded article %s is empty or invalid", article_id) continue # Save the article JSON - if isinstance(result, dict) and await save_prettified_json( - result, output_file - ): - logger.info( - "Successfully downloaded and prettified %s", output_file - ) + if isinstance(result, dict) and await save_prettified_json(result, output_file): + logger.info("Successfully downloaded and prettified %s", output_file) else: logger.info("No new articles to download")
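
Reviewer note: the async filesystem change reduces to the pattern below. This is
a minimal, self-contained sketch rather than code lifted from the patch; the
"articles" directory and the "ArticleMenu" filter mirror scrape.py, while the
function name is illustrative. anyio.Path wraps pathlib.Path and runs each
blocking operation in a worker thread, so mkdir() and glob() become awaitable:

    import anyio


    async def list_article_ids(directory: str = "articles") -> list[str]:
        root = anyio.Path(directory)
        # Awaitable: anyio dispatches the blocking pathlib call to a worker
        # thread instead of stalling the event loop.
        await root.mkdir(exist_ok=True)
        # glob() yields results asynchronously, so an async comprehension
        # collects the stems without a blocking directory scan.
        return [path.stem async for path in root.glob("*.json") if path.stem != "ArticleMenu"]


    if __name__ == "__main__":
        print(anyio.run(list_article_ids))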
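The mdformat hardening has the same shape as this sketch (the function name is
illustrative and the logger name copies scrape.py; the fall-back-to-input
behaviour is what the patch implements):

    import logging

    import mdformat

    logger = logging.getLogger("wutheringwaves")


    def format_markdown_safely(text: str) -> str:
        # mdformat.text() can raise on options the installed version does not
        # recognize or on input it cannot parse; returning the original text
        # keeps feed generation from crashing on a single bad article.
        try:
            return mdformat.text(text)
        except Exception:
            logger.exception("mdformat failed; using unformatted markdown text")
            return text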
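The simplified timestamp handling collapses to one chain, sketched here (the
"%Y-%m-%d %H:%M:%S" format string comes from the patch; the function name is
illustrative). Attaching UTC explicitly matters because strptime() returns a
naive datetime, and .timestamp() would otherwise interpret it in the local
timezone:

    from datetime import UTC, datetime


    def to_unix_timestamp(timestamp_str: str) -> float:
        # Parse the naive string, pin it to UTC, then convert to Unix time.
        dt = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
        return dt.timestamp()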