Use anyio.Path for async filesystem ops, tighten formatting, and harden mdformat

- Switch to anyio.Path for non-blocking filesystem operations (mkdir, glob)
  so article directory creation and listing no longer block the event loop
  (see the first sketch after this list).
- Replace the blocking sync glob with an async comprehension over
  anyio.Path.glob() to build existing_files.
- Harden mdformat usage: call the formatter inside try/except and fall back
  to unformatted markdown on error, avoiding crashes from unsupported
  options (see the second sketch after this list).
- Set the log level to DEBUG for more verbose output during runs.
- Miscellaneous cleanups: reformat imports one per line, collapse multi-line
  calls and literals onto single lines, and reflow timestamp parsing and
  formatting.
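
For reference, here is the async filesystem pattern from this commit in isolation, as a minimal runnable sketch. The `list_existing_articles` helper and its default argument are illustrative names for this example only; the commit inlines the equivalent code in `main()`.

```python
import asyncio

import anyio


async def list_existing_articles(output_dir: str = "articles") -> list[str]:
    """Collect the stems of already-downloaded article JSON files."""
    path = anyio.Path(output_dir)
    # anyio.Path methods are awaitable; the blocking pathlib call runs in a worker thread
    await path.mkdir(exist_ok=True)
    # anyio.Path.glob() yields results asynchronously, so an async comprehension
    # replaces the old blocking output_dir.glob("*.json") loop
    return [file.stem async for file in path.glob("*.json") if file.stem != "ArticleMenu"]


if __name__ == "__main__":
    print(asyncio.run(list_existing_articles()))
```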
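The mdformat hardening has the same shape as the new code in the diff below; sketched here as a standalone wrapper (the `format_markdown_safely` name is illustrative, not part of the commit):

```python
import logging

import mdformat

logger: logging.Logger = logging.getLogger("wutheringwaves")


def format_markdown_safely(text: str) -> str:
    """Format Markdown with mdformat, falling back to the raw text on any error."""
    try:
        # No options dict is passed: per the commit, the previous
        # {"number": True} option could raise at runtime
        return str(mdformat.text(text))
    except Exception:
        logger.exception("mdformat failed; using unformatted markdown text")
        return text
```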
2025-11-04 22:11:59 +01:00
parent 192615fddb
commit a09ea0bd9a

scrape.py (209 lines changed)

@@ -1,4 +1,4 @@
-import asyncio  # noqa: CPY001, D100
+import asyncio
 import json
 import logging
 import os
@@ -6,25 +6,27 @@ import re
 import shutil
 import subprocess  # noqa: S404
 import time
-from datetime import UTC, datetime
+from datetime import UTC
+from datetime import datetime
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Literal
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import Literal
 
 import aiofiles
+import anyio
 import httpx
 import markdown
 import mdformat
 from bs4 import BeautifulSoup
-from markdownify import MarkdownConverter  # pyright: ignore[reportMissingTypeStubs]
-from markupsafe import Markup, escape
+from markdownify import MarkdownConverter
+from markupsafe import Markup
+from markupsafe import escape
 
 if TYPE_CHECKING:
     from collections.abc import Coroutine
 
 
-logging.basicConfig(
-    level=logging.INFO,
-    format="%(message)s",
-)
+logging.basicConfig(level=logging.DEBUG, format="%(message)s")
 logger: logging.Logger = logging.getLogger("wutheringwaves")
@@ -83,9 +85,7 @@ def set_file_timestamp(filepath: Path, timestamp_str: str) -> bool:
""" """
try: try:
# Parse the timestamp string # Parse the timestamp string
dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace( dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
tzinfo=UTC
)
# Convert to Unix timestamp # Convert to Unix timestamp
timestamp: float = dt.timestamp() timestamp: float = dt.timestamp()
@@ -116,9 +116,7 @@ def get_file_timestamp(timestamp_str: str) -> float:
     try:
         # Parse the timestamp string
-        dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(
-            tzinfo=UTC
-        )
+        dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
 
         # Convert to Unix timestamp
         return dt.timestamp()
     except ValueError:
@@ -137,13 +135,7 @@ def commit_file_with_timestamp(filepath: Path) -> bool: # noqa: PLR0911
""" """
# Check in Git history if we already have this file # Check in Git history if we already have this file
git_log_cmd: list[str] = [ git_log_cmd: list[str] = ["git", "log", "--pretty=format:%H", "--follow", str(filepath)]
"git",
"log",
"--pretty=format:%H",
"--follow",
str(filepath),
]
try: try:
git_log_output: str = subprocess.check_output(git_log_cmd, text=True).strip() # noqa: S603 git_log_output: str = subprocess.check_output(git_log_cmd, text=True).strip() # noqa: S603
if git_log_output: if git_log_output:
@@ -167,25 +159,14 @@ def commit_file_with_timestamp(filepath: Path) -> bool: # noqa: PLR0911
         # Get the file's modification time
         timestamp: float = filepath.stat().st_mtime
-        git_time: str = datetime.fromtimestamp(timestamp, tz=UTC).strftime(
-            "%Y-%m-%dT%H:%M:%S"
-        )
+        git_time: str = datetime.fromtimestamp(timestamp, tz=UTC).strftime("%Y-%m-%dT%H:%M:%S")
 
         # Stage the file
         subprocess.run([git_executable, "add", str(filepath)], check=True, text=True)  # noqa: S603
 
         # Commit the file with the modification time as the commit time
-        env: dict[str, str] = {
-            **os.environ,
-            "GIT_AUTHOR_DATE": git_time,
-            "GIT_COMMITTER_DATE": git_time,
-        }
-        subprocess.run(  # noqa: S603
-            [git_executable, "commit", "-m", f"Add {filepath.name}"],
-            check=True,
-            env=env,
-            text=True,
-        )
+        env: dict[str, str] = {**os.environ, "GIT_AUTHOR_DATE": git_time, "GIT_COMMITTER_DATE": git_time}
+        subprocess.run([git_executable, "commit", "-m", f"Add {filepath.name}"], check=True, env=env, text=True)  # noqa: S603
 
     except subprocess.CalledProcessError:
         logger.exception("Subprocess error occurred while committing the file.")
         return False
@@ -222,9 +203,7 @@ def add_articles_to_readme(articles: dict[Any, Any] | None = None) -> None:
     # Create new content
     new_lines: list[str] = []
     if articles_section_index >= 0:
-        new_lines = lines[
-            : articles_section_index + 1
-        ]  # Keep everything up to "## Articles"
+        new_lines = lines[: articles_section_index + 1]  # Keep everything up to "## Articles"
     else:
         new_lines = lines
     if new_lines and not new_lines[-1].endswith("\n"):
@@ -233,17 +212,11 @@ def add_articles_to_readme(articles: dict[Any, Any] | None = None) -> None:
     # Add articles
     new_lines.append("\n")  # Add a blank line after the heading
 
-    for article in sorted(
-        articles, key=lambda x: x.get("createTime", ""), reverse=True
-    ):
+    for article in sorted(articles, key=lambda x: x.get("createTime", ""), reverse=True):
         article_id: str = str(article.get("articleId", ""))
         article_title: str = article.get("articleTitle", "No Title")
-        article_url: str = (
-            f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}"
-        )
-        new_lines.append(
-            f"- [{article_title}]({article_url}) [[json]](articles/{article_id}.json)\n",
-        )
+        article_url: str = f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}"
+        new_lines.append(f"- [{article_title}]({article_url}) [[json]](articles/{article_id}.json)\n")
 
     # Add articles directory section
     new_lines.append("\n## Articles Directory\n\n")
@@ -325,11 +298,7 @@ def format_discord_links(md: str) -> str:
     # Before: [Link](https://example.com "Link")
     # After: [Link](https://example.com)
-    formatted_links_md: str = re.sub(
-        pattern=r'\[([^\]]+)\]\((https?://[^\s)]+) "\2"\)',
-        repl=repl,
-        string=md,
-    )
+    formatted_links_md: str = re.sub(pattern=r'\[([^\]]+)\]\((https?://[^\s)]+) "\2"\)', repl=repl, string=md)
 
     return formatted_links_md
@@ -372,7 +341,7 @@ def handle_stars(text: str) -> str:
     return "\n\n".join(output)
 
 
-def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str:  # noqa: PLR0914, PLR0915
+def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str:  # noqa: C901, PLR0914, PLR0915
     """Generate an Atom feed from a list of articles.
 
     Args:
@@ -390,11 +359,7 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str:
     if articles:
         latest_entry = articles[0].get("createTime", "")
         if latest_entry:
-            latest_entry = (
-                datetime.strptime(str(latest_entry), "%Y-%m-%d %H:%M:%S")
-                .replace(tzinfo=UTC)
-                .isoformat()
-            )
+            latest_entry = datetime.strptime(str(latest_entry), "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC).isoformat()
 
     for article in articles:
         article_id: str = str(article.get("articleId", ""))
@@ -411,11 +376,8 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str:
         if not article_content:
             article_content = article_title
 
-        converter: MarkdownConverter = MarkdownConverter(
-            heading_style="ATX",
-            strip=["pre", "code"],
-        )
-        article_content_converted = str(converter.convert(article_content).strip())  # type: ignore  # noqa: PGH003
+        converter: MarkdownConverter = MarkdownConverter(heading_style="ATX", strip=["pre", "code"])
+        article_content_converted = str(converter.convert(article_content).strip())
 
         if not article_content_converted:
             msg: str = f"Article content is empty for article ID: {article_id}"
@@ -423,48 +385,24 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str:
             article_content_converted = "No content available"
 
         # Remove non-breaking spaces
-        xa0_removed: str = re.sub(
-            r"\xa0", " ", article_content_converted
-        )  # Replace non-breaking spaces with regular spaces
+        xa0_removed: str = re.sub(r"\xa0", " ", article_content_converted)  # Replace non-breaking spaces with regular spaces
 
         # Replace non-breaking spaces with regular spaces
-        non_breaking_space_removed: str = xa0_removed.replace(
-            " ",  # noqa: RUF001
-            " ",
-        )
+        non_breaking_space_removed: str = xa0_removed.replace(" ", " ")  # noqa: RUF001
 
         # Remove code blocks that has only spaces and newlines inside them
-        empty_code_block_removed: str = re.sub(
-            pattern=r"```[ \t]*\n[ \t]*\n```",
-            repl="",
-            string=non_breaking_space_removed,  # type: ignore  # noqa: PGH003
-        )
+        empty_code_block_removed: str = re.sub(pattern=r"```[ \t]*\n[ \t]*\n```", repl="", string=non_breaking_space_removed)
 
         # [How to Update] should be # How to Update
-        square_brackets_converted: str = re.sub(
-            pattern=r"^\s*\[([^\]]+)\]\s*$",
-            repl=r"# \1",
-            string=empty_code_block_removed,  # type: ignore  # noqa: PGH003
-            flags=re.MULTILINE,
-        )
+        square_brackets_converted: str = re.sub(pattern=r"^\s*\[([^\]]+)\]\s*$", repl=r"# \1", string=empty_code_block_removed, flags=re.MULTILINE)
 
         stars_converted: str = handle_stars(square_brackets_converted)
 
         # If `● Word` is in the content, replace it `## Word` instead with regex
-        ball_converted: str = re.sub(
-            pattern=r"●\s*(.*?)\n",
-            repl=r"\n\n## \1\n\n",
-            string=stars_converted,
-            flags=re.MULTILINE,
-        )
+        ball_converted: str = re.sub(pattern=r"●\s*(.*?)\n", repl=r"\n\n## \1\n\n", string=stars_converted, flags=re.MULTILINE)
 
         # If `※ Word` is in the content, replace it `* word * ` instead with regex
-        reference_mark_converted: str = re.sub(
-            pattern=r"^\s*※\s*(\S.*?)\s*$",
-            repl=r"\n\n*\1*\n\n",
-            string=ball_converted,
-            flags=re.MULTILINE,
-        )
+        reference_mark_converted: str = re.sub(pattern=r"^\s*※\s*(\S.*?)\s*$", repl=r"\n\n*\1*\n\n", string=ball_converted, flags=re.MULTILINE)
 
         # Replace circled Unicode numbers (①-⑳) with plain numbered text (e.g., "1. ", "2. ", ..., "20. ")
         number_symbol: dict[str, str] = {
@@ -494,35 +432,32 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str:
             flags=re.MULTILINE,
         )
 
-        markdown_formatted: str = mdformat.text(  # type: ignore  # noqa: PGH003
-            space_before_star_added,
-            options={
-                "number": True,  # Allow 1., 2., 3. numbering
-            },
-        )
+        # Format Markdown safely. mdformat doesn't support a "number" option here,
+        # and unknown options can raise at runtime. We avoid passing invalid options
+        # and fall back to the raw text if formatting fails for any reason.
+        try:
+            formatter: Any = mdformat  # Help the type checker by treating mdformat as Any here
+            markdown_formatted: str = str(formatter.text(space_before_star_added))
+        except Exception:
+            logger.exception("mdformat failed; using unformatted markdown text")
+            markdown_formatted = space_before_star_added
 
         links_fixed: str = format_discord_links(markdown_formatted)
 
         article_escaped: Markup = escape(links_fixed)
 
-        article_url: str = (
-            f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}"
-        )
+        article_url: str = f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}"
         article_create_time: str = article.get("createTime", "")
         published: str = ""
         updated: str = latest_entry
         if article_create_time:
-            timestamp: datetime = datetime.strptime(
-                str(article_create_time), "%Y-%m-%d %H:%M:%S"
-            ).replace(tzinfo=UTC)
+            timestamp: datetime = datetime.strptime(str(article_create_time), "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
             iso_time: str = timestamp.isoformat()
             published = f"<published>{iso_time}</published>"
             updated = iso_time
 
         article_category: str = article.get("articleTypeName", "Wuthering Waves")
-        category: str = (
-            f'<category term="{escape(article_category)}"/>' if article_category else ""
-        )
+        category: str = f'<category term="{escape(article_category)}"/>' if article_category else ""
 
         html: str = markdown.markdown(
             text=article_escaped,
@@ -583,7 +518,7 @@ def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str:
         </author>
     {"".join(atom_entries)}
     </feed>
-    """  # noqa: E501
+    """
 
     return atom_feed
@@ -633,9 +568,7 @@ def create_atom_feeds(output_dir: Path) -> None:
         article_create_time: str = article.get("createTime", "")
         logger.info("\tArticle ID: %s, Date: %s", article_id, article_create_time)
 
-    atom_feed: str = generate_atom_feed(
-        articles=latest_articles, file_name=atom_feed_path.name
-    )
+    atom_feed: str = generate_atom_feed(articles=latest_articles, file_name=atom_feed_path.name)
     with atom_feed_path.open("w", encoding="utf-8") as f:
         f.write(atom_feed)
     logger.info(
@@ -646,9 +579,7 @@ def create_atom_feeds(output_dir: Path) -> None:
 
     # Create the Atom feed for all articles
     atom_feed_path_all: Path = Path("articles_all.xml")
-    atom_feed_all_articles: str = generate_atom_feed(
-        articles=articles_sorted, file_name=atom_feed_path_all.name
-    )
+    atom_feed_all_articles: str = generate_atom_feed(articles=articles_sorted, file_name=atom_feed_path_all.name)
     with atom_feed_path_all.open("w", encoding="utf-8") as f:
         f.write(atom_feed_all_articles)
     logger.info("Created Atom feed for all articles: %s", atom_feed_path_all)
@@ -711,13 +642,11 @@ async def main() -> Literal[1, 0]:
""" """
# Setup # Setup
current_time = int(time.time() * 1000) # Current time in milliseconds current_time = int(time.time() * 1000) # Current time in milliseconds
base_url = ( base_url = "https://hw-media-cdn-mingchao.kurogame.com/akiwebsite/website2.0/json/G152/en"
"https://hw-media-cdn-mingchao.kurogame.com/akiwebsite/website2.0/json/G152/en"
)
article_menu_url: str = f"{base_url}/ArticleMenu.json?t={current_time}" article_menu_url: str = f"{base_url}/ArticleMenu.json?t={current_time}"
article_base_url: str = f"{base_url}/article/" article_base_url: str = f"{base_url}/article/"
output_dir = Path("articles") output_dir = Path("articles")
output_dir.mkdir(exist_ok=True) await anyio.Path(output_dir).mkdir(exist_ok=True)
logger.info("Fetching article menu from %s", article_menu_url) logger.info("Fetching article menu from %s", article_menu_url)
@@ -735,29 +664,19 @@ async def main() -> Literal[1, 0]:
     # Extract article IDs
     logger.info("Extracting article IDs...")
-    article_ids: list[str] = [
-        str(item["articleId"]) for item in menu_data if item.get("articleId")
-    ]
+    article_ids: list[str] = [str(item["articleId"]) for item in menu_data if item.get("articleId")]
 
     if not article_ids:
-        logger.warning(
-            "No article IDs found. Please check the JSON structure of ArticleMenu.json."
-        )
+        logger.warning("No article IDs found. Please check the JSON structure of ArticleMenu.json.")
         logger.warning("Full menu response for debugging:")
         logger.warning(json.dumps(menu_data, indent=2))
         return 1
 
     # Get list of already downloaded article IDs
-    existing_files: list[str] = [
-        file.stem
-        for file in output_dir.glob("*.json")
-        if file.stem != "ArticleMenu"
-    ]
+    existing_files: list[str] = [file.stem async for file in anyio.Path(output_dir).glob("*.json") if file.stem != "ArticleMenu"]
 
     # Filter out already downloaded articles
-    new_article_ids: list[str] = [
-        article_id for article_id in article_ids if article_id not in existing_files
-    ]
+    # Filter out already downloaded articles
+    new_article_ids: list[str] = [article_id for article_id in article_ids if article_id not in existing_files]
 
     if new_article_ids:
         logger.info("Found %s new articles to download", len(new_article_ids))
@@ -765,18 +684,14 @@ async def main() -> Literal[1, 0]:
 
         # Download each new article
         download_tasks: list[Coroutine[Any, Any, dict[Any, Any] | None]] = []
         for article_id in new_article_ids:
-            article_url: str = (
-                f"{article_base_url}{article_id}.json?t={current_time}"
-            )
+            article_url: str = f"{article_base_url}{article_id}.json?t={current_time}"
             output_file: Path = output_dir / f"{article_id}.json"
             logger.info("Downloading article %s from %s", article_id, article_url)
             download_tasks.append(fetch_json(article_url, client))
 
         # Wait for all downloads to complete
-        results: list[dict[Any, Any] | BaseException | None] = await asyncio.gather(
-            *download_tasks, return_exceptions=True
-        )
+        results: list[dict[Any, Any] | BaseException | None] = await asyncio.gather(*download_tasks, return_exceptions=True)
 
         # Process the downloaded articles
         for i, result in enumerate(results):
@@ -788,18 +703,12 @@ async def main() -> Literal[1, 0]:
                 continue
 
             if not result:
-                logger.warning(
-                    "Downloaded article %s is empty or invalid", article_id
-                )
+                logger.warning("Downloaded article %s is empty or invalid", article_id)
                 continue
 
             # Save the article JSON
-            if isinstance(result, dict) and await save_prettified_json(
-                result, output_file
-            ):
-                logger.info(
-                    "Successfully downloaded and prettified %s", output_file
-                )
+            if isinstance(result, dict) and await save_prettified_json(result, output_file):
+                logger.info("Successfully downloaded and prettified %s", output_file)
 
     else:
         logger.info("No new articles to download")