From 6dfc72d3b0c8ac4499d7f34e85a1af8fceec67f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Mon, 10 Feb 2025 05:17:46 +0100 Subject: [PATCH 01/10] Add discord_rss_bot directory to Dockerfile --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index 72714a0..adaf76c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,6 +9,7 @@ COPY --chown=botuser:botuser requirements.txt /home/botuser/discord-rss-bot/ RUN --mount=type=cache,target=/root/.cache/uv \ --mount=type=bind,source=pyproject.toml,target=pyproject.toml \ uv sync --no-install-project +COPY --chown=botuser:botuser discord_rss_bot/ /home/botuser/discord-rss-bot/discord_rss_bot/ EXPOSE 5000 VOLUME ["/home/botuser/.local/share/discord_rss_bot/"] CMD ["uv", "run", "uvicorn", "discord_rss_bot.main:app", "--host=0.0.0.0", "--port=5000", "--proxy-headers", "--forwarded-allow-ips='*'", "--log-level", "debug"] From 8408db9afd6674afe0d4cdb61ad0c3dcf3591644 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Tue, 1 Apr 2025 22:56:54 +0200 Subject: [PATCH 02/10] Enhance YouTube feed display in index.html with username and channel ID formatting --- discord_rss_bot/templates/index.html | 45 ++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/discord_rss_bot/templates/index.html b/discord_rss_bot/templates/index.html index 78f0729..3db4a50 100644 --- a/discord_rss_bot/templates/index.html +++ b/discord_rss_bot/templates/index.html @@ -40,7 +40,20 @@ {% set hook_from_feed = feed_webhook["webhook"] %} {% if hook_from_context.url == hook_from_feed %}
- {{ feed.url }} + + {# Display username@youtube for YouTube feeds #} + {% if "youtube.com/feeds/videos.xml" in feed.url %} + {% if "user=" in feed.url %} + {{ feed.url.split("user=")[1] }}@youtube + {% elif "channel_id=" in feed.url %} + {{ feed.title if feed.title else feed.url.split("channel_id=")[1] }}@youtube + {% else %} + {{ feed.url }} + {% endif %} + {% else %} + {{ feed.url }} + {% endif %} + {% if not feed.updates_enabled %}Disabled{% endif %} {% if feed.last_exception %}({{ feed.last_exception.value_str }}){% endif %} @@ -72,7 +85,20 @@
@@ -83,7 +109,20 @@ From 84e39c9f792897eef8477b4729bf8703eab82382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Tue, 1 Apr 2025 22:58:42 +0200 Subject: [PATCH 03/10] Add .gitattributes to set Jinja as the language for HTML files --- .gitattributes | 1 + 1 file changed, 1 insertion(+) create mode 100644 .gitattributes diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..ccb351b --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.html linguist-language=jinja From ac63041b28d1ce87685523a7957c3b3360c8229c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Thu, 3 Apr 2025 05:44:50 +0200 Subject: [PATCH 04/10] =?UTF-8?q?Add=20regex=20support=20to=20blacklist=20?= =?UTF-8?q?and=20whitelist=20filters.=20Strong=20code,=20many=20bananas!?= =?UTF-8?q?=20=F0=9F=A6=8D=F0=9F=A6=8D=F0=9F=A6=8D=F0=9F=A6=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .pre-commit-config.yaml | 2 +- discord_rss_bot/filter/blacklist.py | 85 +++++++++++++++++++---- discord_rss_bot/filter/utils.py | 50 ++++++++++++++ discord_rss_bot/filter/whitelist.py | 71 +++++++++++++++---- discord_rss_bot/main.py | 58 ++++++++++++++-- discord_rss_bot/templates/blacklist.html | 43 ++++++++++++ discord_rss_bot/templates/whitelist.html | 45 ++++++++++++- pyproject.toml | 4 +- tests/test_blacklist.py | 86 ++++++++++++++++++++++++ tests/test_utils.py | 50 +++++++++++++- tests/test_whitelist.py | 71 +++++++++++++++++++ 11 files changed, 526 insertions(+), 39 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a3c42c0..908367d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -38,7 +38,7 @@ repos: # An extremely fast Python linter and formatter. - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.9.5 + rev: v0.11.2 hooks: - id: ruff-format - id: ruff diff --git a/discord_rss_bot/filter/blacklist.py b/discord_rss_bot/filter/blacklist.py index 808d7c9..87b4913 100644 --- a/discord_rss_bot/filter/blacklist.py +++ b/discord_rss_bot/filter/blacklist.py @@ -2,7 +2,7 @@ from __future__ import annotations from typing import TYPE_CHECKING -from discord_rss_bot.filter.utils import is_word_in_text +from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text if TYPE_CHECKING: from reader import Entry, Feed, Reader @@ -12,9 +12,14 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: """Return True if the feed has blacklist tags. The following tags are checked: - - blacklist_title + - blacklist_author + - blacklist_content - blacklist_summary - - blacklist_content. + - blacklist_title + - regex_blacklist_author + - regex_blacklist_content + - regex_blacklist_summary + - regex_blacklist_title Args: custom_reader: The reader. @@ -23,14 +28,29 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: Returns: bool: If the feed has any of the tags. """ - blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")) - blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")) - blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")) + blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() + blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() - return bool(blacklist_title or blacklist_summary or blacklist_content) + regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() + regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() + + return bool( + blacklist_title + or blacklist_author + or blacklist_content + or blacklist_summary + or regex_blacklist_author + or regex_blacklist_content + or regex_blacklist_summary + or regex_blacklist_title, + ) -def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: +def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the blacklist. Args: @@ -40,21 +60,58 @@ def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: Returns: bool: If the entry is in the blacklist. """ - blacklist_title: str = str(custom_reader.get_tag(entry.feed, "blacklist_title", "")) - blacklist_summary: str = str(custom_reader.get_tag(entry.feed, "blacklist_summary", "")) - blacklist_content: str = str(custom_reader.get_tag(entry.feed, "blacklist_content", "")) - blacklist_author: str = str(custom_reader.get_tag(entry.feed, "blacklist_author", "")) + feed = entry.feed + + blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() + + regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() + regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() # TODO(TheLovinator): Also add support for entry_text and more. + # Check regular blacklist if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title): return True if entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary): return True + if ( + entry.content + and entry.content[0].value + and blacklist_content + and is_word_in_text(blacklist_content, entry.content[0].value) + ): + return True if entry.author and blacklist_author and is_word_in_text(blacklist_author, entry.author): return True + if ( + entry.content + and entry.content[0].value + and blacklist_content + and is_word_in_text(blacklist_content, entry.content[0].value) + ): + return True + + # Check regex blacklist + if entry.title and regex_blacklist_title and is_regex_match(regex_blacklist_title, entry.title): + return True + if entry.summary and regex_blacklist_summary and is_regex_match(regex_blacklist_summary, entry.summary): + return True + if ( + entry.content + and entry.content[0].value + and regex_blacklist_content + and is_regex_match(regex_blacklist_content, entry.content[0].value) + ): + return True + if entry.author and regex_blacklist_author and is_regex_match(regex_blacklist_author, entry.author): + return True return bool( entry.content and entry.content[0].value - and blacklist_content - and is_word_in_text(blacklist_content, entry.content[0].value), + and regex_blacklist_content + and is_regex_match(regex_blacklist_content, entry.content[0].value), ) diff --git a/discord_rss_bot/filter/utils.py b/discord_rss_bot/filter/utils.py index 090518d..ff93e59 100644 --- a/discord_rss_bot/filter/utils.py +++ b/discord_rss_bot/filter/utils.py @@ -1,7 +1,10 @@ from __future__ import annotations +import logging import re +logger: logging.Logger = logging.getLogger(__name__) + def is_word_in_text(word_string: str, text: str) -> bool: """Check if any of the words are in the text. @@ -20,3 +23,50 @@ def is_word_in_text(word_string: str, text: str) -> bool: # Check if any pattern matches the text. return any(pattern.search(text) for pattern in patterns) + + +def is_regex_match(regex_string: str, text: str) -> bool: + """Check if any of the regex patterns match the text. + + Args: + regex_string: A string containing regex patterns, separated by newlines or commas. + text: The text to search in. + + Returns: + bool: True if any regex pattern matches the text, otherwise False. + """ + if not regex_string or not text: + return False + + # Split by newlines first, then by commas (for backward compatibility) + regex_list: list[str] = [] + + # First split by newlines + lines: list[str] = regex_string.split("\n") + for line in lines: + stripped_line: str = line.strip() + if stripped_line: + # For backward compatibility, also split by commas if there are any + if "," in stripped_line: + regex_list.extend([part.strip() for part in stripped_line.split(",") if part.strip()]) + else: + regex_list.append(stripped_line) + + # Attempt to compile and apply each regex pattern + for pattern_str in regex_list: + if not pattern_str: + logger.warning("Empty regex pattern found in the list.") + continue + + try: + pattern: re.Pattern[str] = re.compile(pattern_str, re.IGNORECASE) + if pattern.search(text): + logger.info("Regex pattern matched: %s", pattern_str) + return True + except re.error: + logger.warning("Invalid regex pattern: %s", pattern_str) + continue + + logger.info("No regex patterns matched.") + + return False diff --git a/discord_rss_bot/filter/whitelist.py b/discord_rss_bot/filter/whitelist.py index a55a514..b4b5c23 100644 --- a/discord_rss_bot/filter/whitelist.py +++ b/discord_rss_bot/filter/whitelist.py @@ -2,7 +2,7 @@ from __future__ import annotations from typing import TYPE_CHECKING -from discord_rss_bot.filter.utils import is_word_in_text +from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text if TYPE_CHECKING: from reader import Entry, Feed, Reader @@ -12,9 +12,14 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: """Return True if the feed has whitelist tags. The following tags are checked: - - whitelist_title + - regex_whitelist_author + - regex_whitelist_content + - regex_whitelist_summary + - regex_whitelist_title + - whitelist_author + - whitelist_content - whitelist_summary - - whitelist_content. + - whitelist_title Args: custom_reader: The reader. @@ -23,14 +28,29 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: Returns: bool: If the feed has any of the tags. """ - whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")) - whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")) - whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")) + whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() - return bool(whitelist_title or whitelist_summary or whitelist_content) + regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() + + return bool( + whitelist_title + or whitelist_author + or whitelist_content + or whitelist_summary + or regex_whitelist_author + or regex_whitelist_content + or regex_whitelist_summary + or regex_whitelist_title, + ) -def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: +def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the whitelist. Args: @@ -41,20 +61,43 @@ def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: bool: If the entry is in the whitelist. """ feed: Feed = entry.feed - whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")) - whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")) - whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")) - whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")) + # Regular whitelist tags + whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() + # Regex whitelist tags + regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() + + # Check regular whitelist if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): return True if entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary): return True if entry.author and whitelist_author and is_word_in_text(whitelist_author, entry.author): return True - return bool( + if ( entry.content and entry.content[0].value and whitelist_content - and is_word_in_text(whitelist_content, entry.content[0].value), + and is_word_in_text(whitelist_content, entry.content[0].value) + ): + return True + + # Check regex whitelist + if entry.title and regex_whitelist_title and is_regex_match(regex_whitelist_title, entry.title): + return True + if entry.summary and regex_whitelist_summary and is_regex_match(regex_whitelist_summary, entry.summary): + return True + if entry.author and regex_whitelist_author and is_regex_match(regex_whitelist_author, entry.author): + return True + return bool( + entry.content + and entry.content[0].value + and regex_whitelist_content + and is_regex_match(regex_whitelist_content, entry.content[0].value), ) diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 3a1f0ca..a7c6510 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -43,7 +43,7 @@ from discord_rss_bot.search import create_html_for_search_results from discord_rss_bot.settings import get_reader if TYPE_CHECKING: - from collections.abc import Iterable + from collections.abc import AsyncGenerator, Iterable from reader.types import JSONType @@ -88,8 +88,15 @@ reader: Reader = get_reader() @asynccontextmanager -async def lifespan(app: FastAPI) -> typing.AsyncGenerator[None]: - """This is needed for the ASGI server to run.""" +async def lifespan(app: FastAPI) -> AsyncGenerator[None]: + """Lifespan for the FastAPI app. + + Args: + app: The FastAPI app. + + Yields: + None: Nothing. + """ add_missing_tags(reader) scheduler: AsyncIOScheduler = AsyncIOScheduler() @@ -250,6 +257,10 @@ async def post_set_whitelist( whitelist_summary: Annotated[str, Form()] = "", whitelist_content: Annotated[str, Form()] = "", whitelist_author: Annotated[str, Form()] = "", + regex_whitelist_title: Annotated[str, Form()] = "", + regex_whitelist_summary: Annotated[str, Form()] = "", + regex_whitelist_content: Annotated[str, Form()] = "", + regex_whitelist_author: Annotated[str, Form()] = "", feed_url: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent. @@ -259,6 +270,10 @@ async def post_set_whitelist( whitelist_summary: Whitelisted words for when checking the summary. whitelist_content: Whitelisted words for when checking the content. whitelist_author: Whitelisted words for when checking the author. + regex_whitelist_title: Whitelisted regex for when checking the title. + regex_whitelist_summary: Whitelisted regex for when checking the summary. + regex_whitelist_content: Whitelisted regex for when checking the content. + regex_whitelist_author: Whitelisted regex for when checking the author. feed_url: The feed we should set the whitelist for. Returns: @@ -269,6 +284,10 @@ async def post_set_whitelist( reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "whitelist_author", whitelist_author) # pyright: ignore[reportArgumentType][call-overload] + reader.set_tag(clean_feed_url, "regex_whitelist_title", regex_whitelist_title) # pyright: ignore[reportArgumentType][call-overload] + reader.set_tag(clean_feed_url, "regex_whitelist_summary", regex_whitelist_summary) # pyright: ignore[reportArgumentType][call-overload] + reader.set_tag(clean_feed_url, "regex_whitelist_content", regex_whitelist_content) # pyright: ignore[reportArgumentType][call-overload] + reader.set_tag(clean_feed_url, "regex_whitelist_author", regex_whitelist_author) # pyright: ignore[reportArgumentType][call-overload] return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @@ -287,11 +306,14 @@ async def get_whitelist(feed_url: str, request: Request): clean_feed_url: str = feed_url.strip() feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url)) - # Get previous data, this is used when creating the form. whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")) whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")) whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")) whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")) + regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")) + regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")) + regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")) + regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")) context = { "request": request, @@ -300,6 +322,10 @@ async def get_whitelist(feed_url: str, request: Request): "whitelist_summary": whitelist_summary, "whitelist_content": whitelist_content, "whitelist_author": whitelist_author, + "regex_whitelist_title": regex_whitelist_title, + "regex_whitelist_summary": regex_whitelist_summary, + "regex_whitelist_content": regex_whitelist_content, + "regex_whitelist_author": regex_whitelist_author, } return templates.TemplateResponse(request=request, name="whitelist.html", context=context) @@ -310,6 +336,10 @@ async def post_set_blacklist( blacklist_summary: Annotated[str, Form()] = "", blacklist_content: Annotated[str, Form()] = "", blacklist_author: Annotated[str, Form()] = "", + regex_blacklist_title: Annotated[str, Form()] = "", + regex_blacklist_summary: Annotated[str, Form()] = "", + regex_blacklist_content: Annotated[str, Form()] = "", + regex_blacklist_author: Annotated[str, Form()] = "", feed_url: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set the blacklist. @@ -322,6 +352,10 @@ async def post_set_blacklist( blacklist_summary: Blacklisted words for when checking the summary. blacklist_content: Blacklisted words for when checking the content. blacklist_author: Blacklisted words for when checking the author. + regex_blacklist_title: Blacklisted regex for when checking the title. + regex_blacklist_summary: Blacklisted regex for when checking the summary. + regex_blacklist_content: Blacklisted regex for when checking the content. + regex_blacklist_author: Blacklisted regex for when checking the author. feed_url: What feed we should set the blacklist for. Returns: @@ -332,7 +366,10 @@ async def post_set_blacklist( reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # pyright: ignore[reportArgumentType][call-overload] - + reader.set_tag(clean_feed_url, "regex_blacklist_title", regex_blacklist_title) # pyright: ignore[reportArgumentType][call-overload] + reader.set_tag(clean_feed_url, "regex_blacklist_summary", regex_blacklist_summary) # pyright: ignore[reportArgumentType][call-overload] + reader.set_tag(clean_feed_url, "regex_blacklist_content", regex_blacklist_content) # pyright: ignore[reportArgumentType][call-overload] + reader.set_tag(clean_feed_url, "regex_blacklist_author", regex_blacklist_author) # pyright: ignore[reportArgumentType][call-overload] return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @@ -349,11 +386,14 @@ async def get_blacklist(feed_url: str, request: Request): """ feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url)) - # Get previous data, this is used when creating the form. blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")) blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")) blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")) blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")) + regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")) + regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")) + regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")) + regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")) context = { "request": request, @@ -362,6 +402,10 @@ async def get_blacklist(feed_url: str, request: Request): "blacklist_summary": blacklist_summary, "blacklist_content": blacklist_content, "blacklist_author": blacklist_author, + "regex_blacklist_title": regex_blacklist_title, + "regex_blacklist_summary": regex_blacklist_summary, + "regex_blacklist_content": regex_blacklist_content, + "regex_blacklist_author": regex_blacklist_author, } return templates.TemplateResponse(request=request, name="blacklist.html", context=context) @@ -461,7 +505,7 @@ async def get_embed_page(feed_url: str, request: Request): @app.post("/embed", response_class=HTMLResponse) -async def post_embed( # noqa: PLR0913, PLR0917 +async def post_embed( feed_url: Annotated[str, Form()], title: Annotated[str, Form()] = "", description: Annotated[str, Form()] = "", diff --git a/discord_rss_bot/templates/blacklist.html b/discord_rss_bot/templates/blacklist.html index 3632277..ec16bce 100644 --- a/discord_rss_bot/templates/blacklist.html +++ b/discord_rss_bot/templates/blacklist.html @@ -42,6 +42,49 @@ + +
+
+
    +
  • + Regular expression patterns for advanced filtering. Each pattern should be on a new + line. +
  • +
  • Patterns are case-insensitive.
  • +
  • + Examples: + +
    +^New Release:.*
    +\b(update|version|patch)\s+\d+\.\d+
    +.*\[(important|notice)\].*
    +
    +
    +
  • +
+
+ + + + + + + + + + + +
diff --git a/discord_rss_bot/templates/whitelist.html b/discord_rss_bot/templates/whitelist.html index 5a958f6..61755e2 100644 --- a/discord_rss_bot/templates/whitelist.html +++ b/discord_rss_bot/templates/whitelist.html @@ -1,6 +1,6 @@ {% extends "base.html" %} {% block title %} -| Blacklist +| Whitelist {% endblock title %} {% block content %}
@@ -42,6 +42,49 @@ + +
+
+
    +
  • + Regular expression patterns for advanced filtering. Each pattern should be on a new + line. +
  • +
  • Patterns are case-insensitive.
  • +
  • + Examples: + +
    +^New Release:.*
    +\b(update|version|patch)\s+\d+\.\d+
    +.*\[(important|notice)\].*
    +
    +
    +
  • +
+
+ + + + + + + + + + + +
diff --git a/pyproject.toml b/pyproject.toml index 4cda1f6..21ab35a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ platformdirs = "*" python-dotenv = "*" python-multipart = "*" reader = "*" -sentry-sdk = {version = "*", extras = ["fastapi"]} +sentry-sdk = { version = "*", extras = ["fastapi"] } uvicorn = "*" [tool.poetry.group.dev.dependencies] @@ -86,6 +86,8 @@ lint.ignore = [ "PLR6301", # Checks for the presence of unused self parameter in methods definitions. "RUF029", # Checks for functions declared async that do not await or otherwise use features requiring the function to be declared async. "TD003", # Checks that a TODO comment is associated with a link to a relevant issue or ticket. + "PLR0913", # Checks for function definitions that include too many arguments. + "PLR0917", # Checks for function definitions that include too many positional arguments. # Conflicting lint rules when using Ruff's formatter # https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules diff --git a/tests/test_blacklist.py b/tests/test_blacklist.py index 4f5a317..d2a785b 100644 --- a/tests/test_blacklist.py +++ b/tests/test_blacklist.py @@ -39,6 +39,13 @@ def test_has_black_tags() -> None: check_if_has_tag(reader, feed, "blacklist_title") check_if_has_tag(reader, feed, "blacklist_summary") check_if_has_tag(reader, feed, "blacklist_content") + check_if_has_tag(reader, feed, "blacklist_author") + + # Test regex blacklist tags + check_if_has_tag(reader, feed, "regex_blacklist_title") + check_if_has_tag(reader, feed, "regex_blacklist_summary") + check_if_has_tag(reader, feed, "regex_blacklist_content") + check_if_has_tag(reader, feed, "regex_blacklist_author") # Clean up reader.delete_feed(feed_url) @@ -74,6 +81,7 @@ def test_should_be_skipped() -> None: # Test entry without any blacklists assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" + # Test standard blacklist functionality reader.set_tag(feed, "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] assert entry_should_be_skipped(reader, first_entry[0]) is True, f"Entry should be skipped: {first_entry[0]}" reader.delete_tag(feed, "blacklist_title") @@ -113,3 +121,81 @@ def test_should_be_skipped() -> None: assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" reader.delete_tag(feed, "blacklist_author") assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" + + +def test_regex_should_be_skipped() -> None: + """Test the regex filtering functionality for blacklist.""" + reader: Reader = get_reader() + + # Add feed and update entries + reader.add_feed(feed_url) + feed: Feed = reader.get_feed(feed_url) + reader.update_feeds() + + # Get first entry + first_entry: list[Entry] = [] + entries: Iterable[Entry] = reader.get_entries(feed=feed) + assert entries is not None, f"Entries should not be None: {entries}" + for entry in entries: + first_entry.append(entry) + break + assert len(first_entry) == 1, f"First entry should be added: {first_entry}" + + # Test entry without any regex blacklists + assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" + + # Test regex blacklist for title + reader.set_tag(feed, "regex_blacklist_title", r"fvnnn\w+") # pyright: ignore[reportArgumentType] + assert entry_should_be_skipped(reader, first_entry[0]) is True, ( + f"Entry should be skipped with regex title match: {first_entry[0]}" + ) + reader.delete_tag(feed, "regex_blacklist_title") + assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" + + # Test regex blacklist for summary + reader.set_tag(feed, "regex_blacklist_summary", r"ffdnfdn\w+") # pyright: ignore[reportArgumentType] + assert entry_should_be_skipped(reader, first_entry[0]) is True, ( + f"Entry should be skipped with regex summary match: {first_entry[0]}" + ) + reader.delete_tag(feed, "regex_blacklist_summary") + assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" + + # Test regex blacklist for content + reader.set_tag(feed, "regex_blacklist_content", r"ffdnfdnfdn\w+") # pyright: ignore[reportArgumentType] + assert entry_should_be_skipped(reader, first_entry[0]) is True, ( + f"Entry should be skipped with regex content match: {first_entry[0]}" + ) + reader.delete_tag(feed, "regex_blacklist_content") + assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" + + # Test regex blacklist for author + reader.set_tag(feed, "regex_blacklist_author", r"TheLovinator\d*") # pyright: ignore[reportArgumentType] + assert entry_should_be_skipped(reader, first_entry[0]) is True, ( + f"Entry should be skipped with regex author match: {first_entry[0]}" + ) + reader.delete_tag(feed, "regex_blacklist_author") + assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" + + # Test invalid regex pattern (should not raise an exception) + reader.set_tag(feed, "regex_blacklist_title", r"[incomplete") # pyright: ignore[reportArgumentType] + assert entry_should_be_skipped(reader, first_entry[0]) is False, ( + f"Entry should not be skipped with invalid regex: {first_entry[0]}" + ) + reader.delete_tag(feed, "regex_blacklist_title") + + # Test multiple regex patterns separated by commas + reader.set_tag(feed, "regex_blacklist_author", r"pattern1,TheLovinator\d*,pattern3") # pyright: ignore[reportArgumentType] + assert entry_should_be_skipped(reader, first_entry[0]) is True, ( + f"Entry should be skipped with one matching pattern in list: {first_entry[0]}" + ) + reader.delete_tag(feed, "regex_blacklist_author") + assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" + + # Test newline-separated regex patterns + newline_patterns = "pattern1\nTheLovinator\\d*\npattern3" + reader.set_tag(feed, "regex_blacklist_author", newline_patterns) # pyright: ignore[reportArgumentType] + assert entry_should_be_skipped(reader, first_entry[0]) is True, ( + f"Entry should be skipped with newline-separated patterns: {first_entry[0]}" + ) + reader.delete_tag(feed, "regex_blacklist_author") + assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" diff --git a/tests/test_utils.py b/tests/test_utils.py index 0bccb6b..5274eb8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,6 +1,6 @@ from __future__ import annotations -from discord_rss_bot.filter.utils import is_word_in_text +from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text def test_is_word_in_text() -> None: @@ -14,3 +14,51 @@ def test_is_word_in_text() -> None: assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false assert is_word_in_text("word1,word2", "This is a sample text containing none of the words.") is False, msg_false + + +def test_is_regex_match() -> None: + msg_true = "Should return True" + msg_false = "Should return False" + + # Test basic regex patterns + assert is_regex_match(r"word\d+", "This text contains word123") is True, msg_true + assert is_regex_match(r"^Hello", "Hello world") is True, msg_true + assert is_regex_match(r"world$", "Hello world") is True, msg_true + + # Test case insensitivity + assert is_regex_match(r"hello", "This text contains HELLO") is True, msg_true + + # Test comma-separated patterns + assert is_regex_match(r"pattern1,pattern2", "This contains pattern2") is True, msg_true + assert is_regex_match(r"pattern1, pattern2", "This contains pattern1") is True, msg_true + + # Test regex that shouldn't match + assert is_regex_match(r"^start", "This doesn't start with the pattern") is False, msg_false + assert is_regex_match(r"end$", "This doesn't end with the pattern") is False, msg_false + + # Test with empty input + assert is_regex_match("", "Some text") is False, msg_false + assert is_regex_match("pattern", "") is False, msg_false + + # Test with invalid regex (should not raise an exception and return False) + assert is_regex_match(r"[incomplete", "Some text") is False, msg_false + + # Test with multiple patterns where one is invalid + assert is_regex_match(r"valid, [invalid, \w+", "Contains word") is True, msg_true + + # Test newline-separated patterns + newline_patterns = "pattern1\n^start\ncontains\\d+" + assert is_regex_match(newline_patterns, "This contains123 text") is True, msg_true + assert is_regex_match(newline_patterns, "start of line") is True, msg_true + assert is_regex_match(newline_patterns, "pattern1 is here") is True, msg_true + assert is_regex_match(newline_patterns, "None of these match") is False, msg_false + + # Test mixed newline and comma patterns (for backward compatibility) + mixed_patterns = "pattern1\npattern2,pattern3\npattern4" + assert is_regex_match(mixed_patterns, "Contains pattern3") is True, msg_true + assert is_regex_match(mixed_patterns, "Contains pattern4") is True, msg_true + + # Test with empty lines and spaces + whitespace_patterns = "\\s+\n \n\npattern\n\n" + assert is_regex_match(whitespace_patterns, "text with spaces") is True, msg_true + assert is_regex_match(whitespace_patterns, "text with pattern") is True, msg_true diff --git a/tests/test_whitelist.py b/tests/test_whitelist.py index cf39aa0..9fbb712 100644 --- a/tests/test_whitelist.py +++ b/tests/test_whitelist.py @@ -38,6 +38,13 @@ def test_has_white_tags() -> None: check_if_has_tag(reader, feed, "whitelist_title") check_if_has_tag(reader, feed, "whitelist_summary") check_if_has_tag(reader, feed, "whitelist_content") + check_if_has_tag(reader, feed, "whitelist_author") + + # Test regex whitelist tags + check_if_has_tag(reader, feed, "regex_whitelist_title") + check_if_has_tag(reader, feed, "regex_whitelist_summary") + check_if_has_tag(reader, feed, "regex_whitelist_content") + check_if_has_tag(reader, feed, "regex_whitelist_author") # Clean up reader.delete_feed(feed_url) @@ -109,3 +116,67 @@ def test_should_be_sent() -> None: assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent" reader.delete_tag(feed, "whitelist_author") assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent" + + +def test_regex_should_be_sent() -> None: + """Test the regex filtering functionality for whitelist.""" + reader: Reader = get_reader() + + # Add feed and update entries + reader.add_feed(feed_url) + feed: Feed = reader.get_feed(feed_url) + reader.update_feeds() + + # Get first entry + first_entry: list[Entry] = [] + entries: Iterable[Entry] = reader.get_entries(feed=feed) + assert entries is not None, "Entries should not be None" + for entry in entries: + first_entry.append(entry) + break + assert len(first_entry) == 1, "First entry should be added" + + # Test entry without any regex whitelists + assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent" + + # Test regex whitelist for title + reader.set_tag(feed, "regex_whitelist_title", r"fvnnn\w+") # pyright: ignore[reportArgumentType] + assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex title match" + reader.delete_tag(feed, "regex_whitelist_title") + assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent" + + # Test regex whitelist for summary + reader.set_tag(feed, "regex_whitelist_summary", r"ffdnfdn\w+") # pyright: ignore[reportArgumentType] + assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex summary match" + reader.delete_tag(feed, "regex_whitelist_summary") + assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent" + + # Test regex whitelist for content + reader.set_tag(feed, "regex_whitelist_content", r"ffdnfdnfdn\w+") # pyright: ignore[reportArgumentType] + assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex content match" + reader.delete_tag(feed, "regex_whitelist_content") + assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent" + + # Test regex whitelist for author + reader.set_tag(feed, "regex_whitelist_author", r"TheLovinator\d*") # pyright: ignore[reportArgumentType] + assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex author match" + reader.delete_tag(feed, "regex_whitelist_author") + assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent" + + # Test invalid regex pattern (should not raise an exception) + reader.set_tag(feed, "regex_whitelist_title", r"[incomplete") # pyright: ignore[reportArgumentType] + assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent with invalid regex" + reader.delete_tag(feed, "regex_whitelist_title") + + # Test multiple regex patterns separated by commas + reader.set_tag(feed, "regex_whitelist_author", r"pattern1,TheLovinator\d*,pattern3") # pyright: ignore[reportArgumentType] + assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with one matching pattern in list" + reader.delete_tag(feed, "regex_whitelist_author") + assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent" + + # Test newline-separated regex patterns + newline_patterns = "pattern1\nTheLovinator\\d*\npattern3" + reader.set_tag(feed, "regex_whitelist_author", newline_patterns) # pyright: ignore[reportArgumentType] + assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with newline-separated patterns" + reader.delete_tag(feed, "regex_whitelist_author") + assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent" From 97d06ddb434cb87d9f9ae4a447985b153fcdbc59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Thu, 3 Apr 2025 06:20:01 +0200 Subject: [PATCH 05/10] =?UTF-8?q?Embed=20YouTube=20videos=20in=20/feed=20H?= =?UTF-8?q?TML.=20Strong=20code,=20many=20bananas!=20=F0=9F=A6=8D?= =?UTF-8?q?=F0=9F=A6=8D=F0=9F=A6=8D=F0=9F=A6=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- discord_rss_bot/feeds.py | 20 +++++ discord_rss_bot/main.py | 45 +++++++++++ discord_rss_bot/templates/feed.html | 4 + tests/test_feeds.py | 119 +++++++++++++++++++++++++++- 4 files changed, 187 insertions(+), 1 deletion(-) diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index ccb0a14..46c6e50 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -67,6 +67,10 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True + # YouTube feeds should never use embeds + if is_youtube_feed(entry.feed.url): + should_send_embed = False + if should_send_embed: webhook = create_embed_webhook(webhook_url, entry) else: @@ -295,6 +299,18 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None: logger.info("Sent entry to Discord: %s", entry.id) +def is_youtube_feed(feed_url: str) -> bool: + """Check if the feed is a YouTube feed. + + Args: + feed_url: The feed URL to check. + + Returns: + bool: True if the feed is a YouTube feed, False otherwise. + """ + return "youtube.com/feeds/videos.xml" in feed_url + + def should_send_embed_check(reader: Reader, entry: Entry) -> bool: """Check if we should send an embed to Discord. @@ -305,6 +321,10 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool: Returns: bool: True if we should send an embed, False otherwise. """ + # YouTube feeds should never use embeds - only links + if is_youtube_feed(entry.feed.url): + return False + try: should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) except TagNotFoundError: diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index a7c6510..00349ac 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -732,6 +732,27 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str: entry_id: str = urllib.parse.quote(entry.id) to_discord_html: str = f"Send to Discord" + + # Check if this is a YouTube feed entry and the entry has a link + is_youtube_feed = "youtube.com/feeds/videos.xml" in entry.feed.url + video_embed_html = "" + + if is_youtube_feed and entry.link: + # Extract the video ID and create an embed if possible + video_id: str | None = extract_youtube_video_id(entry.link) + if video_id: + video_embed_html: str = f""" +
+ +
+ """ + # Don't use the first image if we have a video embed + first_image = "" + image_html: str = f"" if first_image else "" html += f"""
@@ -739,6 +760,7 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str: {f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html} {text} +{video_embed_html} {image_html}
""" @@ -991,6 +1013,29 @@ def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Fo return RedirectResponse(url="/webhooks", status_code=303) +def extract_youtube_video_id(url: str) -> str | None: + """Extract YouTube video ID from a YouTube video URL. + + Args: + url: The YouTube video URL. + + Returns: + The video ID if found, None otherwise. + """ + if not url: + return None + + # Handle standard YouTube URLs (youtube.com/watch?v=VIDEO_ID) + if "youtube.com/watch" in url and "v=" in url: + return url.split("v=")[1].split("&")[0] + + # Handle shortened YouTube URLs (youtu.be/VIDEO_ID) + if "youtu.be/" in url: + return url.split("youtu.be/")[1].split("?")[0] + + return None + + if __name__ == "__main__": sentry_sdk.init( dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744", diff --git a/discord_rss_bot/templates/feed.html b/discord_rss_bot/templates/feed.html index 5dd85c0..ce983ff 100644 --- a/discord_rss_bot/templates/feed.html +++ b/discord_rss_bot/templates/feed.html @@ -43,6 +43,7 @@ {% endif %} + {% if not "youtube.com/feeds/videos.xml" in feed.url %} {% if should_send_embed %}
{% endif %} + {% endif %} @@ -65,9 +67,11 @@ Customize message {% if not should_send_embed %}(Currently active){% endif %} + {% if not "youtube.com/feeds/videos.xml" in feed.url %} Customize embed {% if should_send_embed %}(Currently active){% endif %} + {% endif %} diff --git a/tests/test_feeds.py b/tests/test_feeds.py index e6e1381..037711b 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -4,11 +4,18 @@ import os import tempfile from pathlib import Path from typing import LiteralString +from unittest.mock import MagicMock, patch import pytest from reader import Feed, Reader, make_reader -from discord_rss_bot.feeds import send_to_discord, truncate_webhook_message +from discord_rss_bot.feeds import ( + is_youtube_feed, + send_entry_to_discord, + send_to_discord, + should_send_embed_check, + truncate_webhook_message, +) from discord_rss_bot.missing_tags import add_missing_tags @@ -85,3 +92,113 @@ def test_truncate_webhook_message_long_message(): # Test the end of the message assert_msg = "The end of the truncated message should be '...' to indicate truncation." assert truncated_message[-half_length:] == "A" * half_length, assert_msg + + +def test_is_youtube_feed(): + """Test the is_youtube_feed function.""" + # YouTube feed URLs + assert is_youtube_feed("https://www.youtube.com/feeds/videos.xml?channel_id=123456") is True + assert is_youtube_feed("https://www.youtube.com/feeds/videos.xml?user=username") is True + + # Non-YouTube feed URLs + assert is_youtube_feed("https://www.example.com/feed.xml") is False + assert is_youtube_feed("https://www.youtube.com/watch?v=123456") is False + assert is_youtube_feed("https://www.reddit.com/r/Python/.rss") is False + + +@patch("discord_rss_bot.feeds.logger") +def test_should_send_embed_check_youtube_feeds(mock_logger: MagicMock) -> None: + """Test should_send_embed_check returns False for YouTube feeds regardless of settings.""" + # Create mocks + mock_reader = MagicMock() + mock_entry = MagicMock() + + # Configure a YouTube feed + mock_entry.feed.url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456" + + # Set reader to return True for should_send_embed (would normally create an embed) + mock_reader.get_tag.return_value = True + + # Result should be False, overriding the feed settings + result = should_send_embed_check(mock_reader, mock_entry) + assert result is False, "YouTube feeds should never use embeds" + + # Function should not even call get_tag for YouTube feeds + mock_reader.get_tag.assert_not_called() + + +@patch("discord_rss_bot.feeds.logger") +def test_should_send_embed_check_normal_feeds(mock_logger: MagicMock) -> None: + """Test should_send_embed_check returns feed settings for non-YouTube feeds.""" + # Create mocks + mock_reader = MagicMock() + mock_entry = MagicMock() + + # Configure a normal feed + mock_entry.feed.url = "https://www.example.com/feed.xml" + + # Test with should_send_embed set to True + mock_reader.get_tag.return_value = True + result = should_send_embed_check(mock_reader, mock_entry) + assert result is True, "Normal feeds should use embeds when enabled" + + # Test with should_send_embed set to False + mock_reader.get_tag.return_value = False + result = should_send_embed_check(mock_reader, mock_entry) + assert result is False, "Normal feeds should not use embeds when disabled" + + +@patch("discord_rss_bot.feeds.get_reader") +@patch("discord_rss_bot.feeds.get_custom_message") +@patch("discord_rss_bot.feeds.replace_tags_in_text_message") +@patch("discord_rss_bot.feeds.create_embed_webhook") +@patch("discord_rss_bot.feeds.DiscordWebhook") +@patch("discord_rss_bot.feeds.execute_webhook") +def test_send_entry_to_discord_youtube_feed( + mock_execute_webhook: MagicMock, + mock_discord_webhook: MagicMock, + mock_create_embed: MagicMock, + mock_replace_tags: MagicMock, + mock_get_custom_message: MagicMock, + mock_get_reader: MagicMock, +): + """Test send_entry_to_discord function with YouTube feeds.""" + # Set up mocks + mock_reader = MagicMock() + mock_get_reader.return_value = mock_reader + mock_entry = MagicMock() + mock_feed = MagicMock() + + # Configure a YouTube feed + mock_entry.feed = mock_feed + mock_entry.feed.url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456" + mock_entry.feed_url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456" + + # Mock the tags + mock_reader.get_tag.side_effect = lambda feed, tag, default=None: { # noqa: ARG005 + "webhook": "https://discord.com/api/webhooks/123/abc", + "should_send_embed": True, # This should be ignored for YouTube feeds + }.get(tag, default) + + # Mock custom message + mock_get_custom_message.return_value = "Custom message" + mock_replace_tags.return_value = "Formatted message with {{entry_link}}" + + # Mock webhook + mock_webhook = MagicMock() + mock_discord_webhook.return_value = mock_webhook + + # Call the function + send_entry_to_discord(mock_entry) + + # Assertions + mock_create_embed.assert_not_called() + mock_discord_webhook.assert_called_once() + + # Check webhook was created with the right message + webhook_call_kwargs = mock_discord_webhook.call_args[1] + assert "content" in webhook_call_kwargs, "Webhook should have content" + assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc" + + # Verify execute_webhook was called + mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry) From 8b50003edaac9c9550dfee0a1cb0a2254e3ccade Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Thu, 3 Apr 2025 16:47:53 +0200 Subject: [PATCH 06/10] Group feeds by domain --- discord_rss_bot/feeds.py | 53 ++++++++++++++++++ discord_rss_bot/main.py | 5 +- discord_rss_bot/templates/index.html | 81 ++++++++++++++++++---------- tests/test_feeds.py | 55 +++++++++++++++++++ tests/test_main.py | 10 ++-- 5 files changed, 168 insertions(+), 36 deletions(-) diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 46c6e50..83ac2fd 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -3,7 +3,9 @@ from __future__ import annotations import datetime import logging import pprint +import re from typing import TYPE_CHECKING +from urllib.parse import ParseResult, urlparse from discord_webhook import DiscordEmbed, DiscordWebhook from fastapi import HTTPException @@ -29,6 +31,57 @@ if TYPE_CHECKING: logger: logging.Logger = logging.getLogger(__name__) +def extract_domain(url: str) -> str: # noqa: PLR0911 + """Extract the domain name from a URL. + + Args: + url: The URL to extract the domain from. + + Returns: + str: The domain name, formatted for display. + """ + # Check for empty URL first + if not url: + return "Other" + + try: + # Special handling for YouTube feeds + if "youtube.com/feeds/videos.xml" in url: + return "YouTube" + + # Special handling for Reddit feeds + if "reddit.com" in url or (".rss" in url and "r/" in url): + return "Reddit" + + # Parse the URL and extract the domain + parsed_url: ParseResult = urlparse(url) + domain: str = parsed_url.netloc + + # If we couldn't extract a domain, return "Other" + if not domain: + return "Other" + + # Remove www. prefix if present + domain = re.sub(r"^www\.", "", domain) + + # Special handling for common domains + domain_mapping: dict[str, str] = {"github.com": "GitHub"} + + if domain in domain_mapping: + return domain_mapping[domain] + + # For other domains, capitalize the first part before the TLD + parts: list[str] = domain.split(".") + min_domain_parts = 2 + if len(parts) >= min_domain_parts: + return parts[0].capitalize() + + return domain.capitalize() + except (ValueError, AttributeError, TypeError) as e: + logger.warning("Error extracting domain from %s: %s", url, e) + return "Other" + + def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: """Send a single entry to Discord. diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 00349ac..7ae706f 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -37,7 +37,7 @@ from discord_rss_bot.custom_message import ( replace_tags_in_text_message, save_embed, ) -from discord_rss_bot.feeds import create_feed, send_entry_to_discord, send_to_discord +from discord_rss_bot.feeds import create_feed, extract_domain, send_entry_to_discord, send_to_discord from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.search import create_html_for_search_results from discord_rss_bot.settings import get_reader @@ -875,11 +875,12 @@ def make_context_index(request: Request): broken_feeds = [] feeds_without_attached_webhook = [] + # Get all feeds and organize them feeds: Iterable[Feed] = reader.get_feeds() for feed in feeds: try: webhook = reader.get_tag(feed.url, "webhook") - feed_list.append({"feed": feed, "webhook": webhook}) + feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)}) except TagNotFoundError: broken_feeds.append(feed) continue diff --git a/discord_rss_bot/templates/index.html b/discord_rss_bot/templates/index.html index 3db4a50..f9dfc0d 100644 --- a/discord_rss_bot/templates/index.html +++ b/discord_rss_bot/templates/index.html @@ -28,45 +28,66 @@ {{ entry_count.averages[2]|round(1) }})

- + + {% for hook_from_context in webhooks %} -
-

+
+

{{ hook_from_context.name }}

- +
+ +
+
+ {% endfor %} + {% else %} +

No feeds associated with this webhook.

+ {% endif %}

{% endfor %} {% else %}

Hello there!
+
You need to add a webhook here to get started. After that, you can add feeds here. You can find both of these links in the navigation bar above. @@ -79,6 +100,7 @@ Thanks!

{% endif %} + {% if broken_feeds %}
@@ -103,6 +125,7 @@
{% endif %} + {% if feeds_without_attached_webhook %}
diff --git a/tests/test_feeds.py b/tests/test_feeds.py index 037711b..8fa6c4b 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -10,6 +10,7 @@ import pytest from reader import Feed, Reader, make_reader from discord_rss_bot.feeds import ( + extract_domain, is_youtube_feed, send_entry_to_discord, send_to_discord, @@ -202,3 +203,57 @@ def test_send_entry_to_discord_youtube_feed( # Verify execute_webhook was called mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry) + + +def test_extract_domain_youtube_feed() -> None: + """Test extract_domain for YouTube feeds.""" + url: str = "https://www.youtube.com/feeds/videos.xml?channel_id=123456" + assert extract_domain(url) == "YouTube", "YouTube feeds should return 'YouTube' as the domain." + + +def test_extract_domain_reddit_feed() -> None: + """Test extract_domain for Reddit feeds.""" + url: str = "https://www.reddit.com/r/Python/.rss" + assert extract_domain(url) == "Reddit", "Reddit feeds should return 'Reddit' as the domain." + + +def test_extract_domain_github_feed() -> None: + """Test extract_domain for GitHub feeds.""" + url: str = "https://www.github.com/user/repo" + assert extract_domain(url) == "GitHub", "GitHub feeds should return 'GitHub' as the domain." + + +def test_extract_domain_custom_domain() -> None: + """Test extract_domain for custom domains.""" + url: str = "https://www.example.com/feed" + assert extract_domain(url) == "Example", "Custom domains should return the capitalized first part of the domain." + + +def test_extract_domain_no_www_prefix() -> None: + """Test extract_domain removes 'www.' prefix.""" + url: str = "https://www.example.com/feed" + assert extract_domain(url) == "Example", "The 'www.' prefix should be removed from the domain." + + +def test_extract_domain_no_tld() -> None: + """Test extract_domain for domains without a TLD.""" + url: str = "https://localhost/feed" + assert extract_domain(url) == "Localhost", "Domains without a TLD should return the capitalized domain." + + +def test_extract_domain_invalid_url() -> None: + """Test extract_domain for invalid URLs.""" + url: str = "not-a-valid-url" + assert extract_domain(url) == "Other", "Invalid URLs should return 'Other' as the domain." + + +def test_extract_domain_empty_url() -> None: + """Test extract_domain for empty URLs.""" + url: str = "" + assert extract_domain(url) == "Other", "Empty URLs should return 'Other' as the domain." + + +def test_extract_domain_special_characters() -> None: + """Test extract_domain for URLs with special characters.""" + url: str = "https://www.ex-ample.com/feed" + assert extract_domain(url) == "Ex-ample", "Domains with special characters should return the capitalized domain." diff --git a/tests/test_main.py b/tests/test_main.py index 59bd109..c86901f 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -45,7 +45,7 @@ def test_search() -> None: # Check that the feed was added. response = client.get(url="/") assert response.status_code == 200, f"Failed to get /: {response.text}" - assert feed_url in response.text, f"Feed not found in /: {response.text}" + assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}" # Search for an entry. response: Response = client.get(url="/search/?query=a") @@ -85,7 +85,7 @@ def test_create_feed() -> None: # Check that the feed was added. response = client.get(url="/") assert response.status_code == 200, f"Failed to get /: {response.text}" - assert feed_url in response.text, f"Feed not found in /: {response.text}" + assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}" def test_get() -> None: @@ -103,7 +103,7 @@ def test_get() -> None: # Check that the feed was added. response = client.get("/") assert response.status_code == 200, f"Failed to get /: {response.text}" - assert feed_url in response.text, f"Feed not found in /: {response.text}" + assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}" response: Response = client.get(url="/add") assert response.status_code == 200, f"/add failed: {response.text}" @@ -157,7 +157,7 @@ def test_pause_feed() -> None: # Check that the feed was paused. response = client.get(url="/") assert response.status_code == 200, f"Failed to get /: {response.text}" - assert feed_url in response.text, f"Feed not found in /: {response.text}" + assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}" def test_unpause_feed() -> None: @@ -184,7 +184,7 @@ def test_unpause_feed() -> None: # Check that the feed was unpaused. response = client.get(url="/") assert response.status_code == 200, f"Failed to get /: {response.text}" - assert feed_url in response.text, f"Feed not found in /: {response.text}" + assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}" def test_remove_feed() -> None: From cd0f63d59a99224a915c23112b7bcf777011cfb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Wed, 16 Apr 2025 13:32:31 +0200 Subject: [PATCH 07/10] Add tldextract for improved domain extraction and add new tests for extract_domain function --- discord_rss_bot/feeds.py | 11 +++++------ pyproject.toml | 1 + tests/test_feeds.py | 19 +++++++++++++++++++ 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 83ac2fd..7852b0d 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -7,6 +7,7 @@ import re from typing import TYPE_CHECKING from urllib.parse import ParseResult, urlparse +import tldextract from discord_webhook import DiscordEmbed, DiscordWebhook from fastapi import HTTPException from reader import Entry, EntryNotFoundError, Feed, FeedExistsError, Reader, ReaderError, StorageError, TagNotFoundError @@ -70,12 +71,10 @@ def extract_domain(url: str) -> str: # noqa: PLR0911 if domain in domain_mapping: return domain_mapping[domain] - # For other domains, capitalize the first part before the TLD - parts: list[str] = domain.split(".") - min_domain_parts = 2 - if len(parts) >= min_domain_parts: - return parts[0].capitalize() - + # Use tldextract to get the domain (SLD) + ext = tldextract.extract(url) + if ext.domain: + return ext.domain.capitalize() return domain.capitalize() except (ValueError, AttributeError, TypeError) as e: logger.warning("Error extracting domain from %s: %s", url, e) diff --git a/pyproject.toml b/pyproject.toml index 21ab35a..f5758e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "python-multipart", "reader", "sentry-sdk[fastapi]", + "tldextract", "uvicorn", ] diff --git a/tests/test_feeds.py b/tests/test_feeds.py index 8fa6c4b..2b3a2b4 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -257,3 +257,22 @@ def test_extract_domain_special_characters() -> None: """Test extract_domain for URLs with special characters.""" url: str = "https://www.ex-ample.com/feed" assert extract_domain(url) == "Ex-ample", "Domains with special characters should return the capitalized domain." + + +@pytest.mark.parametrize( + argnames=("url", "expected"), + argvalues=[ + ("https://blog.something.com", "Something"), + ("https://www.something.com", "Something"), + ("https://subdomain.example.co.uk", "Example"), + ("https://github.com/user/repo", "GitHub"), + ("https://youtube.com/feeds/videos.xml?channel_id=abc", "YouTube"), + ("https://reddit.com/r/python/.rss", "Reddit"), + ("", "Other"), + ("not a url", "Other"), + ("https://www.example.com", "Example"), + ("https://foo.bar.baz.com", "Baz"), + ], +) +def test_extract_domain(url: str, expected: str) -> None: + assert extract_domain(url) == expected From e33b331564732b9b32f89651dd933db31a5dcc18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Wed, 16 Apr 2025 13:33:18 +0200 Subject: [PATCH 08/10] Update ruff-pre-commit to version 0.11.5 --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 908367d..867131e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -38,7 +38,7 @@ repos: # An extremely fast Python linter and formatter. - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.11.2 + rev: v0.11.5 hooks: - id: ruff-format - id: ruff From 544ef6dca3820a65c3d61e1c19a07f29e720f068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Sat, 3 May 2025 19:42:20 +0200 Subject: [PATCH 09/10] Update ruff-pre-commit to version 0.11.8 --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 867131e..aca9273 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -38,7 +38,7 @@ repos: # An extremely fast Python linter and formatter. - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.11.5 + rev: v0.11.8 hooks: - id: ruff-format - id: ruff From ffd6f2f9f25150079635035e879c0e75a4b88586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Sun, 4 May 2025 03:48:22 +0200 Subject: [PATCH 10/10] Add Hoyolab API integration --- .vscode/settings.json | 2 + README.md | 16 ++- discord_rss_bot/custom_message.py | 13 +- discord_rss_bot/feeds.py | 28 ++++- discord_rss_bot/hoyolab_api.py | 193 ++++++++++++++++++++++++++++++ tests/test_hoyolab_api.py | 39 ++++++ 6 files changed, 276 insertions(+), 15 deletions(-) create mode 100644 discord_rss_bot/hoyolab_api.py create mode 100644 tests/test_hoyolab_api.py diff --git a/.vscode/settings.json b/.vscode/settings.json index f929fff..85832f8 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,8 @@ "cSpell.words": [ "botuser", "Genshins", + "healthcheck", + "Hoyolab", "levelname", "Lovinator", "markdownified", diff --git a/README.md b/README.md index 849fb98..8232dea 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,20 @@ Subscribe to RSS feeds and get updates to a Discord webhook. -> [!NOTE] -> You should look at [MonitoRSS](https://github.com/synzen/monitorss) for a more feature-rich project. +## Features + +- Subscribe to RSS feeds and get updates to a Discord webhook. +- Web interface to manage subscriptions. +- Customizable message format for each feed. +- Choose between Discord embed or plain text. +- Regex filters for RSS feeds. +- Blacklist/whitelist words in the title/description/author/etc. +- Gets extra information from APIs if available, currently for: + - [https://feeds.c3kay.de/](https://feeds.c3kay.de/) + - Genshin Impact News + - Honkai Impact 3rd News + - Honkai Starrail News + - Zenless Zone Zero News ## Installation diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index 9cb03e5..d3ca74d 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -152,14 +152,7 @@ def get_first_image(summary: str | None, content: str | None) -> str: logger.warning("Invalid URL: %s", src) continue - # Genshins first image is a divider, so we ignore it. - # https://hyl-static-res-prod.hoyolab.com/divider_config/PC/line_3.png - skip_images: list[str] = [ - "https://img-os-static.hoyolab.com/divider_config/", - "https://hyl-static-res-prod.hoyolab.com/divider_config/", - ] - if not str(image.attrs["src"]).startswith(tuple(skip_images)): - return str(image.attrs["src"]) + return str(image.attrs["src"]) if summary and (images := BeautifulSoup(summary, features="lxml").find_all("img")): for image in images: if not isinstance(image, Tag) or "src" not in image.attrs: @@ -170,9 +163,7 @@ def get_first_image(summary: str | None, content: str | None) -> str: logger.warning("Invalid URL: %s", image.attrs["src"]) continue - # Genshins first image is a divider, so we ignore it. - if not str(image.attrs["src"]).startswith("https://img-os-static.hoyolab.com/divider_config"): - return str(image.attrs["src"]) + return str(image.attrs["src"]) return "" diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 7852b0d..90350b0 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -4,7 +4,7 @@ import datetime import logging import pprint import re -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from urllib.parse import ParseResult, urlparse import tldextract @@ -20,6 +20,12 @@ from discord_rss_bot.custom_message import ( ) from discord_rss_bot.filter.blacklist import entry_should_be_skipped from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent +from discord_rss_bot.hoyolab_api import ( + create_hoyolab_webhook, + extract_post_id_from_hoyolab_url, + fetch_hoyolab_post, + is_c3kay_feed, +) from discord_rss_bot.is_url_valid import is_url_valid from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.settings import default_custom_message, get_reader @@ -81,7 +87,7 @@ def extract_domain(url: str) -> str: # noqa: PLR0911 return "Other" -def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: +def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: PLR0912 """Send a single entry to Discord. Args: @@ -99,6 +105,24 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> if not webhook_url: return "No webhook URL found." + # Check if this is a c3kay feed + if is_c3kay_feed(entry.feed.url): + entry_link: str | None = entry.link + if entry_link: + post_id: str | None = extract_post_id_from_hoyolab_url(entry_link) + if post_id: + post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) + if post_data: + webhook = create_hoyolab_webhook(webhook_url, entry, post_data) + execute_webhook(webhook, entry) + return None + logger.warning( + "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", + entry.feed.url, + ) + else: + logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) + webhook_message: str = "" # Try to get the custom message for the feed. If the user has none, we will use the default message. diff --git a/discord_rss_bot/hoyolab_api.py b/discord_rss_bot/hoyolab_api.py new file mode 100644 index 0000000..cb1ed71 --- /dev/null +++ b/discord_rss_bot/hoyolab_api.py @@ -0,0 +1,193 @@ +from __future__ import annotations + +import contextlib +import json +import logging +import re +from typing import TYPE_CHECKING, Any + +import requests +from discord_webhook import DiscordEmbed, DiscordWebhook + +if TYPE_CHECKING: + from reader import Entry + + +logger: logging.Logger = logging.getLogger(__name__) + + +def is_c3kay_feed(feed_url: str) -> bool: + """Check if the feed is from c3kay.de. + + Args: + feed_url: The feed URL to check. + + Returns: + bool: True if the feed is from c3kay.de, False otherwise. + """ + return "feeds.c3kay.de" in feed_url + + +def extract_post_id_from_hoyolab_url(url: str) -> str | None: + """Extract the post ID from a Hoyolab URL. + + Args: + url: The Hoyolab URL to extract the post ID from. + For example: https://www.hoyolab.com/article/38588239 + + Returns: + str | None: The post ID if found, None otherwise. + """ + try: + match: re.Match[str] | None = re.search(r"/article/(\d+)", url) + if match: + return match.group(1) + except (ValueError, AttributeError, TypeError) as e: + logger.warning("Error extracting post ID from Hoyolab URL %s: %s", url, e) + + return None + + +def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None: + """Fetch post data from the Hoyolab API. + + Args: + post_id: The post ID to fetch. + + Returns: + dict[str, Any] | None: The post data if successful, None otherwise. + """ + if not post_id: + return None + + http_ok = 200 + try: + url: str = f"https://bbs-api-os.hoyolab.com/community/post/wapi/getPostFull?post_id={post_id}" + response: requests.Response = requests.get(url, timeout=10) + + if response.status_code == http_ok: + data: dict[str, Any] = response.json() + if data.get("retcode") == 0 and "data" in data and "post" in data["data"]: + return data["data"]["post"] + + logger.warning("Failed to fetch Hoyolab post %s: %s", post_id, response.text) + except (requests.RequestException, ValueError): + logger.exception("Error fetching Hoyolab post %s", post_id) + + return None + + +def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, Any]) -> DiscordWebhook: # noqa: C901, PLR0912, PLR0914, PLR0915 + """Create a webhook with data from the Hoyolab API. + + Args: + webhook_url: The webhook URL. + entry: The entry to send to Discord. + post_data: The post data from the Hoyolab API. + + Returns: + DiscordWebhook: The webhook with the embed. + """ + entry_link: str = entry.link or entry.feed.url + webhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True) + + # Extract relevant data from the post + post: dict[str, Any] = post_data.get("post", {}) + subject: str = post.get("subject", "") + content: str = post.get("content", "{}") + + logger.debug("Post subject: %s", subject) + logger.debug("Post content: %s", content) + + content_data: dict[str, str] = {} + with contextlib.suppress(json.JSONDecodeError, ValueError): + content_data = json.loads(content) + + logger.debug("Content data: %s", content_data) + + description: str = content_data.get("describe", "") + if not description: + description = post.get("desc", "") + + # Create the embed + discord_embed = DiscordEmbed() + + # Set title and description + discord_embed.set_title(subject) + discord_embed.set_url(entry_link) + + # Get post.image_list + image_list: list[dict[str, Any]] = post_data.get("image_list", []) + if image_list: + image_url: str = str(image_list[0].get("url", "")) + image_height: int = int(image_list[0].get("height", 1080)) + image_width: int = int(image_list[0].get("width", 1920)) + + logger.debug("Image URL: %s, Height: %s, Width: %s", image_url, image_height, image_width) + discord_embed.set_image(url=image_url, height=image_height, width=image_width) + + video: dict[str, str | int | bool] = post_data.get("video", {}) + if video and video.get("url"): + video_url: str = str(video.get("url", "")) + logger.debug("Video URL: %s", video_url) + with contextlib.suppress(requests.RequestException): + video_response: requests.Response = requests.get(video_url, stream=True, timeout=10) + if video_response.ok: + webhook.add_file( + file=video_response.content, + filename=f"{entry.id}.mp4", + ) + + game = post_data.get("game", {}) + + if game and game.get("color"): + game_color = str(game.get("color", "")) + discord_embed.set_color(game_color.removeprefix("#")) + + user: dict[str, str | int | bool] = post_data.get("user", {}) + author_name: str = str(user.get("nickname", "")) + avatar_url: str = str(user.get("avatar_url", "")) + if author_name: + webhook.avatar_url = avatar_url + webhook.username = author_name + + classification = post_data.get("classification", {}) + + if classification and classification.get("name"): + footer = str(classification.get("name", "")) + discord_embed.set_footer(text=footer) + + webhook.add_embed(discord_embed) + + # Only show Youtube URL if available + structured_content: str = post.get("structured_content", "") + if structured_content: # noqa: PLR1702 + try: + structured_content_data: list[dict[str, Any]] = json.loads(structured_content) + for item in structured_content_data: + if item.get("insert") and isinstance(item["insert"], dict): + video_url: str = str(item["insert"].get("video", "")) + if video_url: + video_id_match: re.Match[str] | None = re.search(r"embed/([a-zA-Z0-9_-]+)", video_url) + if video_id_match: + video_id: str = video_id_match.group(1) + logger.debug("Video ID: %s", video_id) + webhook.content = f"https://www.youtube.com/watch?v={video_id}" + webhook.remove_embeds() + + except (json.JSONDecodeError, ValueError) as e: + logger.warning("Error parsing structured content: %s", e) + + event_start_date: str = post.get("event_start_date", "") + if event_start_date and event_start_date != "0": + discord_embed.add_embed_field(name="Start", value=f"") + + event_end_date: str = post.get("event_end_date", "") + if event_end_date and event_end_date != "0": + discord_embed.add_embed_field(name="End", value=f"") + + created_at: str = post.get("created_at", "") + if created_at and created_at != "0": + discord_embed.set_timestamp(timestamp=created_at) + + return webhook diff --git a/tests/test_hoyolab_api.py b/tests/test_hoyolab_api.py new file mode 100644 index 0000000..60c83ae --- /dev/null +++ b/tests/test_hoyolab_api.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url + + +class TestExtractPostIdFromHoyolabUrl: + def test_extract_post_id_from_article_url(self) -> None: + """Test extracting post ID from a direct article URL.""" + test_cases: list[str] = [ + "https://www.hoyolab.com/article/38588239", + "http://hoyolab.com/article/12345", + "https://www.hoyolab.com/article/987654321/comments", + ] + + expected_ids: list[str] = ["38588239", "12345", "987654321"] + + for url, expected_id in zip(test_cases, expected_ids, strict=False): + assert extract_post_id_from_hoyolab_url(url) == expected_id + + def test_url_without_post_id(self) -> None: + """Test with a URL that doesn't have a post ID.""" + test_cases: list[str] = [ + "https://www.hoyolab.com/community", + ] + + for url in test_cases: + assert extract_post_id_from_hoyolab_url(url) is None + + def test_edge_cases(self) -> None: + """Test edge cases like None, empty string, and malformed URLs.""" + test_cases: list[str | None] = [ + None, + "", + "not_a_url", + "http:/", # Malformed URL + ] + + for url in test_cases: + assert extract_post_id_from_hoyolab_url(url) is None # type: ignore