From 71695c29871211204cd9e64575a34340f3ae53ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Helle=C5=9Ben?= Date: Sun, 15 Mar 2026 19:37:55 +0100 Subject: [PATCH] WIP --- discord_rss_bot/custom_filters.py | 10 ++--- discord_rss_bot/custom_message.py | 31 +++++++------ discord_rss_bot/feeds.py | 70 +++++++++++++++-------------- discord_rss_bot/filter/blacklist.py | 40 ++++++++--------- discord_rss_bot/filter/whitelist.py | 40 ++++++++--------- discord_rss_bot/main.py | 27 ++++++----- discord_rss_bot/search.py | 9 +--- tests/test_blacklist.py | 6 +-- tests/test_custom_filter.py | 64 +++++++++++++------------- tests/test_custom_message.py | 12 ++--- tests/test_feeds.py | 4 +- tests/test_main.py | 21 +++++++-- tests/test_search.py | 2 +- tests/test_settings.py | 20 ++++----- tests/test_whitelist.py | 6 +-- 15 files changed, 186 insertions(+), 176 deletions(-) diff --git a/discord_rss_bot/custom_filters.py b/discord_rss_bot/custom_filters.py index 7d8fe83..fd9461c 100644 --- a/discord_rss_bot/custom_filters.py +++ b/discord_rss_bot/custom_filters.py @@ -8,15 +8,11 @@ from discord_rss_bot.filter.blacklist import entry_should_be_skipped from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags from discord_rss_bot.filter.whitelist import has_white_tags from discord_rss_bot.filter.whitelist import should_be_sent -from discord_rss_bot.settings import get_reader if TYPE_CHECKING: from reader import Entry from reader import Reader -# Our reader -reader: Reader = get_reader() - @lru_cache def encode_url(url_to_quote: str) -> str: @@ -34,11 +30,12 @@ def encode_url(url_to_quote: str) -> str: return urllib.parse.quote(string=url_to_quote) if url_to_quote else "" -def entry_is_whitelisted(entry_to_check: Entry) -> bool: +def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool: """Check if the entry is whitelisted. Args: entry_to_check: The feed to check. + reader: Custom Reader instance. Returns: bool: True if the feed is whitelisted, False otherwise. @@ -47,11 +44,12 @@ def entry_is_whitelisted(entry_to_check: Entry) -> bool: return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check)) -def entry_is_blacklisted(entry_to_check: Entry) -> bool: +def entry_is_blacklisted(entry_to_check: Entry, reader: Reader) -> bool: """Check if the entry is blacklisted. Args: entry_to_check: The feed to check. + reader: Custom Reader instance. Returns: bool: True if the feed is blacklisted, False otherwise. diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index eeabdff..1626e39 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -12,7 +12,6 @@ from bs4 import Tag from markdownify import markdownify from discord_rss_bot.is_url_valid import is_url_valid -from discord_rss_bot.settings import get_reader if TYPE_CHECKING: from reader import Entry @@ -118,18 +117,18 @@ def format_entry_html_for_discord(text: str) -> str: return _restore_discord_timestamp_tags(formatted_text, replacements) -def replace_tags_in_text_message(entry: Entry) -> str: +def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str: """Replace tags in custom_message. Args: entry: The entry to get the tags from. + reader: Custom Reader instance. Returns: Returns the custom_message with the tags replaced. """ feed: Feed = entry.feed - custom_reader: Reader = get_reader() - custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader) + custom_message: str = get_custom_message(feed=feed, reader=reader) content = "" if entry.content: @@ -231,18 +230,18 @@ def get_first_image(summary: str | None, content: str | None) -> str: return "" -def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed: +def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed: """Replace tags in embed. Args: feed: The feed to get the tags from. entry: The entry to get the tags from. + reader: Custom Reader instance. Returns: Returns the embed with the tags replaced. """ - custom_reader: Reader = get_reader() - embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader) + embed: CustomEmbed = get_embed(feed=feed, reader=reader) content = "" if entry.content: @@ -333,29 +332,29 @@ def _replace_embed_tags(embed: CustomEmbed, template: str, replace_with: str) -> embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with) -def get_custom_message(custom_reader: Reader, feed: Feed) -> str: +def get_custom_message(reader: Reader, feed: Feed) -> str: """Get custom_message tag from feed. Args: - custom_reader: What Reader to use. + reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the custom_message tag. """ try: - custom_message: str = str(custom_reader.get_tag(feed, "custom_message", "")) + custom_message: str = str(reader.get_tag(feed, "custom_message", "")) except ValueError: custom_message = "" return custom_message -def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None: +def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None: """Set embed tag in feed. Args: - custom_reader: What Reader to use. + reader: What Reader to use. feed: The feed to set the tag in. embed: The embed to set. """ @@ -371,20 +370,20 @@ def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None: "footer_text": embed.footer_text, "footer_icon_url": embed.footer_icon_url, } - custom_reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] + reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] -def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed: +def get_embed(reader: Reader, feed: Feed) -> CustomEmbed: """Get embed tag from feed. Args: - custom_reader: What Reader to use. + reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the embed tag. """ - embed = custom_reader.get_tag(feed, "embed", "") + embed = reader.get_tag(feed, "embed", "") if embed: if not isinstance(embed, str): diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index c52767a..06263dc 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -96,26 +96,26 @@ def extract_domain(url: str) -> str: # noqa: PLR0911 return "Other" -def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: C901 +def send_entry_to_discord(entry: Entry, reader: Reader | None = None) -> str | None: # noqa: C901 """Send a single entry to Discord. Args: entry: The entry to send to Discord. - custom_reader: The reader to use. If None, the default reader will be used. + reader: The reader to use. If None, the default reader will be used. Returns: str | None: The error message if there was an error, otherwise None. """ # Get the default reader if we didn't get a custom one. - reader: Reader = get_reader() if custom_reader is None else custom_reader + effective_reader: Reader = get_reader() if reader is None else reader # Get the webhook URL for the entry. - webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) + webhook_url: str = str(effective_reader.get_tag(entry.feed_url, "webhook", "")) if not webhook_url: return "No webhook URL found." # If https://discord.com/quests/ is in the URL, send a separate message with the URL. - send_discord_quest_notification(entry, webhook_url) + send_discord_quest_notification(entry, webhook_url, reader=effective_reader) # Check if this is a c3kay feed if is_c3kay_feed(entry.feed.url): @@ -126,7 +126,7 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=effective_reader) return None logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", @@ -139,15 +139,15 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> # Try to get the custom message for the feed. If the user has none, we will use the default message. # This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()" - if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 - webhook_message: str = replace_tags_in_text_message(entry=entry) + if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901 + webhook_message: str = replace_tags_in_text_message(entry=entry, reader=effective_reader) if not webhook_message: webhook_message = "No message found." # Create the webhook. try: - should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) + should_send_embed = bool(effective_reader.get_tag(entry.feed, "should_send_embed", True)) except StorageError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True @@ -157,15 +157,15 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> should_send_embed = False if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry) + webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader) else: webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=effective_reader) return None -def send_discord_quest_notification(entry: Entry, webhook_url: str) -> None: +def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Reader) -> None: """Send a separate message to Discord if the entry is a quest notification.""" quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+") @@ -177,7 +177,7 @@ def send_discord_quest_notification(entry: Entry, webhook_url: str) -> None: content=quest_url, rate_limit_retry=True, ) - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=reader) # Iterate through the content of the entry for content in entry.content: @@ -235,12 +235,17 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None: discord_embed.set_title(embed_title) if embed_title else None -def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: # noqa: C901 +def create_embed_webhook( # noqa: C901 + webhook_url: str, + entry: Entry, + reader: Reader, +) -> DiscordWebhook: """Create a webhook with an embed. Args: webhook_url (str): The webhook URL. entry (Entry): The entry to send to Discord. + reader (Reader): The Reader instance to use for getting embed data. Returns: DiscordWebhook: The webhook with the embed. @@ -249,7 +254,7 @@ def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: # n feed: Feed = entry.feed # Get the embed data from the database. - custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry) + custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader) discord_embed: DiscordEmbed = DiscordEmbed() @@ -337,53 +342,53 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None: logger.exception("Error setting entry to read: %s", entry.id) -def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 +def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 """Send entries to Discord. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. Args: - custom_reader: If we should use a custom reader instead of the default one. + reader: If we should use a custom reader instead of the default one. feed: The feed to send to Discord. do_once: If we should only send one entry. This is used in the test. """ logger.info("Starting to send entries to Discord.") # Get the default reader if we didn't get a custom one. - reader: Reader = get_reader() if custom_reader is None else custom_reader + effective_reader: Reader = get_reader() if reader is None else reader # Check for new entries for every feed. - reader.update_feeds( + effective_reader.update_feeds( scheduled=True, workers=os.cpu_count() or 1, ) # Loop through the unread entries. - entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False) + entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False) for entry in entries: - set_entry_as_read(reader, entry) + set_entry_as_read(effective_reader, entry) if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1): logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url) continue - webhook_url: str = get_webhook_url(reader, entry) + webhook_url: str = get_webhook_url(effective_reader, entry) if not webhook_url: logger.info("No webhook URL found for feed: %s", entry.feed.url) continue - should_send_embed: bool = should_send_embed_check(reader, entry) + should_send_embed: bool = should_send_embed_check(effective_reader, entry) # Youtube feeds only need to send the link if is_youtube_feed(entry.feed.url): should_send_embed = False if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry) + webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader) else: # If the user has set the custom message to an empty string, we will use the default message, otherwise we # will use the custom message. - if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 - webhook_message = replace_tags_in_text_message(entry) + if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901 + webhook_message = replace_tags_in_text_message(entry, reader=effective_reader) else: webhook_message: str = str(default_custom_message) @@ -393,12 +398,12 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) # Check if the entry is blacklisted, and if it is, we will skip it. - if entry_should_be_skipped(reader, entry): + if entry_should_be_skipped(effective_reader, entry): logger.info("Entry was blacklisted: %s", entry.id) continue # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. - if has_white_tags(reader, entry.feed) and not should_be_sent(reader, entry): + if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry): logger.info("Entry was not whitelisted: %s", entry.id) continue @@ -411,7 +416,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=effective_reader) return logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", @@ -421,7 +426,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) # Send the entry to Discord as it is not blacklisted or feed has a whitelist. - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=effective_reader) # If we only want to send one entry, we will break the loop. This is used when testing this function. if do_once: @@ -429,16 +434,15 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non break -def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None: +def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None: """Execute the webhook. Args: webhook (DiscordWebhook): The webhook to execute. entry (Entry): The entry to send to Discord. + reader (Reader): The Reader instance to use for checking feed status. """ - reader: Reader = get_reader() - # If the feed has been paused or deleted, we will not send the entry to Discord. entry_feed: Feed = entry.feed if entry_feed.updates_enabled is False: diff --git a/discord_rss_bot/filter/blacklist.py b/discord_rss_bot/filter/blacklist.py index 95c0716..8260993 100644 --- a/discord_rss_bot/filter/blacklist.py +++ b/discord_rss_bot/filter/blacklist.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from reader import Reader -def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: +def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: """Return True if the feed has blacklist tags. The following tags are checked: @@ -25,21 +25,21 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: - regex_blacklist_title Args: - custom_reader: The reader. + reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. """ - blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() - blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() + blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() - regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() - regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() + regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() + regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() return bool( blacklist_title @@ -53,11 +53,11 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: ) -def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the blacklist. Args: - custom_reader: The reader. + reader: The reader. entry: The entry to check. Returns: @@ -65,15 +65,15 @@ def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noq """ feed = entry.feed - blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() - blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() + blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() - regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() - regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() + regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() + regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() # TODO(TheLovinator): Also add support for entry_text and more. # Check regular blacklist diff --git a/discord_rss_bot/filter/whitelist.py b/discord_rss_bot/filter/whitelist.py index 9c198c4..bb5303d 100644 --- a/discord_rss_bot/filter/whitelist.py +++ b/discord_rss_bot/filter/whitelist.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from reader import Reader -def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: +def has_white_tags(reader: Reader, feed: Feed) -> bool: """Return True if the feed has whitelist tags. The following tags are checked: @@ -25,21 +25,21 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: - whitelist_title Args: - custom_reader: The reader. + reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. """ - whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() - regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() + regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() return bool( whitelist_title @@ -53,11 +53,11 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: ) -def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the whitelist. Args: - custom_reader: The reader. + reader: The reader. entry: The entry to check. Returns: @@ -65,16 +65,16 @@ def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR091 """ feed: Feed = entry.feed # Regular whitelist tags - whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() # Regex whitelist tags - regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() + regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() # Check regular whitelist if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 6327e5d..2838c10 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -179,8 +179,6 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template # Add the filters to the Jinja2 environment so they can be used in html templates. templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else "" -templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted -templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted templates.env.filters["discord_markdown"] = markdownify templates.env.filters["relative_time"] = relative_time templates.env.globals["get_backup_path"] = get_backup_path @@ -938,7 +936,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 except EntryNotFoundError as e: current_entries = list(reader.get_entries(feed=clean_feed_url)) msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}" - html: str = create_html_for_feed(current_entries, clean_feed_url) + html: str = create_html_for_feed(reader=reader, entries=current_entries, current_feed_url=clean_feed_url) # Get feed and global intervals for error case too feed_interval: int | None = None @@ -988,7 +986,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 last_entry = entries[-1] # Create the html for the entries. - html: str = create_html_for_feed(entries, clean_feed_url) + html: str = create_html_for_feed(reader=reader, entries=entries, current_feed_url=clean_feed_url) should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True)) @@ -1024,10 +1022,15 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 return templates.TemplateResponse(request=request, name="feed.html", context=context) -def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: C901, PLR0914 +def create_html_for_feed( # noqa: C901, PLR0914 + reader: Reader, + entries: Iterable[Entry], + current_feed_url: str = "", +) -> str: """Create HTML for the search results. Args: + reader: The Reader instance to use. entries: The entries to create HTML for. current_feed_url: The feed URL currently being viewed in /feed. @@ -1045,17 +1048,19 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") - first_image = get_first_image(summary, content) - text: str = replace_tags_in_text_message(entry) or "
No content available.
" + text: str = replace_tags_in_text_message(entry, reader=reader) or ( + "
No content available.
" + ) published = "" if entry.published: published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S") blacklisted: str = "" - if entry_is_blacklisted(entry): + if entry_is_blacklisted(entry, reader=reader): blacklisted = "Blacklisted" whitelisted: str = "" - if entry_is_whitelisted(entry): + if entry_is_whitelisted(entry, reader=reader): whitelisted = "Whitelisted" source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url @@ -1414,7 +1419,7 @@ async def search( HTMLResponse: The search page. """ reader.update_search() - context = create_search_context(query) + context = create_search_context(query, reader=reader) return templates.TemplateResponse(request=request, name="search.html", context={"request": request, **context}) @@ -1437,7 +1442,7 @@ async def post_entry( if entry is None: return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.") - if result := send_entry_to_discord(entry=entry): + if result := send_entry_to_discord(entry=entry, reader=reader): return result # Redirect to the feed page. @@ -1601,7 +1606,7 @@ async def get_webhook_entries( # noqa: C901, PLR0914 last_entry = paginated_entries[-1] # Create the html for the entries - html: str = create_html_for_feed(paginated_entries) + html: str = create_html_for_feed(reader=reader, entries=paginated_entries) # Check if there are more entries available total_entries: int = len(all_entries) diff --git a/discord_rss_bot/search.py b/discord_rss_bot/search.py index a39f304..85129ac 100644 --- a/discord_rss_bot/search.py +++ b/discord_rss_bot/search.py @@ -3,8 +3,6 @@ from __future__ import annotations import urllib.parse from typing import TYPE_CHECKING -from discord_rss_bot.settings import get_reader - if TYPE_CHECKING: from collections.abc import Iterable @@ -14,19 +12,16 @@ if TYPE_CHECKING: from reader import Reader -def create_search_context(query: str, custom_reader: Reader | None = None) -> dict: +def create_search_context(query: str, reader: Reader) -> dict: """Build context for search.html template. - If custom_reader is None, use the default reader from settings. - Args: query (str): The search query. - custom_reader (Reader | None): Optional custom Reader instance. + reader (Reader): Custom Reader instance. Returns: dict: Context dictionary for rendering the search results. """ - reader: Reader = get_reader() if custom_reader is None else custom_reader search_results: Iterable[EntrySearchResult] = reader.search_entries(query) results: list[dict] = [] diff --git a/tests/test_blacklist.py b/tests/test_blacklist.py index 354fdca..0c756ad 100644 --- a/tests/test_blacklist.py +++ b/tests/test_blacklist.py @@ -38,7 +38,7 @@ def test_has_black_tags() -> None: # Test feed without any blacklist tags assert_msg: str = "Feed should not have any blacklist tags" - assert feed_has_blacklist_tags(custom_reader=get_reader(), feed=feed) is False, assert_msg + assert feed_has_blacklist_tags(reader=get_reader(), feed=feed) is False, assert_msg check_if_has_tag(reader, feed, "blacklist_title") check_if_has_tag(reader, feed, "blacklist_summary") @@ -58,11 +58,11 @@ def test_has_black_tags() -> None: def check_if_has_tag(reader: Reader, feed: Feed, blacklist_name: str) -> None: reader.set_tag(feed, blacklist_name, "a") # pyright: ignore[reportArgumentType] assert_msg: str = f"Feed should have blacklist tags: {blacklist_name}" - assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is True, assert_msg + assert feed_has_blacklist_tags(reader=reader, feed=feed) is True, assert_msg asset_msg: str = f"Feed should not have any blacklist tags: {blacklist_name}" reader.delete_tag(feed, blacklist_name) - assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is False, asset_msg + assert feed_has_blacklist_tags(reader=reader, feed=feed) is False, asset_msg def test_should_be_skipped() -> None: diff --git a/tests/test_custom_filter.py b/tests/test_custom_filter.py index 5538608..9611698 100644 --- a/tests/test_custom_filter.py +++ b/tests/test_custom_filter.py @@ -45,39 +45,39 @@ def test_entry_is_whitelisted() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - custom_reader: Reader = get_reader(custom_location=str(custom_loc)) + reader: Reader = get_reader(custom_location=str(custom_loc)) # Add a feed to the database. - custom_reader.add_feed("https://lovinator.space/rss_test.xml") - custom_reader.update_feed("https://lovinator.space/rss_test.xml") + reader.add_feed("https://lovinator.space/rss_test.xml") + reader.update_feed("https://lovinator.space/rss_test.xml") # whitelist_title - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_whitelisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_whitelisted(entry, reader=reader) is True: assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}" break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title") + reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title") # whitelist_summary - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_whitelisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_whitelisted(entry, reader=reader) is True: assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}" break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary") + reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary") # whitelist_content - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_whitelisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_whitelisted(entry, reader=reader) is True: assert_msg = f"Expected:

ffdnfdnfdnfdnfdndfn

, Got: {entry.content[0].value}" assert entry.content[0].value == "

ffdnfdnfdnfdnfdndfn

", assert_msg break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content") + reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content") # Close the reader, so we can delete the directory. - custom_reader.close() + reader.close() def test_entry_is_blacklisted() -> None: @@ -87,36 +87,36 @@ def test_entry_is_blacklisted() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - custom_reader: Reader = get_reader(custom_location=str(custom_loc)) + reader: Reader = get_reader(custom_location=str(custom_loc)) # Add a feed to the database. - custom_reader.add_feed("https://lovinator.space/rss_test.xml") - custom_reader.update_feed("https://lovinator.space/rss_test.xml") + reader.add_feed("https://lovinator.space/rss_test.xml") + reader.update_feed("https://lovinator.space/rss_test.xml") # blacklist_title - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_blacklisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_blacklisted(entry, reader=reader) is True: assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}" break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title") + reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title") # blacklist_summary - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_blacklisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_blacklisted(entry, reader=reader) is True: assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}" break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary") + reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary") # blacklist_content - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_blacklisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_blacklisted(entry, reader=reader) is True: assert_msg = f"Expected:

ffdnfdnfdnfdnfdndfn

, Got: {entry.content[0].value}" assert entry.content[0].value == "

ffdnfdnfdnfdnfdndfn

", assert_msg break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content") + reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content") # Close the reader, so we can delete the directory. - custom_reader.close() + reader.close() diff --git a/tests/test_custom_message.py b/tests/test_custom_message.py index 6fc4d41..4b23f45 100644 --- a/tests/test_custom_message.py +++ b/tests/test_custom_message.py @@ -102,12 +102,10 @@ def test_format_entry_html_for_discord_does_not_preserve_invalid_timestamp_style @patch("discord_rss_bot.custom_message.get_custom_message") -@patch("discord_rss_bot.custom_message.get_reader") def test_replace_tags_in_text_message_preserves_timestamp_tags( - mock_get_reader: MagicMock, mock_get_custom_message: MagicMock, ) -> None: - mock_get_reader.return_value = MagicMock() + mock_reader = MagicMock() mock_get_custom_message.return_value = "{{entry_summary}}" summary_parts: list[str] = [ f"

Format {index}: ({timestamp_tag.replace('<', '<').replace('>', '>')})

" @@ -116,19 +114,17 @@ def test_replace_tags_in_text_message_preserves_timestamp_tags( entry_ns: SimpleNamespace = make_entry("".join(summary_parts)) entry: Entry = typing.cast("Entry", entry_ns) - rendered: str = replace_tags_in_text_message(entry) + rendered: str = replace_tags_in_text_message(entry, reader=mock_reader) for timestamp_tag in TIMESTAMP_FORMATS: assert timestamp_tag in rendered @patch("discord_rss_bot.custom_message.get_embed") -@patch("discord_rss_bot.custom_message.get_reader") def test_replace_tags_in_embed_preserves_timestamp_tags( - mock_get_reader: MagicMock, mock_get_embed: MagicMock, ) -> None: - mock_get_reader.return_value = MagicMock() + mock_reader = MagicMock() mock_get_embed.return_value = CustomEmbed(description="{{entry_summary}}") summary_parts: list[str] = [ f"

Format {index}: ({timestamp_tag.replace('<', '<').replace('>', '>')})

" @@ -138,7 +134,7 @@ def test_replace_tags_in_embed_preserves_timestamp_tags( entry: Entry = typing.cast("Entry", entry_ns) - embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry) + embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry, reader=mock_reader) for timestamp_tag in TIMESTAMP_FORMATS: assert timestamp_tag in embed.description diff --git a/tests/test_feeds.py b/tests/test_feeds.py index b19ac2a..00d2b6e 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -55,7 +55,7 @@ def test_send_to_discord() -> None: assert reader.get_tag(feed, "webhook") == webhook_url, f"The webhook URL should be '{webhook_url}'." # Send the feed to Discord. - send_to_discord(custom_reader=reader, feed=feed, do_once=True) + send_to_discord(reader=reader, feed=feed, do_once=True) # Close the reader, so we can delete the directory. reader.close() @@ -200,7 +200,7 @@ def test_send_entry_to_discord_youtube_feed( assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc" # Verify execute_webhook was called - mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry) + mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry, reader=mock_reader) def test_extract_domain_youtube_feed() -> None: diff --git a/tests/test_main.py b/tests/test_main.py index 8ddb0b8..25aff2a 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -718,11 +718,24 @@ def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyP original_feed_url="https://example.com/feed-b.xml", ) - monkeypatch.setattr("discord_rss_bot.main.replace_tags_in_text_message", lambda _entry: "Rendered content") - monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry: False) - monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry: False) + monkeypatch.setattr( + "discord_rss_bot.main.replace_tags_in_text_message", + lambda _entry, **_kwargs: "Rendered content", + ) + monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry, **_kwargs: False) + monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry, **_kwargs: False) - html = create_html_for_feed(cast("list[Entry]", [same_feed_entry, other_feed_entry]), selected_feed_url) + same_feed_entry_typed: Entry = cast("Entry", same_feed_entry) + other_feed_entry_typed: Entry = cast("Entry", other_feed_entry) + + html: str = create_html_for_feed( + reader=MagicMock(), + current_feed_url=selected_feed_url, + entries=[ + same_feed_entry_typed, + other_feed_entry_typed, + ], + ) assert "From another feed: https://example.com/feed-b.xml" in html assert "From another feed: https://example.com/feed-a.xml" not in html diff --git a/tests/test_search.py b/tests/test_search.py index ada1464..77681cf 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -46,7 +46,7 @@ def test_create_search_context() -> None: reader.update_search() # Create the search context. - context: dict = create_search_context("test", custom_reader=reader) + context: dict = create_search_context("test", reader=reader) assert context is not None, f"The context should not be None. Got: {context}" # Close the reader, so we can delete the directory. diff --git a/tests/test_settings.py b/tests/test_settings.py index 5a54094..bcab720 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -22,12 +22,12 @@ def test_reader() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - custom_reader: Reader = get_reader(custom_location=str(custom_loc)) - assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(custom_reader)}'." - assert isinstance(custom_reader, Reader), assert_msg + reader: Reader = get_reader(custom_location=str(custom_loc)) + assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(reader)}'." + assert isinstance(reader, Reader), assert_msg # Close the reader, so we can delete the directory. - custom_reader.close() + reader.close() def test_data_dir() -> None: @@ -49,16 +49,16 @@ def test_get_webhook_for_entry() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - custom_reader: Reader = get_reader(custom_location=str(custom_loc)) + reader: Reader = get_reader(custom_location=str(custom_loc)) # Add a feed to the database. - custom_reader.add_feed("https://www.reddit.com/r/movies.rss") - custom_reader.update_feed("https://www.reddit.com/r/movies.rss") + reader.add_feed("https://www.reddit.com/r/movies.rss") + reader.update_feed("https://www.reddit.com/r/movies.rss") # Add a webhook to the database. - custom_reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType] - our_tag = custom_reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType] + reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType] + our_tag = reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType] assert our_tag == "https://example.com", f"The tag should be 'https://example.com'. But it was '{our_tag}'." # Close the reader, so we can delete the directory. - custom_reader.close() + reader.close() diff --git a/tests/test_whitelist.py b/tests/test_whitelist.py index 462d652..6e911fe 100644 --- a/tests/test_whitelist.py +++ b/tests/test_whitelist.py @@ -37,7 +37,7 @@ def test_has_white_tags() -> None: reader.update_feeds() # Test feed without any whitelist tags - assert has_white_tags(custom_reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags" + assert has_white_tags(reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags" check_if_has_tag(reader, feed, "whitelist_title") check_if_has_tag(reader, feed, "whitelist_summary") @@ -56,9 +56,9 @@ def test_has_white_tags() -> None: def check_if_has_tag(reader: Reader, feed: Feed, whitelist_name: str) -> None: reader.set_tag(feed, whitelist_name, "a") # pyright: ignore[reportArgumentType] - assert has_white_tags(custom_reader=reader, feed=feed) is True, "Feed should have whitelist tags" + assert has_white_tags(reader=reader, feed=feed) is True, "Feed should have whitelist tags" reader.delete_tag(feed, whitelist_name) - assert has_white_tags(custom_reader=reader, feed=feed) is False, "Feed should not have any whitelist tags" + assert has_white_tags(reader=reader, feed=feed) is False, "Feed should not have any whitelist tags" def test_should_be_sent() -> None: