Compare commits

10 commits: 2bc2bc008b...ffd6f2f9f2

Commits (SHA1):
- ffd6f2f9f2
- 544ef6dca3
- e33b331564
- cd0f63d59a
- 8b50003eda
- 97d06ddb43
- ac63041b28
- 84e39c9f79
- 8408db9afd
- 6dfc72d3b0
1 .gitattributes (vendored, new file)

```diff
@@ -0,0 +1 @@
+*.html linguist-language=jinja
```
```diff
@@ -38,7 +38,7 @@ repos:
   # An extremely fast Python linter and formatter.
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.9.5
+    rev: v0.11.8
     hooks:
       - id: ruff-format
       - id: ruff
```
2 .vscode/settings.json (vendored)

```diff
@@ -2,6 +2,8 @@
   "cSpell.words": [
     "botuser",
+    "Genshins",
     "healthcheck",
+    "Hoyolab",
     "levelname",
     "Lovinator",
     "markdownified",
```
```diff
@@ -9,6 +9,7 @@ COPY --chown=botuser:botuser requirements.txt /home/botuser/discord-rss-bot/
 RUN --mount=type=cache,target=/root/.cache/uv \
     --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
     uv sync --no-install-project
 COPY --chown=botuser:botuser discord_rss_bot/ /home/botuser/discord-rss-bot/discord_rss_bot/
 EXPOSE 5000
 VOLUME ["/home/botuser/.local/share/discord_rss_bot/"]
 CMD ["uv", "run", "uvicorn", "discord_rss_bot.main:app", "--host=0.0.0.0", "--port=5000", "--proxy-headers", "--forwarded-allow-ips='*'", "--log-level", "debug"]
```
16 README.md

```diff
@@ -2,8 +2,20 @@
 
 Subscribe to RSS feeds and get updates to a Discord webhook.
 
 > [!NOTE]
 > You should look at [MonitoRSS](https://github.com/synzen/monitorss) for a more feature-rich project.
 
 ## Features
 
 - Subscribe to RSS feeds and get updates to a Discord webhook.
 - Web interface to manage subscriptions.
 - Customizable message format for each feed.
 - Choose between Discord embed or plain text.
 - Regex filters for RSS feeds.
 - Blacklist/whitelist words in the title/description/author/etc.
 - Gets extra information from APIs if available, currently for:
   - [https://feeds.c3kay.de/](https://feeds.c3kay.de/)
     - Genshin Impact News
     - Honkai Impact 3rd News
     - Honkai Starrail News
     - Zenless Zone Zero News
 
 ## Installation
```
```diff
@@ -152,14 +152,7 @@ def get_first_image(summary: str | None, content: str | None) -> str:
                 logger.warning("Invalid URL: %s", src)
                 continue
 
-            # Genshins first image is a divider, so we ignore it.
-            # https://hyl-static-res-prod.hoyolab.com/divider_config/PC/line_3.png
-            skip_images: list[str] = [
-                "https://img-os-static.hoyolab.com/divider_config/",
-                "https://hyl-static-res-prod.hoyolab.com/divider_config/",
-            ]
-            if not str(image.attrs["src"]).startswith(tuple(skip_images)):
-                return str(image.attrs["src"])
+            return str(image.attrs["src"])
     if summary and (images := BeautifulSoup(summary, features="lxml").find_all("img")):
         for image in images:
             if not isinstance(image, Tag) or "src" not in image.attrs:
@@ -170,9 +163,7 @@ def get_first_image(summary: str | None, content: str | None) -> str:
                 logger.warning("Invalid URL: %s", image.attrs["src"])
                 continue
 
-            # Genshins first image is a divider, so we ignore it.
-            if not str(image.attrs["src"]).startswith("https://img-os-static.hoyolab.com/divider_config"):
-                return str(image.attrs["src"])
+            return str(image.attrs["src"])
     return ""
```
```diff
@@ -3,8 +3,11 @@ from __future__ import annotations
 
 import datetime
 import logging
 import pprint
-from typing import TYPE_CHECKING
+import re
+from typing import TYPE_CHECKING, Any
+from urllib.parse import ParseResult, urlparse
 
+import tldextract
 from discord_webhook import DiscordEmbed, DiscordWebhook
 from fastapi import HTTPException
 from reader import Entry, EntryNotFoundError, Feed, FeedExistsError, Reader, ReaderError, StorageError, TagNotFoundError
@@ -17,6 +20,12 @@ from discord_rss_bot.custom_message import (
 )
 from discord_rss_bot.filter.blacklist import entry_should_be_skipped
 from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
+from discord_rss_bot.hoyolab_api import (
+    create_hoyolab_webhook,
+    extract_post_id_from_hoyolab_url,
+    fetch_hoyolab_post,
+    is_c3kay_feed,
+)
 from discord_rss_bot.is_url_valid import is_url_valid
 from discord_rss_bot.missing_tags import add_missing_tags
 from discord_rss_bot.settings import default_custom_message, get_reader
```
```diff
@@ -29,7 +38,56 @@ if TYPE_CHECKING:
 
 logger: logging.Logger = logging.getLogger(__name__)
 
 
-def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None:
+def extract_domain(url: str) -> str:  # noqa: PLR0911
+    """Extract the domain name from a URL.
+
+    Args:
+        url: The URL to extract the domain from.
+
+    Returns:
+        str: The domain name, formatted for display.
+    """
+    # Check for empty URL first
+    if not url:
+        return "Other"
+
+    try:
+        # Special handling for YouTube feeds
+        if "youtube.com/feeds/videos.xml" in url:
+            return "YouTube"
+
+        # Special handling for Reddit feeds
+        if "reddit.com" in url or (".rss" in url and "r/" in url):
+            return "Reddit"
+
+        # Parse the URL and extract the domain
+        parsed_url: ParseResult = urlparse(url)
+        domain: str = parsed_url.netloc
+
+        # If we couldn't extract a domain, return "Other"
+        if not domain:
+            return "Other"
+
+        # Remove www. prefix if present
+        domain = re.sub(r"^www\.", "", domain)
+
+        # Special handling for common domains
+        domain_mapping: dict[str, str] = {"github.com": "GitHub"}
+
+        if domain in domain_mapping:
+            return domain_mapping[domain]
+
+        # Use tldextract to get the domain (SLD)
+        ext = tldextract.extract(url)
+        if ext.domain:
+            return ext.domain.capitalize()
+        return domain.capitalize()
+    except (ValueError, AttributeError, TypeError) as e:
+        logger.warning("Error extracting domain from %s: %s", url, e)
+        return "Other"
+
+
+def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None:  # noqa: PLR0912
     """Send a single entry to Discord.
 
     Args:
```
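For a concrete sense of the grouping labels the new `extract_domain` produces, a small usage sketch (the URLs are illustrative, not from the repository):

```python
from discord_rss_bot.feeds import extract_domain

# Special-cased feeds map to fixed labels.
assert extract_domain("https://www.youtube.com/feeds/videos.xml?channel_id=UC123") == "YouTube"
assert extract_domain("https://www.reddit.com/r/Python/.rss") == "Reddit"
assert extract_domain("https://github.com/TheLovinator1/discord-rss-bot") == "GitHub"

# Everything else falls back to the capitalized second-level domain, or "Other".
assert extract_domain("https://feeds.c3kay.de/genshin") == "C3kay"
assert extract_domain("") == "Other"
```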
```diff
@@ -47,6 +105,24 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
     if not webhook_url:
         return "No webhook URL found."
 
+    # Check if this is a c3kay feed
+    if is_c3kay_feed(entry.feed.url):
+        entry_link: str | None = entry.link
+        if entry_link:
+            post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
+            if post_id:
+                post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
+                if post_data:
+                    webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
+                    execute_webhook(webhook, entry)
+                    return None
+            logger.warning(
+                "Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
+                entry.feed.url,
+            )
+        else:
+            logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
+
     webhook_message: str = ""
 
     # Try to get the custom message for the feed. If the user has none, we will use the default message.
```
```diff
@@ -67,6 +143,10 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
         logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
         should_send_embed = True
 
+    # YouTube feeds should never use embeds
+    if is_youtube_feed(entry.feed.url):
+        should_send_embed = False
+
     if should_send_embed:
         webhook = create_embed_webhook(webhook_url, entry)
     else:
```
```diff
@@ -295,6 +375,18 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None:
     logger.info("Sent entry to Discord: %s", entry.id)
 
 
+def is_youtube_feed(feed_url: str) -> bool:
+    """Check if the feed is a YouTube feed.
+
+    Args:
+        feed_url: The feed URL to check.
+
+    Returns:
+        bool: True if the feed is a YouTube feed, False otherwise.
+    """
+    return "youtube.com/feeds/videos.xml" in feed_url
+
+
 def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
     """Check if we should send an embed to Discord.
 
```
```diff
@@ -305,6 +397,10 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
     Returns:
         bool: True if we should send an embed, False otherwise.
     """
+    # YouTube feeds should never use embeds - only links
+    if is_youtube_feed(entry.feed.url):
+        return False
+
     try:
         should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
     except TagNotFoundError:
```
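A quick sketch of how the two YouTube guards above behave (feed URLs illustrative):

```python
from discord_rss_bot.feeds import is_youtube_feed

assert is_youtube_feed("https://www.youtube.com/feeds/videos.xml?channel_id=UC123") is True
assert is_youtube_feed("https://feeds.c3kay.de/genshin") is False

# should_send_embed_check() returns False early for YouTube feeds, so their
# entries are always sent as plain links rather than embeds.
```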
```diff
@@ -2,7 +2,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from discord_rss_bot.filter.utils import is_word_in_text
+from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text
 
 if TYPE_CHECKING:
     from reader import Entry, Feed, Reader
@@ -12,9 +12,14 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
     """Return True if the feed has blacklist tags.
 
     The following tags are checked:
-    - blacklist_title
-    - blacklist_author
-    - blacklist_content
-    - blacklist_summary
+    - blacklist_content.
+    - blacklist_title
+    - regex_blacklist_author
+    - regex_blacklist_content
+    - regex_blacklist_summary
+    - regex_blacklist_title
 
     Args:
         custom_reader: The reader.
@@ -23,14 +28,29 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
     Returns:
         bool: If the feed has any of the tags.
     """
-    blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", ""))
-    blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", ""))
-    blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", ""))
+    blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip()
+    blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip()
+    blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip()
+    blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip()
 
-    return bool(blacklist_title or blacklist_summary or blacklist_content)
+    regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip()
+    regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip()
+    regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
+    regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip()
+
+    return bool(
+        blacklist_title
+        or blacklist_author
+        or blacklist_content
+        or blacklist_summary
+        or regex_blacklist_author
+        or regex_blacklist_content
+        or regex_blacklist_summary
+        or regex_blacklist_title,
+    )
 
 
-def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool:
+def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool:  # noqa: PLR0911
     """Return True if the entry is in the blacklist.
 
     Args:
@@ -40,21 +60,58 @@ def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool:
     Returns:
         bool: If the entry is in the blacklist.
     """
-    blacklist_title: str = str(custom_reader.get_tag(entry.feed, "blacklist_title", ""))
-    blacklist_summary: str = str(custom_reader.get_tag(entry.feed, "blacklist_summary", ""))
-    blacklist_content: str = str(custom_reader.get_tag(entry.feed, "blacklist_content", ""))
-    blacklist_author: str = str(custom_reader.get_tag(entry.feed, "blacklist_author", ""))
+    feed = entry.feed
+
+    blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip()
+    blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip()
+    blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip()
+    blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip()
+
+    regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip()
+    regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
+    regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip()
+    regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip()
     # TODO(TheLovinator): Also add support for entry_text and more.
 
+    # Check regular blacklist
     if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title):
         return True
     if entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary):
         return True
-    if (
-        entry.content
-        and entry.content[0].value
-        and blacklist_content
-        and is_word_in_text(blacklist_content, entry.content[0].value)
-    ):
-        return True
     if entry.author and blacklist_author and is_word_in_text(blacklist_author, entry.author):
         return True
+    if (
+        entry.content
+        and entry.content[0].value
+        and blacklist_content
+        and is_word_in_text(blacklist_content, entry.content[0].value)
+    ):
+        return True
+
+    # Check regex blacklist
+    if entry.title and regex_blacklist_title and is_regex_match(regex_blacklist_title, entry.title):
+        return True
+    if entry.summary and regex_blacklist_summary and is_regex_match(regex_blacklist_summary, entry.summary):
+        return True
+    if (
+        entry.content
+        and entry.content[0].value
+        and regex_blacklist_content
+        and is_regex_match(regex_blacklist_content, entry.content[0].value)
+    ):
+        return True
+    if entry.author and regex_blacklist_author and is_regex_match(regex_blacklist_author, entry.author):
+        return True
     return bool(
         entry.content
         and entry.content[0].value
-        and blacklist_content
-        and is_word_in_text(blacklist_content, entry.content[0].value),
+        and regex_blacklist_content
+        and is_regex_match(regex_blacklist_content, entry.content[0].value),
     )
```
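For context, a minimal sketch of how these tags are exercised (mirroring the pattern the test suite below uses; the feed URL is illustrative):

```python
from discord_rss_bot.filter.blacklist import entry_should_be_skipped
from discord_rss_bot.settings import get_reader

reader = get_reader()
feed = reader.get_feed("https://example.com/rss")  # illustrative feed URL

# A plain blacklist tag does whole-word matching via is_word_in_text().
reader.set_tag(feed, "blacklist_title", "spoiler")

# A regex blacklist tag is matched with is_regex_match() instead.
reader.set_tag(feed, "regex_blacklist_title", r"^\[AD\]")

for entry in reader.get_entries(feed=feed):
    if entry_should_be_skipped(reader, entry):
        continue  # entry matched a blacklist tag and will not be sent
```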
```diff
@@ -1,7 +1,10 @@
 from __future__ import annotations
 
+import logging
 import re
 
+logger: logging.Logger = logging.getLogger(__name__)
+
 
 def is_word_in_text(word_string: str, text: str) -> bool:
     """Check if any of the words are in the text.
@@ -20,3 +23,50 @@ def is_word_in_text(word_string: str, text: str) -> bool:
 
     # Check if any pattern matches the text.
     return any(pattern.search(text) for pattern in patterns)
+
+
+def is_regex_match(regex_string: str, text: str) -> bool:
+    """Check if any of the regex patterns match the text.
+
+    Args:
+        regex_string: A string containing regex patterns, separated by newlines or commas.
+        text: The text to search in.
+
+    Returns:
+        bool: True if any regex pattern matches the text, otherwise False.
+    """
+    if not regex_string or not text:
+        return False
+
+    # Split by newlines first, then by commas (for backward compatibility)
+    regex_list: list[str] = []
+
+    # First split by newlines
+    lines: list[str] = regex_string.split("\n")
+    for line in lines:
+        stripped_line: str = line.strip()
+        if stripped_line:
+            # For backward compatibility, also split by commas if there are any
+            if "," in stripped_line:
+                regex_list.extend([part.strip() for part in stripped_line.split(",") if part.strip()])
+            else:
+                regex_list.append(stripped_line)
+
+    # Attempt to compile and apply each regex pattern
+    for pattern_str in regex_list:
+        if not pattern_str:
+            logger.warning("Empty regex pattern found in the list.")
+            continue
+
+        try:
+            pattern: re.Pattern[str] = re.compile(pattern_str, re.IGNORECASE)
+            if pattern.search(text):
+                logger.info("Regex pattern matched: %s", pattern_str)
+                return True
+        except re.error:
+            logger.warning("Invalid regex pattern: %s", pattern_str)
+            continue
+
+    logger.info("No regex patterns matched.")
+
+    return False
```
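To make the splitting rules concrete, a small sketch of `is_regex_match` (sample patterns and text are made up):

```python
from discord_rss_bot.filter.utils import is_regex_match

# Patterns can be separated by newlines, or by commas for backward compatibility.
patterns = "^New Release:.*\nhotfix,\\bpatch\\s+\\d+\\.\\d+"

assert is_regex_match(patterns, "New Release: v2.0") is True   # first pattern matches
assert is_regex_match(patterns, "Patch 1.2 is live") is True   # matching is case-insensitive
assert is_regex_match(patterns, "Weekly newsletter") is False  # nothing matches
assert is_regex_match("[invalid", "anything") is False         # bad patterns are skipped, not raised
```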
```diff
@@ -2,7 +2,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from discord_rss_bot.filter.utils import is_word_in_text
+from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text
 
 if TYPE_CHECKING:
     from reader import Entry, Feed, Reader
@@ -12,9 +12,14 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
     """Return True if the feed has whitelist tags.
 
     The following tags are checked:
-    - whitelist_title
+    - regex_whitelist_author
+    - regex_whitelist_content
+    - regex_whitelist_summary
+    - regex_whitelist_title
+    - whitelist_author
+    - whitelist_content
+    - whitelist_summary
+    - whitelist_content.
+    - whitelist_title
 
     Args:
         custom_reader: The reader.
@@ -23,14 +28,29 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
     Returns:
         bool: If the feed has any of the tags.
     """
-    whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", ""))
-    whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", ""))
-    whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", ""))
+    whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip()
+    whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip()
+    whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip()
+    whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip()
 
-    return bool(whitelist_title or whitelist_summary or whitelist_content)
+    regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip()
+    regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
+    regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip()
+    regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip()
+
+    return bool(
+        whitelist_title
+        or whitelist_author
+        or whitelist_content
+        or whitelist_summary
+        or regex_whitelist_author
+        or regex_whitelist_content
+        or regex_whitelist_summary
+        or regex_whitelist_title,
+    )
 
 
-def should_be_sent(custom_reader: Reader, entry: Entry) -> bool:
+def should_be_sent(custom_reader: Reader, entry: Entry) -> bool:  # noqa: PLR0911
     """Return True if the entry is in the whitelist.
 
     Args:
@@ -41,20 +61,43 @@ def should_be_sent(custom_reader: Reader, entry: Entry) -> bool:
         bool: If the entry is in the whitelist.
     """
     feed: Feed = entry.feed
-    whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", ""))
-    whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", ""))
-    whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", ""))
-    whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", ""))
+
+    # Regular whitelist tags
+    whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip()
+    whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip()
+    whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip()
+    whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip()
+
+    # Regex whitelist tags
+    regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip()
+    regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
+    regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip()
+    regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip()
 
+    # Check regular whitelist
     if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title):
         return True
     if entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary):
         return True
     if entry.author and whitelist_author and is_word_in_text(whitelist_author, entry.author):
         return True
-    return bool(
+    if (
         entry.content
         and entry.content[0].value
         and whitelist_content
-        and is_word_in_text(whitelist_content, entry.content[0].value),
+        and is_word_in_text(whitelist_content, entry.content[0].value)
+    ):
+        return True
+
+    # Check regex whitelist
+    if entry.title and regex_whitelist_title and is_regex_match(regex_whitelist_title, entry.title):
+        return True
+    if entry.summary and regex_whitelist_summary and is_regex_match(regex_whitelist_summary, entry.summary):
+        return True
+    if entry.author and regex_whitelist_author and is_regex_match(regex_whitelist_author, entry.author):
+        return True
+    return bool(
+        entry.content
+        and entry.content[0].value
+        and regex_whitelist_content
+        and is_regex_match(regex_whitelist_content, entry.content[0].value),
    )
```
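A simplified sketch of how the whitelist gate combines with the blacklist gate when deciding whether to send an entry (the real flow lives in the feed-sending code; this only illustrates the intended semantics):

```python
from reader import Entry, Reader

from discord_rss_bot.filter.blacklist import entry_should_be_skipped
from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent


def entry_passes_filters(reader: Reader, entry: Entry) -> bool:
    """Simplified gate: the blacklist can always skip an entry; the whitelist
    is only enforced when the feed actually has whitelist tags set."""
    if entry_should_be_skipped(reader, entry):
        return False
    if has_white_tags(reader, entry.feed) and not should_be_sent(reader, entry):
        return False
    return True
```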
193 discord_rss_bot/hoyolab_api.py (new file)

```python
from __future__ import annotations

import contextlib
import json
import logging
import re
from typing import TYPE_CHECKING, Any

import requests
from discord_webhook import DiscordEmbed, DiscordWebhook

if TYPE_CHECKING:
    from reader import Entry


logger: logging.Logger = logging.getLogger(__name__)


def is_c3kay_feed(feed_url: str) -> bool:
    """Check if the feed is from c3kay.de.

    Args:
        feed_url: The feed URL to check.

    Returns:
        bool: True if the feed is from c3kay.de, False otherwise.
    """
    return "feeds.c3kay.de" in feed_url


def extract_post_id_from_hoyolab_url(url: str) -> str | None:
    """Extract the post ID from a Hoyolab URL.

    Args:
        url: The Hoyolab URL to extract the post ID from.
            For example: https://www.hoyolab.com/article/38588239

    Returns:
        str | None: The post ID if found, None otherwise.
    """
    try:
        match: re.Match[str] | None = re.search(r"/article/(\d+)", url)
        if match:
            return match.group(1)
    except (ValueError, AttributeError, TypeError) as e:
        logger.warning("Error extracting post ID from Hoyolab URL %s: %s", url, e)

    return None


def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None:
    """Fetch post data from the Hoyolab API.

    Args:
        post_id: The post ID to fetch.

    Returns:
        dict[str, Any] | None: The post data if successful, None otherwise.
    """
    if not post_id:
        return None

    http_ok = 200
    try:
        url: str = f"https://bbs-api-os.hoyolab.com/community/post/wapi/getPostFull?post_id={post_id}"
        response: requests.Response = requests.get(url, timeout=10)

        if response.status_code == http_ok:
            data: dict[str, Any] = response.json()
            if data.get("retcode") == 0 and "data" in data and "post" in data["data"]:
                return data["data"]["post"]

        logger.warning("Failed to fetch Hoyolab post %s: %s", post_id, response.text)
    except (requests.RequestException, ValueError):
        logger.exception("Error fetching Hoyolab post %s", post_id)

    return None


def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, Any]) -> DiscordWebhook:  # noqa: C901, PLR0912, PLR0914, PLR0915
    """Create a webhook with data from the Hoyolab API.

    Args:
        webhook_url: The webhook URL.
        entry: The entry to send to Discord.
        post_data: The post data from the Hoyolab API.

    Returns:
        DiscordWebhook: The webhook with the embed.
    """
    entry_link: str = entry.link or entry.feed.url
    webhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True)

    # Extract relevant data from the post
    post: dict[str, Any] = post_data.get("post", {})
    subject: str = post.get("subject", "")
    content: str = post.get("content", "{}")

    logger.debug("Post subject: %s", subject)
    logger.debug("Post content: %s", content)

    content_data: dict[str, str] = {}
    with contextlib.suppress(json.JSONDecodeError, ValueError):
        content_data = json.loads(content)

    logger.debug("Content data: %s", content_data)

    description: str = content_data.get("describe", "")
    if not description:
        description = post.get("desc", "")

    # Create the embed
    discord_embed = DiscordEmbed()

    # Set title and description
    discord_embed.set_title(subject)
    discord_embed.set_url(entry_link)

    # Get post.image_list
    image_list: list[dict[str, Any]] = post_data.get("image_list", [])
    if image_list:
        image_url: str = str(image_list[0].get("url", ""))
        image_height: int = int(image_list[0].get("height", 1080))
        image_width: int = int(image_list[0].get("width", 1920))

        logger.debug("Image URL: %s, Height: %s, Width: %s", image_url, image_height, image_width)
        discord_embed.set_image(url=image_url, height=image_height, width=image_width)

    video: dict[str, str | int | bool] = post_data.get("video", {})
    if video and video.get("url"):
        video_url: str = str(video.get("url", ""))
        logger.debug("Video URL: %s", video_url)
        with contextlib.suppress(requests.RequestException):
            video_response: requests.Response = requests.get(video_url, stream=True, timeout=10)
            if video_response.ok:
                webhook.add_file(
                    file=video_response.content,
                    filename=f"{entry.id}.mp4",
                )

    game = post_data.get("game", {})

    if game and game.get("color"):
        game_color = str(game.get("color", ""))
        discord_embed.set_color(game_color.removeprefix("#"))

    user: dict[str, str | int | bool] = post_data.get("user", {})
    author_name: str = str(user.get("nickname", ""))
    avatar_url: str = str(user.get("avatar_url", ""))
    if author_name:
        webhook.avatar_url = avatar_url
        webhook.username = author_name

    classification = post_data.get("classification", {})

    if classification and classification.get("name"):
        footer = str(classification.get("name", ""))
        discord_embed.set_footer(text=footer)

    webhook.add_embed(discord_embed)

    # Only show Youtube URL if available
    structured_content: str = post.get("structured_content", "")
    if structured_content:  # noqa: PLR1702
        try:
            structured_content_data: list[dict[str, Any]] = json.loads(structured_content)
            for item in structured_content_data:
                if item.get("insert") and isinstance(item["insert"], dict):
                    video_url: str = str(item["insert"].get("video", ""))
                    if video_url:
                        video_id_match: re.Match[str] | None = re.search(r"embed/([a-zA-Z0-9_-]+)", video_url)
                        if video_id_match:
                            video_id: str = video_id_match.group(1)
                            logger.debug("Video ID: %s", video_id)
                            webhook.content = f"https://www.youtube.com/watch?v={video_id}"
                            webhook.remove_embeds()

        except (json.JSONDecodeError, ValueError) as e:
            logger.warning("Error parsing structured content: %s", e)

    event_start_date: str = post.get("event_start_date", "")
    if event_start_date and event_start_date != "0":
        discord_embed.add_embed_field(name="Start", value=f"<t:{event_start_date}:R>")

    event_end_date: str = post.get("event_end_date", "")
    if event_end_date and event_end_date != "0":
        discord_embed.add_embed_field(name="End", value=f"<t:{event_end_date}:R>")

    created_at: str = post.get("created_at", "")
    if created_at and created_at != "0":
        discord_embed.set_timestamp(timestamp=created_at)

    return webhook
```
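A short sketch of how these pieces chain together for a c3kay feed entry, mirroring the fallback logic added to the feed-sending code above (the webhook URL is illustrative):

```python
from reader import Entry

from discord_rss_bot.hoyolab_api import (
    create_hoyolab_webhook,
    extract_post_id_from_hoyolab_url,
    fetch_hoyolab_post,
    is_c3kay_feed,
)


def send_hoyolab_entry(webhook_url: str, entry: Entry) -> bool:
    """Sketch of the c3kay fallback chain; True means the Hoyolab path handled the entry."""
    if not is_c3kay_feed(entry.feed.url) or not entry.link:
        return False
    post_id = extract_post_id_from_hoyolab_url(entry.link)  # ".../article/38588239" -> "38588239"
    if not post_id:
        return False
    post_data = fetch_hoyolab_post(post_id)  # None on HTTP errors or a bad retcode
    if not post_data:
        return False
    webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
    webhook.execute()  # DiscordWebhook.execute() sends the message
    return True
```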
```diff
@@ -37,13 +37,13 @@ from discord_rss_bot.custom_message import (
     replace_tags_in_text_message,
     save_embed,
 )
-from discord_rss_bot.feeds import create_feed, send_entry_to_discord, send_to_discord
+from discord_rss_bot.feeds import create_feed, extract_domain, send_entry_to_discord, send_to_discord
 from discord_rss_bot.missing_tags import add_missing_tags
 from discord_rss_bot.search import create_html_for_search_results
 from discord_rss_bot.settings import get_reader
 
 if TYPE_CHECKING:
-    from collections.abc import Iterable
+    from collections.abc import AsyncGenerator, Iterable
 
     from reader.types import JSONType
```
```diff
@@ -88,8 +88,15 @@ reader: Reader = get_reader()
 
 
 @asynccontextmanager
-async def lifespan(app: FastAPI) -> typing.AsyncGenerator[None]:
-    """This is needed for the ASGI server to run."""
+async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
+    """Lifespan for the FastAPI app.
+
+    Args:
+        app: The FastAPI app.
+
+    Yields:
+        None: Nothing.
+    """
     add_missing_tags(reader)
     scheduler: AsyncIOScheduler = AsyncIOScheduler()
```
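The hunk ends before the scheduler is started. A minimal self-contained sketch of this lifespan pattern (the job body and interval are illustrative, not from the diff):

```python
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
    """Start background jobs for the app's lifetime, then shut them down."""
    scheduler = AsyncIOScheduler()
    # Illustrative job: the real app schedules feed updates here.
    scheduler.add_job(lambda: None, "interval", minutes=15)
    scheduler.start()
    yield
    scheduler.shutdown()


app = FastAPI(lifespan=lifespan)
```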
```diff
@@ -250,6 +257,10 @@ async def post_set_whitelist(
     whitelist_summary: Annotated[str, Form()] = "",
     whitelist_content: Annotated[str, Form()] = "",
     whitelist_author: Annotated[str, Form()] = "",
+    regex_whitelist_title: Annotated[str, Form()] = "",
+    regex_whitelist_summary: Annotated[str, Form()] = "",
+    regex_whitelist_content: Annotated[str, Form()] = "",
+    regex_whitelist_author: Annotated[str, Form()] = "",
     feed_url: Annotated[str, Form()] = "",
 ) -> RedirectResponse:
     """Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent.
@@ -259,6 +270,10 @@ async def post_set_whitelist(
         whitelist_summary: Whitelisted words for when checking the summary.
         whitelist_content: Whitelisted words for when checking the content.
         whitelist_author: Whitelisted words for when checking the author.
+        regex_whitelist_title: Whitelisted regex for when checking the title.
+        regex_whitelist_summary: Whitelisted regex for when checking the summary.
+        regex_whitelist_content: Whitelisted regex for when checking the content.
+        regex_whitelist_author: Whitelisted regex for when checking the author.
         feed_url: The feed we should set the whitelist for.
 
     Returns:
@@ -269,6 +284,10 @@ async def post_set_whitelist(
     reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary)  # pyright: ignore[reportArgumentType][call-overload]
     reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content)  # pyright: ignore[reportArgumentType][call-overload]
     reader.set_tag(clean_feed_url, "whitelist_author", whitelist_author)  # pyright: ignore[reportArgumentType][call-overload]
+    reader.set_tag(clean_feed_url, "regex_whitelist_title", regex_whitelist_title)  # pyright: ignore[reportArgumentType][call-overload]
+    reader.set_tag(clean_feed_url, "regex_whitelist_summary", regex_whitelist_summary)  # pyright: ignore[reportArgumentType][call-overload]
+    reader.set_tag(clean_feed_url, "regex_whitelist_content", regex_whitelist_content)  # pyright: ignore[reportArgumentType][call-overload]
+    reader.set_tag(clean_feed_url, "regex_whitelist_author", regex_whitelist_author)  # pyright: ignore[reportArgumentType][call-overload]
 
     return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@@ -287,11 +306,14 @@ async def get_whitelist(feed_url: str, request: Request):
     clean_feed_url: str = feed_url.strip()
     feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url))
 
     # Get previous data, this is used when creating the form.
     whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", ""))
     whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", ""))
     whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", ""))
     whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", ""))
+    regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", ""))
+    regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", ""))
+    regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", ""))
+    regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", ""))
 
     context = {
         "request": request,
@@ -300,6 +322,10 @@ async def get_whitelist(feed_url: str, request: Request):
         "whitelist_summary": whitelist_summary,
         "whitelist_content": whitelist_content,
         "whitelist_author": whitelist_author,
+        "regex_whitelist_title": regex_whitelist_title,
+        "regex_whitelist_summary": regex_whitelist_summary,
+        "regex_whitelist_content": regex_whitelist_content,
+        "regex_whitelist_author": regex_whitelist_author,
     }
     return templates.TemplateResponse(request=request, name="whitelist.html", context=context)
@@ -310,6 +336,10 @@ async def post_set_blacklist(
     blacklist_summary: Annotated[str, Form()] = "",
     blacklist_content: Annotated[str, Form()] = "",
     blacklist_author: Annotated[str, Form()] = "",
+    regex_blacklist_title: Annotated[str, Form()] = "",
+    regex_blacklist_summary: Annotated[str, Form()] = "",
+    regex_blacklist_content: Annotated[str, Form()] = "",
+    regex_blacklist_author: Annotated[str, Form()] = "",
     feed_url: Annotated[str, Form()] = "",
 ) -> RedirectResponse:
     """Set the blacklist.
@@ -322,6 +352,10 @@ async def post_set_blacklist(
         blacklist_summary: Blacklisted words for when checking the summary.
         blacklist_content: Blacklisted words for when checking the content.
         blacklist_author: Blacklisted words for when checking the author.
+        regex_blacklist_title: Blacklisted regex for when checking the title.
+        regex_blacklist_summary: Blacklisted regex for when checking the summary.
+        regex_blacklist_content: Blacklisted regex for when checking the content.
+        regex_blacklist_author: Blacklisted regex for when checking the author.
         feed_url: What feed we should set the blacklist for.
 
     Returns:
@@ -332,7 +366,10 @@ async def post_set_blacklist(
     reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary)  # pyright: ignore[reportArgumentType][call-overload]
     reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content)  # pyright: ignore[reportArgumentType][call-overload]
     reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author)  # pyright: ignore[reportArgumentType][call-overload]
 
+    reader.set_tag(clean_feed_url, "regex_blacklist_title", regex_blacklist_title)  # pyright: ignore[reportArgumentType][call-overload]
+    reader.set_tag(clean_feed_url, "regex_blacklist_summary", regex_blacklist_summary)  # pyright: ignore[reportArgumentType][call-overload]
+    reader.set_tag(clean_feed_url, "regex_blacklist_content", regex_blacklist_content)  # pyright: ignore[reportArgumentType][call-overload]
+    reader.set_tag(clean_feed_url, "regex_blacklist_author", regex_blacklist_author)  # pyright: ignore[reportArgumentType][call-overload]
     return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@@ -349,11 +386,14 @@ async def get_blacklist(feed_url: str, request: Request):
     """
     feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url))
 
     # Get previous data, this is used when creating the form.
     blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", ""))
     blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", ""))
     blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", ""))
     blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", ""))
+    regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", ""))
+    regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", ""))
+    regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", ""))
+    regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", ""))
 
     context = {
         "request": request,
@@ -362,6 +402,10 @@ async def get_blacklist(feed_url: str, request: Request):
         "blacklist_summary": blacklist_summary,
         "blacklist_content": blacklist_content,
         "blacklist_author": blacklist_author,
+        "regex_blacklist_title": regex_blacklist_title,
+        "regex_blacklist_summary": regex_blacklist_summary,
+        "regex_blacklist_content": regex_blacklist_content,
+        "regex_blacklist_author": regex_blacklist_author,
     }
     return templates.TemplateResponse(request=request, name="blacklist.html", context=context)
@@ -461,7 +505,7 @@ async def get_embed_page(feed_url: str, request: Request):
 
 
 @app.post("/embed", response_class=HTMLResponse)
-async def post_embed(  # noqa: PLR0913, PLR0917
+async def post_embed(
     feed_url: Annotated[str, Form()],
     title: Annotated[str, Form()] = "",
     description: Annotated[str, Form()] = "",
```
```diff
@@ -688,6 +732,27 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
 
         entry_id: str = urllib.parse.quote(entry.id)
         to_discord_html: str = f"<a class='text-muted' href='/post_entry?entry_id={entry_id}'>Send to Discord</a>"
 
+        # Check if this is a YouTube feed entry and the entry has a link
+        is_youtube_feed = "youtube.com/feeds/videos.xml" in entry.feed.url
+        video_embed_html = ""
+
+        if is_youtube_feed and entry.link:
+            # Extract the video ID and create an embed if possible
+            video_id: str | None = extract_youtube_video_id(entry.link)
+            if video_id:
+                video_embed_html: str = f"""
+                <div class="ratio ratio-16x9 mt-3 mb-3">
+                    <iframe src="https://www.youtube.com/embed/{video_id}"
+                            title="{entry.title}"
+                            allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
+                            allowfullscreen>
+                    </iframe>
+                </div>
+                """
+                # Don't use the first image if we have a video embed
+                first_image = ""
+
         image_html: str = f"<img src='{first_image}' class='img-fluid'>" if first_image else ""
 
         html += f"""<div class="p-2 mb-2 border border-dark">
@@ -695,6 +760,7 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
             {f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html}
 
             {text}
+            {video_embed_html}
             {image_html}
         </div>
         """
```
```diff
@@ -809,11 +875,12 @@ def make_context_index(request: Request):
     broken_feeds = []
     feeds_without_attached_webhook = []
 
+    # Get all feeds and organize them
     feeds: Iterable[Feed] = reader.get_feeds()
     for feed in feeds:
         try:
             webhook = reader.get_tag(feed.url, "webhook")
-            feed_list.append({"feed": feed, "webhook": webhook})
+            feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)})
         except TagNotFoundError:
             broken_feeds.append(feed)
             continue
```
```diff
@@ -947,6 +1014,29 @@ def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Fo
     return RedirectResponse(url="/webhooks", status_code=303)
 
 
+def extract_youtube_video_id(url: str) -> str | None:
+    """Extract YouTube video ID from a YouTube video URL.
+
+    Args:
+        url: The YouTube video URL.
+
+    Returns:
+        The video ID if found, None otherwise.
+    """
+    if not url:
+        return None
+
+    # Handle standard YouTube URLs (youtube.com/watch?v=VIDEO_ID)
+    if "youtube.com/watch" in url and "v=" in url:
+        return url.split("v=")[1].split("&")[0]
+
+    # Handle shortened YouTube URLs (youtu.be/VIDEO_ID)
+    if "youtu.be/" in url:
+        return url.split("youtu.be/")[1].split("?")[0]
+
+    return None
```
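Quick checks of `extract_youtube_video_id` against the two URL shapes it handles (assuming the function above is in scope; the URLs are illustrative):

```python
# Standard watch URL: everything between "v=" and the next "&".
assert extract_youtube_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ&t=42") == "dQw4w9WgXcQ"

# Shortened URL: everything after "youtu.be/" up to a "?".
assert extract_youtube_video_id("https://youtu.be/dQw4w9WgXcQ?si=abc") == "dQw4w9WgXcQ"

# Anything else returns None.
assert extract_youtube_video_id("https://example.com/video") is None
```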
```diff
 
 
 if __name__ == "__main__":
     sentry_sdk.init(
         dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744",
```
```diff
@@ -42,6 +42,49 @@
                     <label for="blacklist_author" class="col-sm-6 col-form-label">Blacklist - Author</label>
                     <input name="blacklist_author" type="text" class="form-control bg-dark border-dark text-muted"
                         id="blacklist_author" value="{%- if blacklist_author -%}{{ blacklist_author }}{%- endif -%}" />
+
+                    <div class="mt-4">
+                        <div class="form-text">
+                            <ul class="list-inline">
+                                <li>
+                                    Regular expression patterns for advanced filtering. Each pattern should be on a new
+                                    line.
+                                </li>
+                                <li>Patterns are case-insensitive.</li>
+                                <li>
+                                    Examples:
+                                    <code>
+                                        <pre>
+^New Release:.*
+\b(update|version|patch)\s+\d+\.\d+
+.*\[(important|notice)\].*
+                                        </pre>
+                                    </code>
+                                </li>
+                            </ul>
+                        </div>
+                        <label for="regex_blacklist_title" class="col-sm-6 col-form-label">Regex Blacklist - Title</label>
+                        <textarea name="regex_blacklist_title" class="form-control bg-dark border-dark text-muted"
+                            id="regex_blacklist_title"
+                            rows="3">{%- if regex_blacklist_title -%}{{ regex_blacklist_title }}{%- endif -%}</textarea>
+
+                        <label for="regex_blacklist_summary" class="col-sm-6 col-form-label">Regex Blacklist -
+                            Summary</label>
+                        <textarea name="regex_blacklist_summary" class="form-control bg-dark border-dark text-muted"
+                            id="regex_blacklist_summary"
+                            rows="3">{%- if regex_blacklist_summary -%}{{ regex_blacklist_summary }}{%- endif -%}</textarea>
+
+                        <label for="regex_blacklist_content" class="col-sm-6 col-form-label">Regex Blacklist -
+                            Content</label>
+                        <textarea name="regex_blacklist_content" class="form-control bg-dark border-dark text-muted"
+                            id="regex_blacklist_content"
+                            rows="3">{%- if regex_blacklist_content -%}{{ regex_blacklist_content }}{%- endif -%}</textarea>
+
+                        <label for="regex_blacklist_author" class="col-sm-6 col-form-label">Regex Blacklist - Author</label>
+                        <textarea name="regex_blacklist_author" class="form-control bg-dark border-dark text-muted"
+                            id="regex_blacklist_author"
+                            rows="3">{%- if regex_blacklist_author -%}{{ regex_blacklist_author }}{%- endif -%}</textarea>
+                    </div>
                 </div>
             </div>
             <!-- Add a hidden feed_url field to the form -->
```
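For illustration, the example patterns from the form above applied through `is_regex_match` (the sample titles are made up):

```python
from discord_rss_bot.filter.utils import is_regex_match

patterns = "^New Release:.*\n" r"\b(update|version|patch)\s+\d+\.\d+" "\n" r".*\[(important|notice)\].*"

assert is_regex_match(patterns, "New Release: Spring update") is True
assert is_regex_match(patterns, "Client patch 4.6 notes") is True
assert is_regex_match(patterns, "[Important] Server maintenance") is True
assert is_regex_match(patterns, "Community spotlight") is False
```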
```diff
@@ -43,6 +43,7 @@
             </form>
         {% endif %}
 
+        {% if not "youtube.com/feeds/videos.xml" in feed.url %}
         {% if should_send_embed %}
             <form action="/use_text" method="post" class="d-inline">
                 <button class="btn btn-dark btn-sm" name="feed_url" value="{{ feed.url }}">
@@ -56,6 +57,7 @@
                 </button>
             </form>
         {% endif %}
+        {% endif %}
     </div>
 
     <!-- Additional Links -->
@@ -65,9 +67,11 @@
         <a class="text-muted d-block" href="/custom?feed_url={{ feed.url|encode_url }}">
             Customize message {% if not should_send_embed %}(Currently active){% endif %}
         </a>
+        {% if not "youtube.com/feeds/videos.xml" in feed.url %}
         <a class="text-muted d-block" href="/embed?feed_url={{ feed.url|encode_url }}">
             Customize embed {% if should_send_embed %}(Currently active){% endif %}
         </a>
+        {% endif %}
     </div>
 </div>
```
```diff
@@ -28,32 +28,66 @@
                 {{ entry_count.averages[2]|round(1) }})
             </abbr>
         </p>
-        <!-- Loop through the webhooks and add the feeds connected to them. -->
+
+        <!-- Loop through the webhooks and add the feeds grouped by domain -->
         {% for hook_from_context in webhooks %}
-            <div class="p-2 mb-2 border border-dark">
-                <h2 class="h5">
+            <div class="p-2 mb-3 border border-dark">
+                <h2 class="h5 mb-3">
                     <a class="text-muted" href="/webhooks">{{ hook_from_context.name }}</a>
                 </h2>
-                <ul class="list-group">
-                    {% for feed_webhook in feeds %}
-                        {% set feed = feed_webhook["feed"] %}
-                        {% set hook_from_feed = feed_webhook["webhook"] %}
-                        {% if hook_from_context.url == hook_from_feed %}
-                            <div>
-                                <a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">{{ feed.url }}</a>
-                                {% if not feed.updates_enabled %}<span class="text-warning">Disabled</span>{% endif %}
-                                {% if feed.last_exception %}<span
-                                class="text-danger">({{ feed.last_exception.value_str }})</span>{% endif %}
-                            </div>
-                        {% endif %}
-                    {% endfor %}
-                </ul>
+
+                <!-- Group feeds by domain within each webhook -->
+                {% set feeds_for_hook = [] %}
+                {% for feed_webhook in feeds %}
+                    {% if hook_from_context.url == feed_webhook.webhook %}
+                        {% set _ = feeds_for_hook.append(feed_webhook) %}
+                    {% endif %}
+                {% endfor %}
+
+                {% if feeds_for_hook %}
+                    <!-- Create a dictionary to hold feeds grouped by domain -->
+                    {% set domains = {} %}
+                    {% for feed_item in feeds_for_hook %}
+                        {% set feed = feed_item.feed %}
+                        {% set domain = feed_item.domain %}
+                        {% if domain not in domains %}
+                            {% set _ = domains.update({domain: []}) %}
+                        {% endif %}
+                        {% set _ = domains[domain].append(feed) %}
+                    {% endfor %}
+
+                    <!-- Display domains and their feeds -->
+                    {% for domain, domain_feeds in domains.items() %}
+                        <div class="card bg-dark border border-dark mb-2">
+                            <div class="card-header">
+                                <h3 class="h6 mb-0 text-white-50">{{ domain }} ({{ domain_feeds|length }})</h3>
+                            </div>
+                            <div class="card-body p-2">
+                                <ul class="list-group list-unstyled mb-0">
+                                    {% for feed in domain_feeds %}
+                                        <li>
+                                            <a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">
+                                                {% if feed.title %}{{ feed.title }}{% else %}{{ feed.url }}{% endif %}
+                                            </a>
+                                            {% if not feed.updates_enabled %}<span class="text-warning">Disabled</span>{% endif %}
+                                            {% if feed.last_exception %}<span
+                                            class="text-danger">({{ feed.last_exception.value_str }})</span>{% endif %}
+                                        </li>
+                                    {% endfor %}
+                                </ul>
+                            </div>
+                        </div>
+                    {% endfor %}
+                {% else %}
+                    <p class="text-muted">No feeds associated with this webhook.</p>
+                {% endif %}
             </div>
         {% endfor %}
     {% else %}
         <p>
             Hello there!
             <br>
             <br>
             You need to add a webhook <a class="text-muted" href="/add_webhook">here</a> to get started. After that, you can
             add feeds <a class="text-muted" href="/add">here</a>. You can find both of these links in the navigation bar
             above.
@@ -66,24 +100,52 @@
             Thanks!
         </p>
     {% endif %}
 
+    <!-- Show feeds without webhooks -->
     {% if broken_feeds %}
         <div class="p-2 mb-2 border border-dark">
             <ul class="list-group text-danger">
                 Feeds without webhook:
                 {% for broken_feed in broken_feeds %}
-                    <a class="text-muted" href="/feed?feed_url={{ broken_feed.url|encode_url }}">{{ broken_feed.url }}</a>
+                    <a class="text-muted" href="/feed?feed_url={{ broken_feed.url|encode_url }}">
+                        {# Display username@youtube for YouTube feeds #}
+                        {% if "youtube.com/feeds/videos.xml" in broken_feed.url %}
+                            {% if "user=" in broken_feed.url %}
+                                {{ broken_feed.url.split("user=")[1] }}@youtube
+                            {% elif "channel_id=" in broken_feed.url %}
+                                {{ broken_feed.title if broken_feed.title else broken_feed.url.split("channel_id=")[1] }}@youtube
+                            {% else %}
+                                {{ broken_feed.url }}
+                            {% endif %}
+                        {% else %}
+                            {{ broken_feed.url }}
+                        {% endif %}
+                    </a>
                 {% endfor %}
             </ul>
         </div>
     {% endif %}
 
     <!-- Show feeds that has no attached webhook -->
     {% if feeds_without_attached_webhook %}
         <div class="p-2 mb-2 border border-dark">
             <ul class="list-group text-danger">
                 Feeds without attached webhook:
                 {% for feed in feeds_without_attached_webhook %}
-                    <a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">{{ feed.url }}</a>
+                    <a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">
+                        {# Display username@youtube for YouTube feeds #}
+                        {% if "youtube.com/feeds/videos.xml" in feed.url %}
+                            {% if "user=" in feed.url %}
+                                {{ feed.url.split("user=")[1] }}@youtube
+                            {% elif "channel_id=" in feed.url %}
+                                {{ feed.title if feed.title else feed.url.split("channel_id=")[1] }}@youtube
+                            {% else %}
+                                {{ feed.url }}
+                            {% endif %}
+                        {% else %}
+                            {{ feed.url }}
+                        {% endif %}
+                    </a>
                 {% endfor %}
             </ul>
         </div>
```
```diff
@@ -1,6 +1,6 @@
 {% extends "base.html" %}
 {% block title %}
-    | Blacklist
+    | Whitelist
 {% endblock title %}
 {% block content %}
     <div class="p-2 border border-dark">
@@ -42,6 +42,49 @@
                     <label for="whitelist_author" class="col-sm-6 col-form-label">Whitelist - Author</label>
                     <input name="whitelist_author" type="text" class="form-control bg-dark border-dark text-muted"
                         id="whitelist_author" value="{%- if whitelist_author -%} {{ whitelist_author }} {%- endif -%}" />
+
+                    <div class="mt-4">
+                        <div class="form-text">
+                            <ul class="list-inline">
+                                <li>
+                                    Regular expression patterns for advanced filtering. Each pattern should be on a new
+                                    line.
+                                </li>
+                                <li>Patterns are case-insensitive.</li>
+                                <li>
+                                    Examples:
+                                    <code>
+                                        <pre>
+^New Release:.*
+\b(update|version|patch)\s+\d+\.\d+
+.*\[(important|notice)\].*
+                                        </pre>
+                                    </code>
+                                </li>
+                            </ul>
+                        </div>
+                        <label for="regex_whitelist_title" class="col-sm-6 col-form-label">Regex Whitelist - Title</label>
+                        <textarea name="regex_whitelist_title" class="form-control bg-dark border-dark text-muted"
+                            id="regex_whitelist_title"
+                            rows="3">{%- if regex_whitelist_title -%}{{ regex_whitelist_title }}{%- endif -%}</textarea>
+
+                        <label for="regex_whitelist_summary" class="col-sm-6 col-form-label">Regex Whitelist -
+                            Summary</label>
+                        <textarea name="regex_whitelist_summary" class="form-control bg-dark border-dark text-muted"
+                            id="regex_whitelist_summary"
+                            rows="3">{%- if regex_whitelist_summary -%}{{ regex_whitelist_summary }}{%- endif -%}</textarea>
+
+                        <label for="regex_whitelist_content" class="col-sm-6 col-form-label">Regex Whitelist -
+                            Content</label>
+                        <textarea name="regex_whitelist_content" class="form-control bg-dark border-dark text-muted"
+                            id="regex_whitelist_content"
+                            rows="3">{%- if regex_whitelist_content -%}{{ regex_whitelist_content }}{%- endif -%}</textarea>
+
+                        <label for="regex_whitelist_author" class="col-sm-6 col-form-label">Regex Whitelist - Author</label>
+                        <textarea name="regex_whitelist_author" class="form-control bg-dark border-dark text-muted"
+                            id="regex_whitelist_author"
+                            rows="3">{%- if regex_whitelist_author -%}{{ regex_whitelist_author }}{%- endif -%}</textarea>
+                    </div>
                 </div>
             </div>
             <!-- Add a hidden feed_url field to the form -->
```
```diff
@@ -17,6 +17,7 @@ dependencies = [
     "python-multipart",
     "reader",
     "sentry-sdk[fastapi]",
+    "tldextract",
     "uvicorn",
 ]
 
@@ -42,7 +43,7 @@ platformdirs = "*"
 python-dotenv = "*"
 python-multipart = "*"
 reader = "*"
-sentry-sdk = {version = "*", extras = ["fastapi"]}
+sentry-sdk = { version = "*", extras = ["fastapi"] }
 uvicorn = "*"
 
 [tool.poetry.group.dev.dependencies]
@@ -86,6 +87,8 @@ lint.ignore = [
     "PLR6301", # Checks for the presence of unused self parameter in methods definitions.
     "RUF029", # Checks for functions declared async that do not await or otherwise use features requiring the function to be declared async.
     "TD003", # Checks that a TODO comment is associated with a link to a relevant issue or ticket.
+    "PLR0913", # Checks for function definitions that include too many arguments.
+    "PLR0917", # Checks for function definitions that include too many positional arguments.
 
     # Conflicting lint rules when using Ruff's formatter
     # https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
```
@ -39,6 +39,13 @@ def test_has_black_tags() -> None:
    check_if_has_tag(reader, feed, "blacklist_title")
    check_if_has_tag(reader, feed, "blacklist_summary")
    check_if_has_tag(reader, feed, "blacklist_content")
    check_if_has_tag(reader, feed, "blacklist_author")

    # Test regex blacklist tags
    check_if_has_tag(reader, feed, "regex_blacklist_title")
    check_if_has_tag(reader, feed, "regex_blacklist_summary")
    check_if_has_tag(reader, feed, "regex_blacklist_content")
    check_if_has_tag(reader, feed, "regex_blacklist_author")

    # Clean up
    reader.delete_feed(feed_url)

@ -74,6 +81,7 @@ def test_should_be_skipped() -> None:
    # Test entry without any blacklists
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"

    # Test standard blacklist functionality
    reader.set_tag(feed, "blacklist_title", "fvnnnfnfdnfdnfd")  # pyright: ignore[reportArgumentType]
    assert entry_should_be_skipped(reader, first_entry[0]) is True, f"Entry should be skipped: {first_entry[0]}"
    reader.delete_tag(feed, "blacklist_title")

@ -113,3 +121,81 @@ def test_should_be_skipped() -> None:
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
    reader.delete_tag(feed, "blacklist_author")
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"


def test_regex_should_be_skipped() -> None:
    """Test the regex filtering functionality for blacklist."""
    reader: Reader = get_reader()

    # Add feed and update entries
    reader.add_feed(feed_url)
    feed: Feed = reader.get_feed(feed_url)
    reader.update_feeds()

    # Get first entry
    first_entry: list[Entry] = []
    entries: Iterable[Entry] = reader.get_entries(feed=feed)
    assert entries is not None, f"Entries should not be None: {entries}"
    for entry in entries:
        first_entry.append(entry)
        break
    assert len(first_entry) == 1, f"First entry should be added: {first_entry}"

    # Test entry without any regex blacklists
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"

    # Test regex blacklist for title
    reader.set_tag(feed, "regex_blacklist_title", r"fvnnn\w+")  # pyright: ignore[reportArgumentType]
    assert entry_should_be_skipped(reader, first_entry[0]) is True, (
        f"Entry should be skipped with regex title match: {first_entry[0]}"
    )
    reader.delete_tag(feed, "regex_blacklist_title")
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"

    # Test regex blacklist for summary
    reader.set_tag(feed, "regex_blacklist_summary", r"ffdnfdn\w+")  # pyright: ignore[reportArgumentType]
    assert entry_should_be_skipped(reader, first_entry[0]) is True, (
        f"Entry should be skipped with regex summary match: {first_entry[0]}"
    )
    reader.delete_tag(feed, "regex_blacklist_summary")
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"

    # Test regex blacklist for content
    reader.set_tag(feed, "regex_blacklist_content", r"ffdnfdnfdn\w+")  # pyright: ignore[reportArgumentType]
    assert entry_should_be_skipped(reader, first_entry[0]) is True, (
        f"Entry should be skipped with regex content match: {first_entry[0]}"
    )
    reader.delete_tag(feed, "regex_blacklist_content")
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"

    # Test regex blacklist for author
    reader.set_tag(feed, "regex_blacklist_author", r"TheLovinator\d*")  # pyright: ignore[reportArgumentType]
    assert entry_should_be_skipped(reader, first_entry[0]) is True, (
        f"Entry should be skipped with regex author match: {first_entry[0]}"
    )
    reader.delete_tag(feed, "regex_blacklist_author")
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"

    # Test invalid regex pattern (should not raise an exception)
    reader.set_tag(feed, "regex_blacklist_title", r"[incomplete")  # pyright: ignore[reportArgumentType]
    assert entry_should_be_skipped(reader, first_entry[0]) is False, (
        f"Entry should not be skipped with invalid regex: {first_entry[0]}"
    )
    reader.delete_tag(feed, "regex_blacklist_title")

    # Test multiple regex patterns separated by commas
    reader.set_tag(feed, "regex_blacklist_author", r"pattern1,TheLovinator\d*,pattern3")  # pyright: ignore[reportArgumentType]
    assert entry_should_be_skipped(reader, first_entry[0]) is True, (
        f"Entry should be skipped with one matching pattern in list: {first_entry[0]}"
    )
    reader.delete_tag(feed, "regex_blacklist_author")
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"

    # Test newline-separated regex patterns
    newline_patterns = "pattern1\nTheLovinator\\d*\npattern3"
    reader.set_tag(feed, "regex_blacklist_author", newline_patterns)  # pyright: ignore[reportArgumentType]
    assert entry_should_be_skipped(reader, first_entry[0]) is True, (
        f"Entry should be skipped with newline-separated patterns: {first_entry[0]}"
    )
    reader.delete_tag(feed, "regex_blacklist_author")
    assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
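The regex blacklist tags set above are consumed by `entry_should_be_skipped`. As a reading aid, here is a minimal sketch of that wiring — the tag names and the `is_regex_match` helper come from this changeset, while the field access and `get_tag` defaults are assumptions rather than the repository's exact code:

```python
from reader import Entry, Reader

from discord_rss_bot.filter.utils import is_regex_match


def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool:
    """Sketch: skip an entry when any regex blacklist tag matches its field."""
    content: str = entry.content[0].value if entry.content else ""
    fields: dict[str, str] = {
        "regex_blacklist_title": entry.title or "",
        "regex_blacklist_summary": entry.summary or "",
        "regex_blacklist_content": content,
        "regex_blacklist_author": entry.author or "",
    }
    for tag, text in fields.items():
        patterns = str(reader.get_tag(entry.feed, tag, ""))  # "" when the tag is unset
        if patterns and is_regex_match(patterns, text):
            return True
    return False  # the plain (non-regex) blacklist checks are omitted here
```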
@ -4,11 +4,19 @@ import os
import tempfile
from pathlib import Path
from typing import LiteralString
from unittest.mock import MagicMock, patch

import pytest
from reader import Feed, Reader, make_reader

from discord_rss_bot.feeds import send_to_discord, truncate_webhook_message
from discord_rss_bot.feeds import (
    extract_domain,
    is_youtube_feed,
    send_entry_to_discord,
    send_to_discord,
    should_send_embed_check,
    truncate_webhook_message,
)
from discord_rss_bot.missing_tags import add_missing_tags

@ -85,3 +93,186 @@ def test_truncate_webhook_message_long_message():
    # Test the end of the message
    assert_msg = "The end of the truncated message should be '...' to indicate truncation."
    assert truncated_message[-half_length:] == "A" * half_length, assert_msg


def test_is_youtube_feed():
    """Test the is_youtube_feed function."""
    # YouTube feed URLs
    assert is_youtube_feed("https://www.youtube.com/feeds/videos.xml?channel_id=123456") is True
    assert is_youtube_feed("https://www.youtube.com/feeds/videos.xml?user=username") is True

    # Non-YouTube feed URLs
    assert is_youtube_feed("https://www.example.com/feed.xml") is False
    assert is_youtube_feed("https://www.youtube.com/watch?v=123456") is False
    assert is_youtube_feed("https://www.reddit.com/r/Python/.rss") is False


@patch("discord_rss_bot.feeds.logger")
def test_should_send_embed_check_youtube_feeds(mock_logger: MagicMock) -> None:
    """Test should_send_embed_check returns False for YouTube feeds regardless of settings."""
    # Create mocks
    mock_reader = MagicMock()
    mock_entry = MagicMock()

    # Configure a YouTube feed
    mock_entry.feed.url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"

    # Set reader to return True for should_send_embed (would normally create an embed)
    mock_reader.get_tag.return_value = True

    # Result should be False, overriding the feed settings
    result = should_send_embed_check(mock_reader, mock_entry)
    assert result is False, "YouTube feeds should never use embeds"

    # Function should not even call get_tag for YouTube feeds
    mock_reader.get_tag.assert_not_called()


@patch("discord_rss_bot.feeds.logger")
def test_should_send_embed_check_normal_feeds(mock_logger: MagicMock) -> None:
    """Test should_send_embed_check returns feed settings for non-YouTube feeds."""
    # Create mocks
    mock_reader = MagicMock()
    mock_entry = MagicMock()

    # Configure a normal feed
    mock_entry.feed.url = "https://www.example.com/feed.xml"

    # Test with should_send_embed set to True
    mock_reader.get_tag.return_value = True
    result = should_send_embed_check(mock_reader, mock_entry)
    assert result is True, "Normal feeds should use embeds when enabled"

    # Test with should_send_embed set to False
    mock_reader.get_tag.return_value = False
    result = should_send_embed_check(mock_reader, mock_entry)
    assert result is False, "Normal feeds should not use embeds when disabled"


@patch("discord_rss_bot.feeds.get_reader")
@patch("discord_rss_bot.feeds.get_custom_message")
@patch("discord_rss_bot.feeds.replace_tags_in_text_message")
@patch("discord_rss_bot.feeds.create_embed_webhook")
@patch("discord_rss_bot.feeds.DiscordWebhook")
@patch("discord_rss_bot.feeds.execute_webhook")
def test_send_entry_to_discord_youtube_feed(
    mock_execute_webhook: MagicMock,
    mock_discord_webhook: MagicMock,
    mock_create_embed: MagicMock,
    mock_replace_tags: MagicMock,
    mock_get_custom_message: MagicMock,
    mock_get_reader: MagicMock,
):
    """Test send_entry_to_discord function with YouTube feeds."""
    # Set up mocks
    mock_reader = MagicMock()
    mock_get_reader.return_value = mock_reader
    mock_entry = MagicMock()
    mock_feed = MagicMock()

    # Configure a YouTube feed
    mock_entry.feed = mock_feed
    mock_entry.feed.url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
    mock_entry.feed_url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"

    # Mock the tags
    mock_reader.get_tag.side_effect = lambda feed, tag, default=None: {  # noqa: ARG005
        "webhook": "https://discord.com/api/webhooks/123/abc",
        "should_send_embed": True,  # This should be ignored for YouTube feeds
    }.get(tag, default)

    # Mock custom message
    mock_get_custom_message.return_value = "Custom message"
    mock_replace_tags.return_value = "Formatted message with {{entry_link}}"

    # Mock webhook
    mock_webhook = MagicMock()
    mock_discord_webhook.return_value = mock_webhook

    # Call the function
    send_entry_to_discord(mock_entry)

    # Assertions
    mock_create_embed.assert_not_called()
    mock_discord_webhook.assert_called_once()

    # Check webhook was created with the right message
    webhook_call_kwargs = mock_discord_webhook.call_args[1]
    assert "content" in webhook_call_kwargs, "Webhook should have content"
    assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc"

    # Verify execute_webhook was called
    mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry)


def test_extract_domain_youtube_feed() -> None:
    """Test extract_domain for YouTube feeds."""
    url: str = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
    assert extract_domain(url) == "YouTube", "YouTube feeds should return 'YouTube' as the domain."


def test_extract_domain_reddit_feed() -> None:
    """Test extract_domain for Reddit feeds."""
    url: str = "https://www.reddit.com/r/Python/.rss"
    assert extract_domain(url) == "Reddit", "Reddit feeds should return 'Reddit' as the domain."


def test_extract_domain_github_feed() -> None:
    """Test extract_domain for GitHub feeds."""
    url: str = "https://www.github.com/user/repo"
    assert extract_domain(url) == "GitHub", "GitHub feeds should return 'GitHub' as the domain."


def test_extract_domain_custom_domain() -> None:
    """Test extract_domain for custom domains."""
    url: str = "https://www.example.com/feed"
    assert extract_domain(url) == "Example", "Custom domains should return the capitalized first part of the domain."


def test_extract_domain_no_www_prefix() -> None:
    """Test extract_domain removes 'www.' prefix."""
    url: str = "https://www.example.com/feed"
    assert extract_domain(url) == "Example", "The 'www.' prefix should be removed from the domain."


def test_extract_domain_no_tld() -> None:
    """Test extract_domain for domains without a TLD."""
    url: str = "https://localhost/feed"
    assert extract_domain(url) == "Localhost", "Domains without a TLD should return the capitalized domain."


def test_extract_domain_invalid_url() -> None:
    """Test extract_domain for invalid URLs."""
    url: str = "not-a-valid-url"
    assert extract_domain(url) == "Other", "Invalid URLs should return 'Other' as the domain."


def test_extract_domain_empty_url() -> None:
    """Test extract_domain for empty URLs."""
    url: str = ""
    assert extract_domain(url) == "Other", "Empty URLs should return 'Other' as the domain."


def test_extract_domain_special_characters() -> None:
    """Test extract_domain for URLs with special characters."""
    url: str = "https://www.ex-ample.com/feed"
    assert extract_domain(url) == "Ex-ample", "Domains with special characters should return the capitalized domain."


@pytest.mark.parametrize(
    argnames=("url", "expected"),
    argvalues=[
        ("https://blog.something.com", "Something"),
        ("https://www.something.com", "Something"),
        ("https://subdomain.example.co.uk", "Example"),
        ("https://github.com/user/repo", "GitHub"),
        ("https://youtube.com/feeds/videos.xml?channel_id=abc", "YouTube"),
        ("https://reddit.com/r/python/.rss", "Reddit"),
        ("", "Other"),
        ("not a url", "Other"),
        ("https://www.example.com", "Example"),
        ("https://foo.bar.baz.com", "Baz"),
    ],
)
def test_extract_domain(url: str, expected: str) -> None:
    assert extract_domain(url) == expected
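The behavior pinned down by these tests, together with the `tldextract` dependency added in pyproject.toml above, suggests an implementation along these lines. This is a sketch that satisfies the tests, not necessarily the exact code in `discord_rss_bot.feeds`; the special-case map and the `urlparse` scheme check are assumptions:

```python
from urllib.parse import urlparse

import tldextract


def extract_domain(url: str) -> str:
    """Sketch: turn a feed URL into a display name such as 'YouTube' or 'Example'."""
    special: dict[str, str] = {"youtube": "YouTube", "reddit": "Reddit", "github": "GitHub"}
    try:
        if not urlparse(url).scheme:
            return "Other"  # "", "not a url", "not-a-valid-url"
        # tldextract handles multi-part suffixes: subdomain.example.co.uk -> "example"
        domain: str = tldextract.extract(url).domain
    except Exception:
        return "Other"
    if not domain:
        return "Other"
    return special.get(domain.lower(), domain.capitalize())
```

This reproduces every expectation in the parametrized test above, including the multi-part TLD case and the bare-hostname `localhost` case.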
39
tests/test_hoyolab_api.py
Normal file
@ -0,0 +1,39 @@
from __future__ import annotations

from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url


class TestExtractPostIdFromHoyolabUrl:
    def test_extract_post_id_from_article_url(self) -> None:
        """Test extracting post ID from a direct article URL."""
        test_cases: list[str] = [
            "https://www.hoyolab.com/article/38588239",
            "http://hoyolab.com/article/12345",
            "https://www.hoyolab.com/article/987654321/comments",
        ]

        expected_ids: list[str] = ["38588239", "12345", "987654321"]

        for url, expected_id in zip(test_cases, expected_ids, strict=False):
            assert extract_post_id_from_hoyolab_url(url) == expected_id

    def test_url_without_post_id(self) -> None:
        """Test with a URL that doesn't have a post ID."""
        test_cases: list[str] = [
            "https://www.hoyolab.com/community",
        ]

        for url in test_cases:
            assert extract_post_id_from_hoyolab_url(url) is None

    def test_edge_cases(self) -> None:
        """Test edge cases like None, empty string, and malformed URLs."""
        test_cases: list[str | None] = [
            None,
            "",
            "not_a_url",
            "http:/",  # Malformed URL
        ]

        for url in test_cases:
            assert extract_post_id_from_hoyolab_url(url) is None  # type: ignore
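For reference while reading these tests, a sketch of `extract_post_id_from_hoyolab_url` that meets the contract they describe (the regex-based approach is an assumption, not the repository's exact code):

```python
import re


def extract_post_id_from_hoyolab_url(url: str | None) -> str | None:
    """Sketch: return the numeric ID after /article/ in a Hoyolab URL, else None."""
    if not url:
        return None  # covers both None and ""
    match: re.Match[str] | None = re.search(r"/article/(\d+)", url)
    return match.group(1) if match else None
```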
@ -45,7 +45,7 @@ def test_search() -> None:
    # Check that the feed was added.
    response = client.get(url="/")
    assert response.status_code == 200, f"Failed to get /: {response.text}"
    assert feed_url in response.text, f"Feed not found in /: {response.text}"
    assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"

    # Search for an entry.
    response: Response = client.get(url="/search/?query=a")

@ -85,7 +85,7 @@ def test_create_feed() -> None:
    # Check that the feed was added.
    response = client.get(url="/")
    assert response.status_code == 200, f"Failed to get /: {response.text}"
    assert feed_url in response.text, f"Feed not found in /: {response.text}"
    assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"


def test_get() -> None:

@ -103,7 +103,7 @@ def test_get() -> None:
    # Check that the feed was added.
    response = client.get("/")
    assert response.status_code == 200, f"Failed to get /: {response.text}"
    assert feed_url in response.text, f"Feed not found in /: {response.text}"
    assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"

    response: Response = client.get(url="/add")
    assert response.status_code == 200, f"/add failed: {response.text}"

@ -157,7 +157,7 @@ def test_pause_feed() -> None:
    # Check that the feed was paused.
    response = client.get(url="/")
    assert response.status_code == 200, f"Failed to get /: {response.text}"
    assert feed_url in response.text, f"Feed not found in /: {response.text}"
    assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"


def test_unpause_feed() -> None:

@ -184,7 +184,7 @@ def test_unpause_feed() -> None:
    # Check that the feed was unpaused.
    response = client.get(url="/")
    assert response.status_code == 200, f"Failed to get /: {response.text}"
    assert feed_url in response.text, f"Feed not found in /: {response.text}"
    assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"


def test_remove_feed() -> None:
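These hunks swap the raw `feed_url` for `encoded_feed_url(feed_url)` because the page now renders feed URLs percent-encoded. A plausible test helper, assuming `urllib.parse.quote` with its default safe characters is the encoding the templates use:

```python
from urllib.parse import quote


def encoded_feed_url(url: str) -> str:
    """Sketch: percent-encode a feed URL the way it appears in rendered HTML."""
    return quote(url)  # e.g. "https://a.com/x?y=1" -> "https%3A//a.com/x%3Fy%3D1"
```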
@ -1,6 +1,6 @@
from __future__ import annotations

from discord_rss_bot.filter.utils import is_word_in_text
from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text


def test_is_word_in_text() -> None:

@ -14,3 +14,51 @@ def test_is_word_in_text() -> None:
    assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false
    assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false
    assert is_word_in_text("word1,word2", "This is a sample text containing none of the words.") is False, msg_false


def test_is_regex_match() -> None:
    msg_true = "Should return True"
    msg_false = "Should return False"

    # Test basic regex patterns
    assert is_regex_match(r"word\d+", "This text contains word123") is True, msg_true
    assert is_regex_match(r"^Hello", "Hello world") is True, msg_true
    assert is_regex_match(r"world$", "Hello world") is True, msg_true

    # Test case insensitivity
    assert is_regex_match(r"hello", "This text contains HELLO") is True, msg_true

    # Test comma-separated patterns
    assert is_regex_match(r"pattern1,pattern2", "This contains pattern2") is True, msg_true
    assert is_regex_match(r"pattern1, pattern2", "This contains pattern1") is True, msg_true

    # Test regex that shouldn't match
    assert is_regex_match(r"^start", "This doesn't start with the pattern") is False, msg_false
    assert is_regex_match(r"end$", "This doesn't end with the pattern") is False, msg_false

    # Test with empty input
    assert is_regex_match("", "Some text") is False, msg_false
    assert is_regex_match("pattern", "") is False, msg_false

    # Test with invalid regex (should not raise an exception and return False)
    assert is_regex_match(r"[incomplete", "Some text") is False, msg_false

    # Test with multiple patterns where one is invalid
    assert is_regex_match(r"valid, [invalid, \w+", "Contains word") is True, msg_true

    # Test newline-separated patterns
    newline_patterns = "pattern1\n^start\ncontains\\d+"
    assert is_regex_match(newline_patterns, "This contains123 text") is True, msg_true
    assert is_regex_match(newline_patterns, "start of line") is True, msg_true
    assert is_regex_match(newline_patterns, "pattern1 is here") is True, msg_true
    assert is_regex_match(newline_patterns, "None of these match") is False, msg_false

    # Test mixed newline and comma patterns (for backward compatibility)
    mixed_patterns = "pattern1\npattern2,pattern3\npattern4"
    assert is_regex_match(mixed_patterns, "Contains pattern3") is True, msg_true
    assert is_regex_match(mixed_patterns, "Contains pattern4") is True, msg_true

    # Test with empty lines and spaces
    whitespace_patterns = "\\s+\n \n\npattern\n\n"
    assert is_regex_match(whitespace_patterns, "text with spaces") is True, msg_true
    assert is_regex_match(whitespace_patterns, "text with pattern") is True, msg_true
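These assertions define the whole contract of the new `is_regex_match` helper: patterns may be comma- or newline-separated, matching is case-insensitive, and invalid patterns are skipped instead of raising. A sketch that satisfies every case above (an assumed implementation, not the repository's exact code):

```python
import re


def is_regex_match(patterns: str, text: str) -> bool:
    """Sketch: True if any comma- or newline-separated pattern matches the text."""
    if not patterns or not text:
        return False
    parts: list[str] = [p.strip() for line in patterns.splitlines() for p in line.split(",")]
    for pattern in parts:
        if not pattern:
            continue  # skip empty entries from blank lines or stray commas
        try:
            if re.search(pattern, text, re.IGNORECASE):
                return True
        except re.error:
            continue  # invalid patterns are ignored rather than raised
    return False
```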
@ -38,6 +38,13 @@ def test_has_white_tags() -> None:
    check_if_has_tag(reader, feed, "whitelist_title")
    check_if_has_tag(reader, feed, "whitelist_summary")
    check_if_has_tag(reader, feed, "whitelist_content")
    check_if_has_tag(reader, feed, "whitelist_author")

    # Test regex whitelist tags
    check_if_has_tag(reader, feed, "regex_whitelist_title")
    check_if_has_tag(reader, feed, "regex_whitelist_summary")
    check_if_has_tag(reader, feed, "regex_whitelist_content")
    check_if_has_tag(reader, feed, "regex_whitelist_author")

    # Clean up
    reader.delete_feed(feed_url)

@ -109,3 +116,67 @@ def test_should_be_sent() -> None:
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
    reader.delete_tag(feed, "whitelist_author")
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"


def test_regex_should_be_sent() -> None:
    """Test the regex filtering functionality for whitelist."""
    reader: Reader = get_reader()

    # Add feed and update entries
    reader.add_feed(feed_url)
    feed: Feed = reader.get_feed(feed_url)
    reader.update_feeds()

    # Get first entry
    first_entry: list[Entry] = []
    entries: Iterable[Entry] = reader.get_entries(feed=feed)
    assert entries is not None, "Entries should not be None"
    for entry in entries:
        first_entry.append(entry)
        break
    assert len(first_entry) == 1, "First entry should be added"

    # Test entry without any regex whitelists
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"

    # Test regex whitelist for title
    reader.set_tag(feed, "regex_whitelist_title", r"fvnnn\w+")  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex title match"
    reader.delete_tag(feed, "regex_whitelist_title")
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"

    # Test regex whitelist for summary
    reader.set_tag(feed, "regex_whitelist_summary", r"ffdnfdn\w+")  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex summary match"
    reader.delete_tag(feed, "regex_whitelist_summary")
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"

    # Test regex whitelist for content
    reader.set_tag(feed, "regex_whitelist_content", r"ffdnfdnfdn\w+")  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex content match"
    reader.delete_tag(feed, "regex_whitelist_content")
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"

    # Test regex whitelist for author
    reader.set_tag(feed, "regex_whitelist_author", r"TheLovinator\d*")  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex author match"
    reader.delete_tag(feed, "regex_whitelist_author")
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"

    # Test invalid regex pattern (should not raise an exception)
    reader.set_tag(feed, "regex_whitelist_title", r"[incomplete")  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent with invalid regex"
    reader.delete_tag(feed, "regex_whitelist_title")

    # Test multiple regex patterns separated by commas
    reader.set_tag(feed, "regex_whitelist_author", r"pattern1,TheLovinator\d*,pattern3")  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with one matching pattern in list"
    reader.delete_tag(feed, "regex_whitelist_author")
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"

    # Test newline-separated regex patterns
    newline_patterns = "pattern1\nTheLovinator\\d*\npattern3"
    reader.set_tag(feed, "regex_whitelist_author", newline_patterns)  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with newline-separated patterns"
    reader.delete_tag(feed, "regex_whitelist_author")
    assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"