import asyncio # noqa: CPY001, D100
import json
import logging
import os
import re
import shutil
import subprocess # noqa: S404
import time
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
import aiofiles
import httpx
import markdown
import mdformat
from bs4 import BeautifulSoup
from markdownify import MarkdownConverter # pyright: ignore[reportMissingTypeStubs]
from markupsafe import Markup, escape
if TYPE_CHECKING:
from collections.abc import Coroutine
logging.basicConfig(
level=logging.INFO,
format="%(message)s",
)
logger: logging.Logger = logging.getLogger("wutheringwaves")
async def fetch_json(url: str, client: httpx.AsyncClient) -> dict[Any, Any] | None:
"""Fetch JSON data from a URL.
Args:
url (str): The URL to fetch data from.
client (httpx.AsyncClient): The HTTP client to use for the request.
Returns:
dict[Any, Any] | None: The parsed JSON data if successful, None otherwise.
"""
try:
response: httpx.Response = await client.get(url)
response.raise_for_status()
return response.json()
    except (httpx.HTTPError, json.JSONDecodeError):  # HTTPError also covers HTTPStatusError from raise_for_status()
logger.exception("Error fetching %s:", url)
return None
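# fetch_json is used both for ArticleMenu.json (a list of article entries) and for the
# individual article JSON files, so callers should not assume the top-level type.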
async def save_prettified_json(data: dict[Any, Any], filepath: Path) -> bool:
"""Save JSON data to a file with pretty formatting.
Args:
data (dict[Any, Any]): The JSON data to save.
filepath (Path): The path to the file where the data will be saved.
Returns:
bool: True if the data was saved successfully, False otherwise.
"""
try:
async with aiofiles.open(filepath, "w", encoding="utf-8") as f:
await f.write(json.dumps(data, indent=2, ensure_ascii=False))
except Exception:
logger.exception("Error saving %s:", filepath)
return False
else:
return True
def set_file_timestamp(filepath: Path, timestamp_str: str) -> bool:
"""Set file's modification time based on ISO timestamp string.
Args:
filepath (Path): The path to the file.
timestamp_str (str): The ISO timestamp string.
Returns:
bool: True if the timestamp was set successfully, False otherwise.
"""
try:
# Parse the timestamp string
dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
# Convert to Unix timestamp
timestamp: float = dt.timestamp()
# Set the file's modification time
os.utime(filepath, (timestamp, timestamp))
except ValueError:
logger.info("Error setting timestamp for %s", filepath)
return False
else:
logger.info("Timestamp for %s set to %s", filepath, dt.isoformat())
return True
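# Illustrative example: set_file_timestamp(Path("articles/123.json"), "2025-05-17 03:41:58")
# sets the file's access and modification times to 2025-05-17T03:41:58+00:00.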
def get_file_timestamp(timestamp_str: str) -> float:
"""Convert ISO timestamp string to Unix timestamp.
Args:
timestamp_str (str): The ISO timestamp string.
Returns:
float: The Unix timestamp, or 0 if conversion failed.
"""
if not timestamp_str:
logger.info("Empty timestamp string")
return 0.0
try:
# Parse the timestamp string
dt: datetime = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
# Convert to Unix timestamp
return dt.timestamp()
except ValueError:
logger.info("Error converting timestamp %s", timestamp_str)
return 0.0
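# Illustrative example: get_file_timestamp("1970-01-01 00:00:01") returns 1.0 (one second past the Unix epoch, UTC).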
def commit_file_with_timestamp(filepath: Path) -> bool: # noqa: PLR0911
"""Commit a file to Git with its modification time as the commit time.
Args:
filepath (Path): The path to the file to commit.
Returns:
bool: True if the commit was successful, False otherwise.
"""
# Check in Git history if we already have this file
git_log_cmd: list[str] = ["git", "log", "--pretty=format:%H", "--follow", str(filepath)]
try:
git_log_output: str = subprocess.check_output(git_log_cmd, text=True).strip() # noqa: S603
if git_log_output:
logger.info("File %s already exists in Git history.", filepath)
return True
except subprocess.CalledProcessError:
logger.exception("Error checking Git history for %s", filepath)
return False
try:
# Get the full path to the Git executable
git_executable: str | None = shutil.which("git")
if not git_executable:
logger.error("Git executable not found.")
return False
# Validate the filepath
if not filepath.is_file():
logger.error("Invalid file path: %s", filepath)
return False
# Get the file's modification time
timestamp: float = filepath.stat().st_mtime
git_time: str = datetime.fromtimestamp(timestamp, tz=UTC).strftime("%Y-%m-%dT%H:%M:%S")
# Stage the file
subprocess.run([git_executable, "add", str(filepath)], check=True, text=True) # noqa: S603
# Commit the file with the modification time as the commit time
env: dict[str, str] = {
**os.environ,
"GIT_AUTHOR_DATE": git_time,
"GIT_COMMITTER_DATE": git_time,
}
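# GIT_AUTHOR_DATE and GIT_COMMITTER_DATE make Git record the file's mtime as the commit
# date instead of the current time, so the history mirrors the article timestamps.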
subprocess.run( # noqa: S603
[git_executable, "commit", "-m", f"Add {filepath.name}"],
check=True,
env=env,
text=True,
)
except subprocess.CalledProcessError:
logger.exception("Subprocess error occurred while committing the file.")
return False
except Exception:
logger.exception("Error committing %s to Git", filepath)
return False
else:
logger.info("Successfully committed %s to Git", filepath)
return True
def add_articles_to_readme(articles: dict[Any, Any] | None = None) -> None:
"""Add the list of articles to the README.md file."""
if articles is None:
logger.warning("No articles to add to README.md")
return
readme_file: Path = Path("README.md")
if not readme_file.is_file():
logger.error("README.md file not found.")
return
with readme_file.open("r+", encoding="utf-8") as f:
# Read existing content
lines: list[str] = f.readlines()
# Find "## Articles" section or add it
articles_section_index = -1
for i, line in enumerate(lines):
if line.strip() == "## Articles":
articles_section_index: int = i
break
# Create new content
new_lines: list[str] = []
if articles_section_index >= 0:
new_lines = lines[: articles_section_index + 1] # Keep everything up to "## Articles"
else:
new_lines = lines
if new_lines and not new_lines[-1].endswith("\n"):
new_lines.append("\n")
new_lines.append("## Articles\n")
# Add articles
new_lines.append("\n") # Add a blank line after the heading
for article in sorted(articles, key=lambda x: x.get("createTime", ""), reverse=True):
article_id: str = str(article.get("articleId", ""))
article_title: str = article.get("articleTitle", "No Title")
article_url: str = f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}"
new_lines.append(
f"- [{article_title}]({article_url}) [[json]](articles/{article_id}.json)\n",
)
# Add articles directory section
new_lines.append("\n## Articles Directory\n\n")
new_lines.append("The articles are saved in the `articles` directory.\n")
new_lines.append("You can view them [here](articles).\n")
# Write the updated content
f.seek(0)
f.truncate()
f.writelines(new_lines)
logger.info("Articles added to README.md")
def batch_process_timestamps(menu_data: dict[Any, Any], output_dir: Path) -> None:
"""Process all timestamps in batch for better performance.
Args:
menu_data (list[dict[str, Any]]): The article menu data containing timestamps.
output_dir (Path): Directory containing the article files.
"""
# Extract article IDs and timestamps
timestamp_map: dict[str, str] = {}
for item in menu_data:
article_id = str(item.get("articleId", ""))
create_time = item.get("createTime")
if article_id and create_time:
timestamp_map[article_id] = create_time
logger.info("Collected %s timestamps from menu data", len(timestamp_map))
# Check which files need timestamp updates
files_to_update: list[tuple[Path, str]] = []
for article_id, create_time in timestamp_map.items():
file_path: Path = output_dir / f"{article_id}.json"
if not file_path.exists():
continue
expected_timestamp: float = get_file_timestamp(create_time)
if expected_timestamp == 0.0:
continue
actual_timestamp: float = file_path.stat().st_mtime
# Only update if timestamps don't match (with a small tolerance)
if abs(actual_timestamp - expected_timestamp) > 1.0:
files_to_update.append((file_path, create_time))
logger.info("Found %s files that need timestamp updates", len(files_to_update))
# Update timestamps and commit files
for file_path, create_time in files_to_update:
logger.info("Setting %s timestamp to %s", file_path, create_time)
if set_file_timestamp(file_path, create_time):
if not commit_file_with_timestamp(file_path):
logger.error("Failed to commit file %s to Git", file_path)
else:
logger.error("Failed to update timestamp for %s", file_path)
def format_discord_links(md: str) -> str:
"""Make links work in Discord.
Discord doesn't support links with titles, so we need to remove them.
Args:
md (str): The Markdown text containing links.
Returns:
str: The modified Markdown text with simplified links.
"""
def repl(match: re.Match[str]) -> str:
url: str | Any = match.group(2)
display: str = re.sub(pattern=r"^https?://(www\.)?", repl="", string=url)
return f"[{display}]({url})"
# Before: [https://example.com](https://example.com "https://example.com")
# After: [example.com](https://example.com)
formatted_links_md: str = re.sub(
pattern=r'\[([^\]]+)\]\((https?://[^\s)]+) "\2"\)',
repl=repl,
string=md,
)
return formatted_links_md
def handle_stars(text: str) -> str:
"""Handle stars in the text.
Args:
text (str): The text to process.
Returns:
str: The processed text with stars replaced by headers.
"""
lines: list[str] = text.strip().splitlines()
output: list[str] = []
for line in lines:
line: str = line.strip() # noqa: PLW2901
# Before: ✦ Title ✦
# After: # Title
if line.startswith("") and line.endswith(""):
title: str = line.removeprefix("").removesuffix("").strip()
output.append(f"# {title}")
# Before: **✦ Title ✦**
# After: # Title
elif line.startswith("**✦") and line.endswith("✦**"):
title: str = line.removeprefix("**✦").removesuffix("✦**").strip()
output.append(f"# {title}")
# Before: ✦ Title
# After: * Title
elif line.startswith(""):
title: str = line.removeprefix("").strip()
output.append(f"* {title}")
elif line:
output.append(line)
return "\n\n".join(output)
def generate_atom_feed(articles: list[dict[Any, Any]], file_name: str) -> str: # noqa: PLR0914, PLR0915
"""Generate an Atom feed from a list of articles.
Args:
articles (list[dict[Any, Any]]): The list of articles to include in the feed.
file_name (str): The name of the file to save the feed to.
Returns:
str: The generated Atom feed as a string.
"""
atom_entries: list[str] = []
latest_entry: str = datetime.now(UTC).isoformat()
# Get the latest entry date
if articles:
first_create_time: str = articles[0].get("createTime", "")
if first_create_time:
latest_entry = datetime.strptime(str(first_create_time), "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC).isoformat()
for article in articles:
article_id: str = str(article.get("articleId", ""))
# Use stable identifier based on article ID
entry_id: str = (
f"urn:article:{article_id}"
if article_id
else f"urn:wutheringwaves:unknown-article-{hash(article.get('articleTitle', '') + article.get('createTime', ''))}"
)
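# Atom requires every <entry> to carry a globally unique, stable <id>; a urn derived from
# the article ID satisfies that, with a hash-based fallback when the ID is missing.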
article_title: str = article.get("articleTitle", "No Title")
article_content: str = article.get("articleContent", str(article_title))
if not article_content:
article_content = article_title
converter: MarkdownConverter = MarkdownConverter(
heading_style="ATX",
strip=["pre", "code"],
)
article_content_converted = str(converter.convert(article_content).strip()) # type: ignore # noqa: PGH003
if not article_content_converted:
msg: str = f"Article content is empty for article ID: {article_id}"
logger.warning(msg)
article_content_converted = "No content available"
# Replace non-breaking spaces (U+00A0) with regular spaces
xa0_removed: str = re.sub(r"\xa0", " ", article_content_converted)
# Replace another Unicode space variant (the RUF001-flagged character below) with a regular space
non_breaking_space_removed: str = xa0_removed.replace(
" ", # noqa: RUF001
" ",
)
# Remove code blocks that contain only spaces and newlines
empty_code_block_removed: str = re.sub(
pattern=r"```[ \t]*\n[ \t]*\n```",
repl="",
string=non_breaking_space_removed, # type: ignore # noqa: PGH003
)
# [How to Update] should be # How to Update
square_brackets_converted: str = re.sub(
pattern=r"^\s*\[([^\]]+)\]\s*$",
repl=r"# \1",
string=empty_code_block_removed, # type: ignore # noqa: PGH003
flags=re.MULTILINE,
)
stars_converted: str = handle_stars(square_brackets_converted)
# If `● Word` is in the content, replace it with `## Word` instead (via regex)
ball_converted: str = re.sub(pattern=r"●\s*(.*?)\n", repl=r"\n\n## \1\n\n", string=stars_converted, flags=re.MULTILINE)
# If `※ Word` is in the content, replace it with `*Word*` (italics) instead
reference_mark_converted: str = re.sub(
pattern=r"^\s*※\s*(\S.*?)\s*$",
repl=r"\n\n*\1*\n\n",
string=ball_converted,
flags=re.MULTILINE,
)
# Replace circled Unicode numbers (①-⑩) with plain numbered text (e.g., "1. ", "2. ", ..., "10. ")
number_symbol: dict[str, str] = {
"①": "1",
"②": "2",
"③": "3",
"④": "4",
"⑤": "5",
"⑥": "6",
"⑦": "7",
"⑧": "8",
"⑨": "9",
"⑩": "10",
}
for symbol, number in number_symbol.items():
reference_mark_converted = re.sub(
pattern=rf"^\s*{re.escape(symbol)}\s*(.*?)\s*$",
repl=rf"\n\n{number}. \1\n\n",
string=reference_mark_converted,
flags=re.MULTILINE,
)
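# e.g. "① Restart the game" becomes "1. Restart the game" after formatting.
# The next substitution turns escaped asterisks ("\*Item") into Markdown bullets ("* Item").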
space_before_star_added: str = re.sub(pattern=r"\\\*(.*)", repl=r"* \1", string=reference_mark_converted, flags=re.MULTILINE)
markdown_formatted: str = mdformat.text( # type: ignore # noqa: PGH003
space_before_star_added,
options={
"number": True, # Allow 1., 2., 3. numbering
},
)
links_fixed: str = format_discord_links(markdown_formatted)
article_escaped: Markup = escape(links_fixed)
article_url: str = f"https://wutheringwaves.kurogames.com/en/main/news/detail/{article_id}"
article_create_time: str = article.get("createTime", "")
published: str = ""
updated: str = latest_entry
if article_create_time:
timestamp: datetime = datetime.strptime(str(article_create_time), "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
iso_time: str = timestamp.isoformat()
published = f"<published>{iso_time}</published>"
updated = iso_time
article_category: str = article.get("articleTypeName", "Wuthering Waves")
category: str = f'<category term="{escape(article_category)}"/>' if article_category else ""
html: str = markdown.markdown(
text=article_escaped,
extensions=[
"markdown.extensions.sane_lists",
],
)
atom_entries.append(
f"""
<entry>
<id>{entry_id}</id>
<title>{escape(article_title)}</title>
<link href="{article_url}" rel="alternate" type="text/html"/>
<content type="html"><![CDATA[{html}]]></content>
{published}
<updated>{updated}</updated>
{category}
<author>
<name>Wuthering Waves</name>
<email>wutheringwaves_ensupport@kurogames.com</email>
<uri>https://wutheringwaves.kurogames.com</uri>
</author>
</entry>
""",
)
# If HTML not already saved to /html, save it
html_dir: Path = Path("html")
html_dir.mkdir(exist_ok=True)
html_file: Path = html_dir / f"{article_id}.html"
if not html_file.is_file():
with html_file.open("w", encoding="utf-8") as f:
f.write(str(BeautifulSoup(html, "html.parser").prettify()))
logger.info("Saved HTML for article %s to %s", article_id, html_file)
# Set the file timestamp
if not set_file_timestamp(html_file, article_create_time):
logger.error("Failed to set timestamp for %s", html_file)
# Create the complete Atom feed
atom_feed: str = f"""<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<title>Wuthering Waves Articles</title>
<link href="https://wutheringwaves.kurogames.com/en/main/news/" rel="alternate" type="text/html"/>
<link href="https://git.lovinator.space/TheLovinator/wutheringwaves/raw/branch/master/{file_name}" rel="self" type="application/atom+xml"/>
<id>urn:wutheringwaves:feed</id>
<updated>{latest_entry}</updated>
<subtitle>Latest articles from Wuthering Waves</subtitle>
<icon>https://git.lovinator.space/TheLovinator/wutheringwaves/raw/branch/master/logo.png</icon>
<logo>https://git.lovinator.space/TheLovinator/wutheringwaves/raw/branch/master/logo.png</logo>
<rights>Copyright © {datetime.now(tz=UTC).year} Wuthering Waves</rights>
<generator uri="https://git.lovinator.space/TheLovinator/wutheringwaves" version="1.0">Python Script</generator>
<author>
<name>Wuthering Waves</name>
<email>wutheringwaves_ensupport@kurogames.com</email>
<uri>https://wutheringwaves.kurogames.com</uri>
</author>
{"".join(atom_entries)}
</feed>
""" # noqa: E501
return atom_feed
def create_atom_feeds(output_dir: Path) -> None:
"""Create Atom feeds for the articles.
Current feeds are:
- Latest 20 articles (articles_latest.xml)
- All articles (articles_all.xml)
Args:
output_dir (Path): Directory containing the article JSON files to read.
"""
menu_data: list[dict[Any, Any]] = []
# Load data from all the articles
for file in output_dir.glob("*.json"):
if file.stem == "ArticleMenu":
continue
with file.open("r", encoding="utf-8") as f:
try:
article_data: dict[Any, Any] = json.load(f)
menu_data.append(article_data)
except json.JSONDecodeError:
logger.exception("Error decoding JSON from %s", file)
continue
if not menu_data:
logger.error("Can't create Atom feeds, no articles found in %s", output_dir)
return
articles_sorted: list[dict[Any, Any]] = sorted(
menu_data,
key=lambda x: get_file_timestamp(x.get("createTime", "")),
reverse=True,
)
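# Newest first; generate_atom_feed uses articles[0].createTime as the feed-level <updated> value.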
# Create the Atom feed for the latest articles
amount_of_articles: int = 20
atom_feed_path: Path = Path("articles_latest.xml")
latest_articles: list[dict[Any, Any]] = articles_sorted[:amount_of_articles]
logger.info("Dates of the last %s articles:", len(latest_articles))
for article in latest_articles:
article_id: str = str(article.get("articleId", ""))
article_create_time: str = article.get("createTime", "")
logger.info("\tArticle ID: %s, Date: %s", article_id, article_create_time)
atom_feed: str = generate_atom_feed(articles=latest_articles, file_name=atom_feed_path.name)
with atom_feed_path.open("w", encoding="utf-8") as f:
f.write(atom_feed)
logger.info("Created Atom feed for the last %s articles: %s", len(latest_articles), atom_feed_path)
# Create the Atom feed for all articles
atom_feed_path_all: Path = Path("articles_all.xml")
atom_feed_all_articles: str = generate_atom_feed(articles=articles_sorted, file_name=atom_feed_path_all.name)
with atom_feed_path_all.open("w", encoding="utf-8") as f:
f.write(atom_feed_all_articles)
logger.info("Created Atom feed for all articles: %s", atom_feed_path_all)
def add_data_to_articles(menu_data: dict[Any, Any], output_dir: Path) -> None:
"""ArticleMenu.json contains data that should be added to the articles.
Fields not in the article JSON:
- articleDesc (Currently empty in ArticleMenu.json)
- createTime
- suggestCover
- top
Args:
menu_data (dict[Any, Any]): The article menu data.
output_dir (Path): Directory containing the article files.
"""
for item in menu_data:
article_id: str = str(item.get("articleId", ""))
if not article_id:
continue
# Check if the article file exists
article_file: Path = output_dir / f"{article_id}.json"
if not article_file.is_file():
logger.warning("Article file %s does not exist, skipping...", article_file)
continue
# Read the existing article data
with article_file.open("r", encoding="utf-8") as f:
try:
article_data: dict[Any, Any] = json.load(f)
except json.JSONDecodeError:
logger.exception("Error decoding JSON from %s", article_file)
continue
old_article_data = article_data.copy()
# Add missing fields from ArticleMenu.json
# Also has sortingMark, but it creates too many diffs
for key in ["articleDesc", "createTime", "suggestCover", "top"]:
if key in item and key not in article_data:
article_data[key] = item[key]
# Save the updated article data if any changes were made
if old_article_data != article_data:
with article_file.open("w", encoding="utf-8") as f:
json.dump(article_data, f, indent=2, ensure_ascii=False)
logger.info("Updated %s with data from ArticleMenu.json", article_file)
async def main() -> Literal[1, 0]:
"""Fetch and save articles from the Wuthering Waves website.
Returns:
Literal[1, 0]: 1 if an error occurred, 0 otherwise.
"""
# Setup
current_time = int(time.time() * 1000) # Current time in milliseconds
base_url = "https://hw-media-cdn-mingchao.kurogame.com/akiwebsite/website2.0/json/G152/en"
article_menu_url: str = f"{base_url}/ArticleMenu.json?t={current_time}"
article_base_url: str = f"{base_url}/article/"
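# The t= query parameter appears to act as a cache-buster so the CDN serves fresh JSON.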
output_dir = Path("articles")
output_dir.mkdir(exist_ok=True)
logger.info("Fetching article menu from %s", article_menu_url)
async with httpx.AsyncClient(timeout=30.0) as client:
# Fetch the article menu
menu_data: dict[Any, Any] | None = await fetch_json(article_menu_url, client)
if not menu_data:
logger.error("Error: Fetched ArticleMenu.json is empty")
return 1
# Save and prettify the menu JSON
menu_file: Path = output_dir / "ArticleMenu.json"
if await save_prettified_json(menu_data, menu_file):
logger.info("Menu JSON saved and prettified to %s", menu_file)
# Extract article IDs
logger.info("Extracting article IDs...")
article_ids: list[str] = [str(item["articleId"]) for item in menu_data if item.get("articleId")]
if not article_ids:
logger.warning("No article IDs found. Please check the JSON structure of ArticleMenu.json.")
logger.warning("Full menu response for debugging:")
logger.warning(json.dumps(menu_data, indent=2))
return 1
# Get list of already downloaded article IDs
existing_files: list[str] = [file.stem for file in output_dir.glob("*.json") if file.stem != "ArticleMenu"]
# Filter out already downloaded articles
new_article_ids: list[str] = [article_id for article_id in article_ids if article_id not in existing_files]
if new_article_ids:
logger.info("Found %s new articles to download", len(new_article_ids))
# Download each new article
download_tasks: list[Coroutine[Any, Any, dict[Any, Any] | None]] = []
for article_id in new_article_ids:
article_url: str = f"{article_base_url}{article_id}.json?t={current_time}"
output_file: Path = output_dir / f"{article_id}.json"
logger.info("Downloading article %s from %s", article_id, article_url)
download_tasks.append(fetch_json(article_url, client))
# Wait for all downloads to complete
results: list[dict[Any, Any] | BaseException | None] = await asyncio.gather(*download_tasks, return_exceptions=True)
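# return_exceptions=True returns a failed download as an exception object instead of
# aborting gather(); failures are logged and skipped below.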
# Process the downloaded articles
for i, result in enumerate(results):
article_id: str = new_article_ids[i]
output_file = output_dir / f"{article_id}.json"
if isinstance(result, Exception):
logger.error("Error downloading article %s: %s", article_id, result)
continue
if not result:
logger.warning("Downloaded article %s is empty or invalid", article_id)
continue
# Save the article JSON
if isinstance(result, dict) and await save_prettified_json(result, output_file):
logger.info("Successfully downloaded and prettified %s", output_file)
else:
logger.info("No new articles to download")
add_data_to_articles(menu_data, output_dir)
add_articles_to_readme(menu_data)
create_atom_feeds(output_dir)
batch_process_timestamps(menu_data, output_dir)
logger.info("Script finished. Articles are in the '%s' directory.", output_dir)
return 0
if __name__ == "__main__":
raise SystemExit(asyncio.run(main()))