diff --git a/discord_rss_bot/custom_filters.py b/discord_rss_bot/custom_filters.py index 7d8fe83..fd9461c 100644 --- a/discord_rss_bot/custom_filters.py +++ b/discord_rss_bot/custom_filters.py @@ -8,15 +8,11 @@ from discord_rss_bot.filter.blacklist import entry_should_be_skipped from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags from discord_rss_bot.filter.whitelist import has_white_tags from discord_rss_bot.filter.whitelist import should_be_sent -from discord_rss_bot.settings import get_reader if TYPE_CHECKING: from reader import Entry from reader import Reader -# Our reader -reader: Reader = get_reader() - @lru_cache def encode_url(url_to_quote: str) -> str: @@ -34,11 +30,12 @@ def encode_url(url_to_quote: str) -> str: return urllib.parse.quote(string=url_to_quote) if url_to_quote else "" -def entry_is_whitelisted(entry_to_check: Entry) -> bool: +def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool: """Check if the entry is whitelisted. Args: entry_to_check: The feed to check. + reader: Custom Reader instance. Returns: bool: True if the feed is whitelisted, False otherwise. @@ -47,11 +44,12 @@ def entry_is_whitelisted(entry_to_check: Entry) -> bool: return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check)) -def entry_is_blacklisted(entry_to_check: Entry) -> bool: +def entry_is_blacklisted(entry_to_check: Entry, reader: Reader) -> bool: """Check if the entry is blacklisted. Args: entry_to_check: The feed to check. + reader: Custom Reader instance. Returns: bool: True if the feed is blacklisted, False otherwise. diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index b84b30f..1626e39 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -5,17 +5,18 @@ import json import logging import re from dataclasses import dataclass +from typing import TYPE_CHECKING from bs4 import BeautifulSoup from bs4 import Tag from markdownify import markdownify -from reader import Entry -from reader import Feed -from reader import Reader -from reader import TagNotFoundError from discord_rss_bot.is_url_valid import is_url_valid -from discord_rss_bot.settings import get_reader + +if TYPE_CHECKING: + from reader import Entry + from reader import Feed + from reader import Reader logger: logging.Logger = logging.getLogger(__name__) @@ -116,18 +117,18 @@ def format_entry_html_for_discord(text: str) -> str: return _restore_discord_timestamp_tags(formatted_text, replacements) -def replace_tags_in_text_message(entry: Entry) -> str: +def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str: """Replace tags in custom_message. Args: entry: The entry to get the tags from. + reader: Custom Reader instance. Returns: Returns the custom_message with the tags replaced. """ feed: Feed = entry.feed - custom_reader: Reader = get_reader() - custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader) + custom_message: str = get_custom_message(feed=feed, reader=reader) content = "" if entry.content: @@ -229,18 +230,18 @@ def get_first_image(summary: str | None, content: str | None) -> str: return "" -def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed: +def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed: """Replace tags in embed. Args: feed: The feed to get the tags from. entry: The entry to get the tags from. + reader: Custom Reader instance. Returns: Returns the embed with the tags replaced. """ - custom_reader: Reader = get_reader() - embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader) + embed: CustomEmbed = get_embed(feed=feed, reader=reader) content = "" if entry.content: @@ -331,31 +332,29 @@ def _replace_embed_tags(embed: CustomEmbed, template: str, replace_with: str) -> embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with) -def get_custom_message(custom_reader: Reader, feed: Feed) -> str: +def get_custom_message(reader: Reader, feed: Feed) -> str: """Get custom_message tag from feed. Args: - custom_reader: What Reader to use. + reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the custom_message tag. """ try: - custom_message: str = str(custom_reader.get_tag(feed, "custom_message")) - except TagNotFoundError: - custom_message = "" + custom_message: str = str(reader.get_tag(feed, "custom_message", "")) except ValueError: custom_message = "" return custom_message -def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None: +def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None: """Set embed tag in feed. Args: - custom_reader: What Reader to use. + reader: What Reader to use. feed: The feed to set the tag in. embed: The embed to set. """ @@ -371,20 +370,20 @@ def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None: "footer_text": embed.footer_text, "footer_icon_url": embed.footer_icon_url, } - custom_reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] + reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] -def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed: +def get_embed(reader: Reader, feed: Feed) -> CustomEmbed: """Get embed tag from feed. Args: - custom_reader: What Reader to use. + reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the embed tag. """ - embed = custom_reader.get_tag(feed, "embed", "") + embed = reader.get_tag(feed, "embed", "") if embed: if not isinstance(embed, str): diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 8e6ca63..5510516 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -23,7 +23,6 @@ from reader import FeedNotFoundError from reader import Reader from reader import ReaderError from reader import StorageError -from reader import TagNotFoundError from discord_rss_bot.custom_message import CustomEmbed from discord_rss_bot.custom_message import get_custom_message @@ -37,7 +36,6 @@ from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url from discord_rss_bot.hoyolab_api import fetch_hoyolab_post from discord_rss_bot.hoyolab_api import is_c3kay_feed from discord_rss_bot.is_url_valid import is_url_valid -from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.settings import default_custom_message from discord_rss_bot.settings import get_reader @@ -98,26 +96,23 @@ def extract_domain(url: str) -> str: # noqa: PLR0911 return "Other" -def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: C901, PLR0912 +def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: C901 """Send a single entry to Discord. Args: entry: The entry to send to Discord. - custom_reader: The reader to use. If None, the default reader will be used. + reader: The reader to use. Returns: str | None: The error message if there was an error, otherwise None. """ - # Get the default reader if we didn't get a custom one. - reader: Reader = get_reader() if custom_reader is None else custom_reader - # Get the webhook URL for the entry. webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) if not webhook_url: return "No webhook URL found." # If https://discord.com/quests/ is in the URL, send a separate message with the URL. - send_discord_quest_notification(entry, webhook_url) + send_discord_quest_notification(entry, webhook_url, reader=reader) # Check if this is a c3kay feed if is_c3kay_feed(entry.feed.url): @@ -128,7 +123,7 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=reader) return None logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", @@ -142,17 +137,14 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> # Try to get the custom message for the feed. If the user has none, we will use the default message. # This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()" if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 - webhook_message: str = replace_tags_in_text_message(entry=entry) + webhook_message: str = replace_tags_in_text_message(entry=entry, reader=reader) if not webhook_message: webhook_message = "No message found." # Create the webhook. try: - should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) - except TagNotFoundError: - logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url) - should_send_embed = True + should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) except StorageError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True @@ -162,15 +154,15 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> should_send_embed = False if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry) + webhook = create_embed_webhook(webhook_url, entry, reader=reader) else: webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=reader) return None -def send_discord_quest_notification(entry: Entry, webhook_url: str) -> None: +def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Reader) -> None: """Send a separate message to Discord if the entry is a quest notification.""" quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+") @@ -182,7 +174,7 @@ def send_discord_quest_notification(entry: Entry, webhook_url: str) -> None: content=quest_url, rate_limit_retry=True, ) - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=reader) # Iterate through the content of the entry for content in entry.content: @@ -240,12 +232,17 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None: discord_embed.set_title(embed_title) if embed_title else None -def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: # noqa: C901 +def create_embed_webhook( # noqa: C901 + webhook_url: str, + entry: Entry, + reader: Reader, +) -> DiscordWebhook: """Create a webhook with an embed. Args: webhook_url (str): The webhook URL. entry (Entry): The entry to send to Discord. + reader (Reader): The Reader instance to use for getting embed data. Returns: DiscordWebhook: The webhook with the embed. @@ -254,7 +251,7 @@ def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: # n feed: Feed = entry.feed # Get the embed data from the database. - custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry) + custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader) discord_embed: DiscordEmbed = DiscordEmbed() @@ -316,13 +313,14 @@ def get_webhook_url(reader: Reader, entry: Entry) -> str: str: The webhook URL. """ try: - webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook")) - except TagNotFoundError: - logger.exception("No webhook URL found for feed: %s", entry.feed.url) - return "" + webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) except StorageError: logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url) return "" + + if not webhook_url: + logger.error("No webhook URL found for feed: %s", entry.feed.url) + return "" return webhook_url @@ -341,53 +339,53 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None: logger.exception("Error setting entry to read: %s", entry.id) -def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 +def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 """Send entries to Discord. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. Args: - custom_reader: If we should use a custom reader instead of the default one. + reader: If we should use a custom reader instead of the default one. feed: The feed to send to Discord. do_once: If we should only send one entry. This is used in the test. """ logger.info("Starting to send entries to Discord.") # Get the default reader if we didn't get a custom one. - reader: Reader = get_reader() if custom_reader is None else custom_reader + effective_reader: Reader = get_reader() if reader is None else reader # Check for new entries for every feed. - reader.update_feeds( + effective_reader.update_feeds( scheduled=True, workers=os.cpu_count() or 1, ) # Loop through the unread entries. - entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False) + entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False) for entry in entries: - set_entry_as_read(reader, entry) + set_entry_as_read(effective_reader, entry) if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1): logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url) continue - webhook_url: str = get_webhook_url(reader, entry) + webhook_url: str = get_webhook_url(effective_reader, entry) if not webhook_url: logger.info("No webhook URL found for feed: %s", entry.feed.url) continue - should_send_embed: bool = should_send_embed_check(reader, entry) + should_send_embed: bool = should_send_embed_check(effective_reader, entry) # Youtube feeds only need to send the link if is_youtube_feed(entry.feed.url): should_send_embed = False if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry) + webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader) else: # If the user has set the custom message to an empty string, we will use the default message, otherwise we # will use the custom message. - if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 - webhook_message = replace_tags_in_text_message(entry) + if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901 + webhook_message = replace_tags_in_text_message(entry, reader=effective_reader) else: webhook_message: str = str(default_custom_message) @@ -397,12 +395,12 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) # Check if the entry is blacklisted, and if it is, we will skip it. - if entry_should_be_skipped(reader, entry): + if entry_should_be_skipped(effective_reader, entry): logger.info("Entry was blacklisted: %s", entry.id) continue # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. - if has_white_tags(reader, entry.feed) and not should_be_sent(reader, entry): + if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry): logger.info("Entry was not whitelisted: %s", entry.id) continue @@ -415,7 +413,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=effective_reader) return logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", @@ -425,7 +423,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) # Send the entry to Discord as it is not blacklisted or feed has a whitelist. - execute_webhook(webhook, entry) + execute_webhook(webhook, entry, reader=effective_reader) # If we only want to send one entry, we will break the loop. This is used when testing this function. if do_once: @@ -433,16 +431,15 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non break -def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None: +def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None: """Execute the webhook. Args: webhook (DiscordWebhook): The webhook to execute. entry (Entry): The entry to send to Discord. + reader (Reader): The Reader instance to use for checking feed status. """ - reader: Reader = get_reader() - # If the feed has been paused or deleted, we will not send the entry to Discord. entry_feed: Feed = entry.feed if entry_feed.updates_enabled is False: @@ -493,10 +490,7 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool: return False try: - should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) - except TagNotFoundError: - logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url) - should_send_embed = True + should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) except ReaderError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True @@ -551,9 +545,7 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: reader.add_feed(clean_feed_url) except FeedExistsError: # Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new. - try: - reader.get_tag(clean_feed_url, "webhook") - except TagNotFoundError: + if not reader.get_tag(clean_feed_url, "webhook", ""): reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType] except ReaderError as e: raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e @@ -580,5 +572,3 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # Update the full-text search index so our new feed is searchable. reader.update_search() - - add_missing_tags(reader) diff --git a/discord_rss_bot/filter/blacklist.py b/discord_rss_bot/filter/blacklist.py index 95c0716..8260993 100644 --- a/discord_rss_bot/filter/blacklist.py +++ b/discord_rss_bot/filter/blacklist.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from reader import Reader -def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: +def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: """Return True if the feed has blacklist tags. The following tags are checked: @@ -25,21 +25,21 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: - regex_blacklist_title Args: - custom_reader: The reader. + reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. """ - blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() - blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() + blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() - regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() - regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() + regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() + regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() return bool( blacklist_title @@ -53,11 +53,11 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: ) -def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the blacklist. Args: - custom_reader: The reader. + reader: The reader. entry: The entry to check. Returns: @@ -65,15 +65,15 @@ def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noq """ feed = entry.feed - blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() - blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() + blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() - regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() - regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() + regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() + regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() # TODO(TheLovinator): Also add support for entry_text and more. # Check regular blacklist diff --git a/discord_rss_bot/filter/whitelist.py b/discord_rss_bot/filter/whitelist.py index 9c198c4..bb5303d 100644 --- a/discord_rss_bot/filter/whitelist.py +++ b/discord_rss_bot/filter/whitelist.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from reader import Reader -def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: +def has_white_tags(reader: Reader, feed: Feed) -> bool: """Return True if the feed has whitelist tags. The following tags are checked: @@ -25,21 +25,21 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: - whitelist_title Args: - custom_reader: The reader. + reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. """ - whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() - regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() + regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() return bool( whitelist_title @@ -53,11 +53,11 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: ) -def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the whitelist. Args: - custom_reader: The reader. + reader: The reader. entry: The entry to check. Returns: @@ -65,16 +65,16 @@ def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR091 """ feed: Feed = entry.feed # Regular whitelist tags - whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() # Regex whitelist tags - regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() + regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() # Check regular whitelist if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): diff --git a/discord_rss_bot/git_backup.py b/discord_rss_bot/git_backup.py index b226489..49528ec 100644 --- a/discord_rss_bot/git_backup.py +++ b/discord_rss_bot/git_backup.py @@ -30,8 +30,6 @@ from pathlib import Path from typing import TYPE_CHECKING from typing import Any -from reader import TagNotFoundError - if TYPE_CHECKING: from reader import Reader @@ -176,21 +174,15 @@ def export_state(reader: Reader, backup_path: Path) -> None: logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url) feeds_state.append(feed_data) - try: - webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list( - reader.get_tag((), "webhooks", []), - ) - except TagNotFoundError: - webhooks = [] + webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list( + reader.get_tag((), "webhooks", []), + ) # Export global update interval if set global_update_interval: dict[str, Any] | None = None - try: - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict): - global_update_interval = global_update_config - except TagNotFoundError: - pass + global_update_config = reader.get_tag((), ".reader.update", None) + if isinstance(global_update_config, dict): + global_update_interval = global_update_config state: dict = {"feeds": feeds_state, "webhooks": webhooks} if global_update_interval is not None: diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 7c5e7ac..ca3894c 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -19,6 +19,7 @@ import httpx import sentry_sdk import uvicorn from apscheduler.schedulers.asyncio import AsyncIOScheduler +from fastapi import Depends from fastapi import FastAPI from fastapi import Form from fastapi import HTTPException @@ -53,7 +54,7 @@ from discord_rss_bot.feeds import send_entry_to_discord from discord_rss_bot.feeds import send_to_discord from discord_rss_bot.git_backup import commit_state_change from discord_rss_bot.git_backup import get_backup_path -from discord_rss_bot.missing_tags import add_missing_tags +from discord_rss_bot.is_url_valid import is_url_valid from discord_rss_bot.search import create_search_context from discord_rss_bot.settings import get_reader @@ -100,7 +101,16 @@ LOGGING_CONFIG: dict[str, Any] = { logging.config.dictConfig(LOGGING_CONFIG) logger: logging.Logger = logging.getLogger(__name__) -reader: Reader = get_reader() + + +def get_reader_dependency() -> Reader: + """Provide the app Reader instance as a FastAPI dependency. + + Returns: + Reader: The shared Reader instance. + """ + return get_reader() + # Time constants for relative time formatting SECONDS_PER_MINUTE = 60 @@ -146,7 +156,7 @@ def relative_time(dt: datetime | None) -> str: @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None]: """Lifespan function for the FastAPI app.""" - add_missing_tags(reader) + reader: Reader = get_reader() scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone=UTC) scheduler.add_job( func=send_to_discord, @@ -170,8 +180,6 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template # Add the filters to the Jinja2 environment so they can be used in html templates. templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else "" -templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted -templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted templates.env.filters["discord_markdown"] = markdownify templates.env.filters["relative_time"] = relative_time templates.env.globals["get_backup_path"] = get_backup_path @@ -181,12 +189,14 @@ templates.env.globals["get_backup_path"] = get_backup_path async def post_add_webhook( webhook_name: Annotated[str, Form()], webhook_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Add a feed to the database. Args: webhook_name: The name of the webhook. webhook_url: The url of the webhook. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the index page. @@ -219,11 +229,15 @@ async def post_add_webhook( @app.post("/delete_webhook") -async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectResponse: +async def post_delete_webhook( + webhook_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], +) -> RedirectResponse: """Delete a webhook from the database. Args: webhook_url: The url of the webhook. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the index page. @@ -266,12 +280,14 @@ async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectRe async def post_create_feed( feed_url: Annotated[str, Form()], webhook_dropdown: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Add a feed to the database. Args: feed_url: The feed to add. webhook_dropdown: The webhook to use. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -283,11 +299,15 @@ async def post_create_feed( @app.post("/pause") -async def post_pause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse: +async def post_pause_feed( + feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], +) -> RedirectResponse: """Pause a feed. Args: feed_url: The feed to pause. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -298,11 +318,15 @@ async def post_pause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse: @app.post("/unpause") -async def post_unpause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse: +async def post_unpause_feed( + feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], +) -> RedirectResponse: """Unpause a feed. Args: feed_url: The Feed to unpause. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -314,6 +338,7 @@ async def post_unpause_feed(feed_url: Annotated[str, Form()]) -> RedirectRespons @app.post("/whitelist") async def post_set_whitelist( + reader: Annotated[Reader, Depends(get_reader_dependency)], whitelist_title: Annotated[str, Form()] = "", whitelist_summary: Annotated[str, Form()] = "", whitelist_content: Annotated[str, Form()] = "", @@ -336,6 +361,7 @@ async def post_set_whitelist( regex_whitelist_content: Whitelisted regex for when checking the content. regex_whitelist_author: Whitelisted regex for when checking the author. feed_url: The feed we should set the whitelist for. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -356,12 +382,17 @@ async def post_set_whitelist( @app.get("/whitelist", response_class=HTMLResponse) -async def get_whitelist(feed_url: str, request: Request): +async def get_whitelist( + feed_url: str, + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Get the whitelist. Args: feed_url: What feed we should get the whitelist for. request: The request object. + reader: The Reader instance. Returns: HTMLResponse: The whitelist page. @@ -395,6 +426,7 @@ async def get_whitelist(feed_url: str, request: Request): @app.post("/blacklist") async def post_set_blacklist( + reader: Annotated[Reader, Depends(get_reader_dependency)], blacklist_title: Annotated[str, Form()] = "", blacklist_summary: Annotated[str, Form()] = "", blacklist_content: Annotated[str, Form()] = "", @@ -420,6 +452,7 @@ async def post_set_blacklist( regex_blacklist_content: Blacklisted regex for when checking the content. regex_blacklist_author: Blacklisted regex for when checking the author. feed_url: What feed we should set the blacklist for. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -438,12 +471,17 @@ async def post_set_blacklist( @app.get("/blacklist", response_class=HTMLResponse) -async def get_blacklist(feed_url: str, request: Request): +async def get_blacklist( + feed_url: str, + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Get the blacklist. Args: feed_url: What feed we should get the blacklist for. request: The request object. + reader: The Reader instance. Returns: HTMLResponse: The blacklist page. @@ -477,6 +515,7 @@ async def get_blacklist(feed_url: str, request: Request): @app.post("/custom") async def post_set_custom( feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], custom_message: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set the custom message, this is used when sending the message. @@ -484,6 +523,7 @@ async def post_set_custom( Args: custom_message: The custom message. feed_url: The feed we should set the custom message for. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -505,12 +545,17 @@ async def post_set_custom( @app.get("/custom", response_class=HTMLResponse) -async def get_custom(feed_url: str, request: Request): +async def get_custom( + feed_url: str, + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The request object. + reader: The Reader instance. Returns: HTMLResponse: The custom message page. @@ -531,12 +576,17 @@ async def get_custom(feed_url: str, request: Request): @app.get("/embed", response_class=HTMLResponse) -async def get_embed_page(feed_url: str, request: Request): +async def get_embed_page( + feed_url: str, + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The request object. + reader: The Reader instance. Returns: HTMLResponse: The embed page. @@ -572,6 +622,7 @@ async def get_embed_page(feed_url: str, request: Request): @app.post("/embed", response_class=HTMLResponse) async def post_embed( feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], title: Annotated[str, Form()] = "", description: Annotated[str, Form()] = "", color: Annotated[str, Form()] = "", @@ -597,7 +648,7 @@ async def post_embed( author_icon_url: The author icon url of the embed. footer_text: The footer text of the embed. footer_icon_url: The footer icon url of the embed. - + reader: The Reader instance. Returns: RedirectResponse: Redirect to the embed page. @@ -625,11 +676,15 @@ async def post_embed( @app.post("/use_embed") -async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse: +async def post_use_embed( + feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], +) -> RedirectResponse: """Use embed instead of text. Args: feed_url: The feed to change. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -641,11 +696,15 @@ async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse: @app.post("/use_text") -async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse: +async def post_use_text( + feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], +) -> RedirectResponse: """Use text instead of embed. Args: feed_url: The feed to change. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -659,6 +718,7 @@ async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse: @app.post("/set_update_interval") async def post_set_update_interval( feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], interval_minutes: Annotated[int | None, Form()] = None, redirect_to: Annotated[str, Form()] = "", ) -> RedirectResponse: @@ -668,6 +728,7 @@ async def post_set_update_interval( feed_url: The feed to change. interval_minutes: The update interval in minutes (None to reset to global default). redirect_to: Optional redirect URL (defaults to feed page). + reader: The Reader instance. Returns: RedirectResponse: Redirect to the specified page or feed page. @@ -703,12 +764,14 @@ async def post_set_update_interval( async def post_change_feed_url( old_feed_url: Annotated[str, Form()], new_feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Change the URL for an existing feed. Args: old_feed_url: Current feed URL. new_feed_url: New feed URL to change to. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page for the resulting URL. @@ -734,6 +797,19 @@ async def post_change_feed_url( except ReaderError as e: raise HTTPException(status_code=400, detail=f"Failed to change feed URL: {e}") from e + # Update the feed with the new URL so we can discover what entries it returns. + # Then mark all unread entries as read so the scheduler doesn't resend them. + try: + reader.update_feed(clean_new_feed_url) + except Exception: + logger.exception("Failed to update feed after URL change: %s", clean_new_feed_url) + + for entry in reader.get_entries(feed=clean_new_feed_url, read=False): + try: + reader.set_entry_read(entry, True) + except Exception: + logger.exception("Failed to mark entry as read after URL change: %s", entry.id) + commit_state_change(reader, f"Change feed URL from {clean_old_feed_url} to {clean_new_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_new_feed_url)}", status_code=303) @@ -741,6 +817,7 @@ async def post_change_feed_url( @app.post("/reset_update_interval") async def post_reset_update_interval( feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], redirect_to: Annotated[str, Form()] = "", ) -> RedirectResponse: """Reset the update interval for a feed to use the global default. @@ -748,6 +825,7 @@ async def post_reset_update_interval( Args: feed_url: The feed to change. redirect_to: Optional redirect URL (defaults to feed page). + reader: The Reader instance. Returns: RedirectResponse: Redirect to the specified page or feed page. @@ -774,11 +852,15 @@ async def post_reset_update_interval( @app.post("/set_global_update_interval") -async def post_set_global_update_interval(interval_minutes: Annotated[int, Form()]) -> RedirectResponse: +async def post_set_global_update_interval( + interval_minutes: Annotated[int, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], +) -> RedirectResponse: """Set the global default update interval. Args: interval_minutes: The update interval in minutes. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the settings page. @@ -792,11 +874,15 @@ async def post_set_global_update_interval(interval_minutes: Annotated[int, Form( @app.get("/add", response_class=HTMLResponse) -def get_add(request: Request): +def get_add( + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Page for adding a new feed. Args: request: The request object. + reader: The Reader instance. Returns: HTMLResponse: The add feed page. @@ -809,13 +895,19 @@ def get_add(request: Request): @app.get("/feed", response_class=HTMLResponse) -async def get_feed(feed_url: str, request: Request, starting_after: str = ""): # noqa: C901, PLR0912, PLR0914, PLR0915 +async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 + feed_url: str, + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], + starting_after: str = "", +): """Get a feed by URL. Args: feed_url: The feed to add. request: The request object. starting_after: The entry to start after. Used for pagination. + reader: The Reader instance. Returns: HTMLResponse: The feed page. @@ -845,28 +937,22 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""): except EntryNotFoundError as e: current_entries = list(reader.get_entries(feed=clean_feed_url)) msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}" - html: str = create_html_for_feed(current_entries, clean_feed_url) + html: str = create_html_for_feed(reader=reader, entries=current_entries, current_feed_url=clean_feed_url) # Get feed and global intervals for error case too feed_interval: int | None = None - try: - feed_update_config = reader.get_tag(feed, ".reader.update") - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value - except TagNotFoundError: - pass + feed_update_config = reader.get_tag(feed, ".reader.update", None) + if isinstance(feed_update_config, dict) and "interval" in feed_update_config: + interval_value = feed_update_config["interval"] + if isinstance(interval_value, int): + feed_interval = interval_value global_interval: int = 60 - try: - global_update_config = reader.get_tag((), ".reader.update") - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value - except TagNotFoundError: - pass + global_update_config = reader.get_tag((), ".reader.update", None) + if isinstance(global_update_config, dict) and "interval" in global_update_config: + interval_value = global_update_config["interval"] + if isinstance(interval_value, int): + global_interval = interval_value context = { "request": request, @@ -901,36 +987,25 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""): last_entry = entries[-1] # Create the html for the entries. - html: str = create_html_for_feed(entries, clean_feed_url) + html: str = create_html_for_feed(reader=reader, entries=entries, current_feed_url=clean_feed_url) - try: - should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) - except TagNotFoundError: - add_missing_tags(reader) - should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) + should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True)) # Get the update interval for this feed feed_interval: int | None = None - try: - feed_update_config = reader.get_tag(feed, ".reader.update") - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value - except TagNotFoundError: - # No custom interval set for this feed, will use global default - pass + feed_update_config = reader.get_tag(feed, ".reader.update", None) + if isinstance(feed_update_config, dict) and "interval" in feed_update_config: + interval_value = feed_update_config["interval"] + if isinstance(interval_value, int): + feed_interval = interval_value # Get the global default update interval global_interval: int = 60 # Default to 60 minutes if not set - try: - global_update_config = reader.get_tag((), ".reader.update") - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value - except TagNotFoundError: - pass + global_update_config = reader.get_tag((), ".reader.update", None) + if isinstance(global_update_config, dict) and "interval" in global_update_config: + interval_value = global_update_config["interval"] + if isinstance(interval_value, int): + global_interval = interval_value context = { "request": request, @@ -948,10 +1023,15 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""): return templates.TemplateResponse(request=request, name="feed.html", context=context) -def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: C901, PLR0914 +def create_html_for_feed( # noqa: C901, PLR0914 + reader: Reader, + entries: Iterable[Entry], + current_feed_url: str = "", +) -> str: """Create HTML for the search results. Args: + reader: The Reader instance to use. entries: The entries to create HTML for. current_feed_url: The feed URL currently being viewed in /feed. @@ -969,17 +1049,19 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") - first_image = get_first_image(summary, content) - text: str = replace_tags_in_text_message(entry) or "
No content available.
" + text: str = replace_tags_in_text_message(entry, reader=reader) or ( + "
No content available.
" + ) published = "" if entry.published: published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S") blacklisted: str = "" - if entry_is_blacklisted(entry): + if entry_is_blacklisted(entry, reader=reader): blacklisted = "Blacklisted" whitelisted: str = "" - if entry_is_whitelisted(entry): + if entry_is_whitelisted(entry, reader=reader): whitelisted = "Whitelisted" source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url @@ -999,7 +1081,11 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") - ) entry_id: str = urllib.parse.quote(entry.id) - to_discord_html: str = f"Send to Discord" + encoded_source_feed_url: str = urllib.parse.quote(source_feed_url) + to_discord_html: str = ( + f"" + "Send to Discord" + ) # Check if this is a YouTube feed entry and the entry has a link is_youtube_feed = "youtube.com/feeds/videos.xml" in entry.feed.url @@ -1070,6 +1156,7 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: hook_name (str): The webhook name. hook_url (str): The webhook URL. + Returns: WebhookInfo: The webhook username, avatar, guild id, etc. """ @@ -1091,39 +1178,37 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: @app.get("/settings", response_class=HTMLResponse) -async def get_settings(request: Request): +async def get_settings( + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Settings page. Args: request: The request object. + reader: The Reader instance. Returns: HTMLResponse: The settings page. """ # Get the global default update interval global_interval: int = 60 # Default to 60 minutes if not set - try: - global_update_config = reader.get_tag((), ".reader.update") - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value - except TagNotFoundError: - pass + global_update_config = reader.get_tag((), ".reader.update", None) + if isinstance(global_update_config, dict) and "interval" in global_update_config: + interval_value = global_update_config["interval"] + if isinstance(interval_value, int): + global_interval = interval_value # Get all feeds with their intervals feeds: Iterable[Feed] = reader.get_feeds() feed_intervals = [] for feed in feeds: feed_interval: int | None = None - try: - feed_update_config = reader.get_tag(feed, ".reader.update") - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value - except TagNotFoundError: - pass + feed_update_config = reader.get_tag(feed, ".reader.update", None) + if isinstance(feed_update_config, dict) and "interval" in feed_update_config: + interval_value = feed_update_config["interval"] + if isinstance(interval_value, int): + feed_interval = interval_value feed_intervals.append({ "feed": feed, @@ -1141,11 +1226,15 @@ async def get_settings(request: Request): @app.get("/webhooks", response_class=HTMLResponse) -async def get_webhooks(request: Request): +async def get_webhooks( + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Page for adding a new webhook. Args: request: The request object. + reader: The Reader instance. Returns: HTMLResponse: The add webhook page. @@ -1166,54 +1255,65 @@ async def get_webhooks(request: Request): @app.get("/", response_class=HTMLResponse) -def get_index(request: Request, message: str = ""): +def get_index( + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], + message: str = "", +): """This is the root of the website. Args: request: The request object. message: Optional message to display to the user. + reader: The Reader instance. Returns: HTMLResponse: The index page. """ - return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request, message)) + return templates.TemplateResponse( + request=request, + name="index.html", + context=make_context_index(request, message, reader), + ) -def make_context_index(request: Request, message: str = ""): +def make_context_index(request: Request, message: str = "", reader: Reader | None = None): """Create the needed context for the index page. Args: request: The request object. message: Optional message to display to the user. + reader: The Reader instance. Returns: dict: The context for the index page. """ - hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) + effective_reader: Reader = reader or get_reader_dependency() + hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(effective_reader.get_tag((), "webhooks", []))) - feed_list = [] - broken_feeds = [] - feeds_without_attached_webhook = [] + feed_list: list[dict[str, JSONType | Feed | str]] = [] + broken_feeds: list[Feed] = [] + feeds_without_attached_webhook: list[Feed] = [] # Get all feeds and organize them - feeds: Iterable[Feed] = reader.get_feeds() + feeds: Iterable[Feed] = effective_reader.get_feeds() for feed in feeds: - try: - webhook = reader.get_tag(feed.url, "webhook") - feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)}) - except TagNotFoundError: + webhook: str = str(effective_reader.get_tag(feed.url, "webhook", "")) + if not webhook: broken_feeds.append(feed) continue - webhook_list = [hook["url"] for hook in hooks] + feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)}) + + webhook_list: list[str] = [hook["url"] for hook in hooks] if webhook not in webhook_list: feeds_without_attached_webhook.append(feed) return { "request": request, "feeds": feed_list, - "feed_count": reader.get_feed_counts(), - "entry_count": reader.get_entry_counts(), + "feed_count": effective_reader.get_feed_counts(), + "entry_count": effective_reader.get_entry_counts(), "webhooks": hooks, "broken_feeds": broken_feeds, "feeds_without_attached_webhook": feeds_without_attached_webhook, @@ -1222,12 +1322,15 @@ def make_context_index(request: Request, message: str = ""): @app.post("/remove", response_class=HTMLResponse) -async def remove_feed(feed_url: Annotated[str, Form()]): +async def remove_feed( + feed_url: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Get a feed by URL. Args: feed_url: The feed to add. - + reader: The Reader instance. Returns: RedirectResponse: Redirect to the index page. @@ -1246,13 +1349,17 @@ async def remove_feed(feed_url: Annotated[str, Form()]): @app.get("/update", response_class=HTMLResponse) -async def update_feed(request: Request, feed_url: str): +async def update_feed( + request: Request, + feed_url: str, + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Update a feed. Args: request: The request object. feed_url: The feed URL to update. - + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -1270,11 +1377,15 @@ async def update_feed(request: Request, feed_url: str): @app.post("/backup") -async def manual_backup(request: Request) -> RedirectResponse: +async def manual_backup( + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], +) -> RedirectResponse: """Manually trigger a git backup of the current state. Args: request: The request object. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the index page with a success or error message. @@ -1297,51 +1408,81 @@ async def manual_backup(request: Request) -> RedirectResponse: @app.get("/search", response_class=HTMLResponse) -async def search(request: Request, query: str): +async def search( + request: Request, + query: str, + reader: Annotated[Reader, Depends(get_reader_dependency)], +): """Get entries matching a full-text search query. Args: query: The query to search for. request: The request object. + reader: The Reader instance. Returns: HTMLResponse: The search page. """ reader.update_search() - context = create_search_context(query) + context = create_search_context(query, reader=reader) return templates.TemplateResponse(request=request, name="search.html", context={"request": request, **context}) @app.get("/post_entry", response_class=HTMLResponse) -async def post_entry(entry_id: str): +async def post_entry( + entry_id: str, + reader: Annotated[Reader, Depends(get_reader_dependency)], + feed_url: str = "", +): """Send single entry to Discord. Args: entry_id: The entry to send. + feed_url: Optional feed URL used to disambiguate entries with identical IDs. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. """ unquoted_entry_id: str = urllib.parse.unquote(entry_id) - entry: Entry | None = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None) + clean_feed_url: str = urllib.parse.unquote(feed_url.strip()) if feed_url else "" + + # Prefer feed-scoped lookup when feed_url is provided. This avoids ambiguity when + # multiple feeds contain entries with the same ID. + entry: Entry | None = None + if clean_feed_url: + entry = next( + (entry for entry in reader.get_entries(feed=clean_feed_url) if entry.id == unquoted_entry_id), + None, + ) + else: + entry = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None) + if entry is None: return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.") - if result := send_entry_to_discord(entry=entry): + if result := send_entry_to_discord(entry=entry, reader=reader): return result # Redirect to the feed page. - clean_feed_url: str = entry.feed.url.strip() - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) + redirect_feed_url: str = entry.feed.url.strip() + return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(redirect_feed_url)}", status_code=303) @app.post("/modify_webhook", response_class=HTMLResponse) -def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Form()]): +def modify_webhook( + old_hook: Annotated[str, Form()], + new_hook: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], + redirect_to: Annotated[str, Form()] = "", +): """Modify a webhook. Args: old_hook: The webhook to modify. new_hook: The new webhook. + redirect_to: Optional redirect URL after the update. + reader: The Reader instance. Returns: RedirectResponse: Redirect to the webhook page. @@ -1356,15 +1497,20 @@ def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Fo # Webhooks are stored as a list of dictionaries. # Example: [{"name": "webhook_name", "url": "webhook_url"}] webhooks = cast("list[dict[str, str]]", webhooks) + old_hook_clean: str = old_hook.strip() + new_hook_clean: str = new_hook.strip() + webhook_modified: bool = False for hook in webhooks: - if hook["url"] in old_hook.strip(): - hook["url"] = new_hook.strip() + if hook["url"] in old_hook_clean: + hook["url"] = new_hook_clean # Check if it has been modified. - if hook["url"] != new_hook.strip(): + if hook["url"] != new_hook_clean: raise HTTPException(status_code=500, detail="Webhook could not be modified") + webhook_modified = True + # Add our new list of webhooks to the database. reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] @@ -1372,16 +1518,21 @@ def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Fo # matches the old one. feeds: Iterable[Feed] = reader.get_feeds() for feed in feeds: - try: - webhook = reader.get_tag(feed, "webhook") - except TagNotFoundError: - continue + webhook: str = str(reader.get_tag(feed, "webhook", "")) - if webhook == old_hook.strip(): - reader.set_tag(feed.url, "webhook", new_hook.strip()) # pyright: ignore[reportArgumentType] + if webhook == old_hook_clean: + reader.set_tag(feed.url, "webhook", new_hook_clean) # pyright: ignore[reportArgumentType] - # Redirect to the webhook page. - return RedirectResponse(url="/webhooks", status_code=303) + if webhook_modified and old_hook_clean != new_hook_clean: + commit_state_change(reader, f"Modify webhook URL from {old_hook_clean} to {new_hook_clean}") + + redirect_url: str = redirect_to.strip() or "/webhooks" + if redirect_to: + redirect_url = redirect_url.replace(urllib.parse.quote(old_hook_clean), urllib.parse.quote(new_hook_clean)) + redirect_url = redirect_url.replace(old_hook_clean, new_hook_clean) + + # Redirect to the requested page. + return RedirectResponse(url=redirect_url, status_code=303) def extract_youtube_video_id(url: str) -> str | None: @@ -1407,11 +1558,216 @@ def extract_youtube_video_id(url: str) -> str | None: return None -@app.get("/webhook_entries", response_class=HTMLResponse) -async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914 +def resolve_final_feed_url(url: str) -> tuple[str, str | None]: + """Resolve a feed URL by following redirects. + + Args: + url: The feed URL to resolve. + + Returns: + tuple[str, str | None]: A tuple with (resolved_url, error_message). + error_message is None when resolution succeeded. + """ + clean_url: str = url.strip() + if not clean_url: + return "", "URL is empty" + + if not is_url_valid(clean_url): + return clean_url, "URL is invalid" + + try: + response: Response = httpx.get(clean_url, follow_redirects=True, timeout=10.0) + except httpx.HTTPError as e: + return clean_url, str(e) + + if not response.is_success: + return clean_url, f"HTTP {response.status_code}" + + return str(response.url), None + + +def create_webhook_feed_url_preview( + webhook_feeds: list[Feed], + replace_from: str, + replace_to: str, + resolve_urls: bool, # noqa: FBT001 + force_update: bool = False, # noqa: FBT001, FBT002 + existing_feed_urls: set[str] | None = None, +) -> list[dict[str, str | bool | None]]: + """Create preview rows for bulk feed URL replacement. + + Args: + webhook_feeds: Feeds attached to a webhook. + replace_from: Text to replace in each URL. + replace_to: Replacement text. + resolve_urls: Whether to resolve resulting URLs via HTTP redirects. + force_update: Whether conflicts should be marked as force-overwritable. + existing_feed_urls: Optional set of all tracked feed URLs used for conflict detection. + + Returns: + list[dict[str, str | bool | None]]: Rows used in the preview table. + """ + known_feed_urls: set[str] = existing_feed_urls or {feed.url for feed in webhook_feeds} + preview_rows: list[dict[str, str | bool | None]] = [] + for feed in webhook_feeds: + old_url: str = feed.url + has_match: bool = bool(replace_from and replace_from in old_url) + + candidate_url: str = old_url + if has_match: + candidate_url = old_url.replace(replace_from, replace_to) + + resolved_url: str = candidate_url + resolution_error: str | None = None + if has_match and candidate_url != old_url and resolve_urls: + resolved_url, resolution_error = resolve_final_feed_url(candidate_url) + + will_force_ignore_errors: bool = bool( + force_update and bool(resolution_error) and has_match and old_url != candidate_url, + ) + + target_exists: bool = bool( + has_match and not resolution_error and resolved_url != old_url and resolved_url in known_feed_urls, + ) + will_force_overwrite: bool = bool(target_exists and force_update) + will_change: bool = bool( + has_match + and old_url != (candidate_url if will_force_ignore_errors else resolved_url) + and (not target_exists or will_force_overwrite) + and (not resolution_error or will_force_ignore_errors), + ) + + preview_rows.append({ + "old_url": old_url, + "candidate_url": candidate_url, + "resolved_url": resolved_url, + "has_match": has_match, + "will_change": will_change, + "target_exists": target_exists, + "will_force_overwrite": will_force_overwrite, + "will_force_ignore_errors": will_force_ignore_errors, + "resolution_error": resolution_error, + }) + + return preview_rows + + +def build_webhook_mass_update_context( + webhook_feeds: list[Feed], + all_feeds: list[Feed], + replace_from: str, + replace_to: str, + resolve_urls: bool, # noqa: FBT001 + force_update: bool = False, # noqa: FBT001, FBT002 +) -> dict[str, str | bool | int | list[dict[str, str | bool | None]] | dict[str, int]]: + """Build context data used by the webhook mass URL update preview UI. + + Args: + webhook_feeds: Feeds attached to the selected webhook. + all_feeds: All tracked feeds. + replace_from: Text to replace in URLs. + replace_to: Replacement text. + resolve_urls: Whether to resolve resulting URLs. + force_update: Whether to allow overwriting existing target URLs. + + Returns: + dict[str, ...]: Context values for rendering preview controls and table. + """ + clean_replace_from: str = replace_from.strip() + clean_replace_to: str = replace_to.strip() + + preview_rows: list[dict[str, str | bool | None]] = [] + if clean_replace_from: + preview_rows = create_webhook_feed_url_preview( + webhook_feeds=webhook_feeds, + replace_from=clean_replace_from, + replace_to=clean_replace_to, + resolve_urls=resolve_urls, + force_update=force_update, + existing_feed_urls={feed.url for feed in all_feeds}, + ) + + preview_summary: dict[str, int] = { + "total": len(preview_rows), + "matched": sum(1 for row in preview_rows if row["has_match"]), + "will_update": sum(1 for row in preview_rows if row["will_change"]), + "conflicts": sum(1 for row in preview_rows if row["target_exists"] and not row["will_force_overwrite"]), + "force_overwrite": sum(1 for row in preview_rows if row["will_force_overwrite"]), + "force_ignore_errors": sum(1 for row in preview_rows if row["will_force_ignore_errors"]), + "resolve_errors": sum(1 for row in preview_rows if row["resolution_error"]), + } + preview_summary["no_match"] = preview_summary["total"] - preview_summary["matched"] + preview_summary["no_change"] = sum( + 1 for row in preview_rows if row["has_match"] and not row["resolution_error"] and not row["will_change"] + ) + + return { + "replace_from": clean_replace_from, + "replace_to": clean_replace_to, + "resolve_urls": resolve_urls, + "force_update": force_update, + "preview_rows": preview_rows, + "preview_summary": preview_summary, + "preview_change_count": preview_summary["will_update"], + } + + +@app.get("/webhook_entries_mass_update_preview", response_class=HTMLResponse) +async def get_webhook_entries_mass_update_preview( webhook_url: str, request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], + replace_from: str = "", + replace_to: str = "", + resolve_urls: bool = True, # noqa: FBT001, FBT002 + force_update: bool = False, # noqa: FBT001, FBT002 +) -> HTMLResponse: + """Render the mass-update preview fragment for a webhook using HTMX. + + Args: + webhook_url: Webhook URL whose feeds are being updated. + request: The request object. + reader: The Reader instance. + replace_from: Text to find in URLs. + replace_to: Replacement text. + resolve_urls: Whether to resolve resulting URLs. + force_update: Whether to allow overwriting existing target URLs. + + Returns: + HTMLResponse: Rendered partial template containing summary + preview table. + """ + clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) + all_feeds: list[Feed] = list(reader.get_feeds()) + webhook_feeds: list[Feed] = [ + feed for feed in all_feeds if str(reader.get_tag(feed.url, "webhook", "")) == clean_webhook_url + ] + + context = { + "request": request, + "webhook_url": clean_webhook_url, + **build_webhook_mass_update_context( + webhook_feeds=webhook_feeds, + all_feeds=all_feeds, + replace_from=replace_from, + replace_to=replace_to, + resolve_urls=resolve_urls, + force_update=force_update, + ), + } + return templates.TemplateResponse(request=request, name="_webhook_mass_update_preview.html", context=context) + + +@app.get("/webhook_entries", response_class=HTMLResponse) +async def get_webhook_entries( # noqa: C901, PLR0914 + webhook_url: str, + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], starting_after: str = "", + replace_from: str = "", + replace_to: str = "", + resolve_urls: bool = True, # noqa: FBT001, FBT002 + force_update: bool = False, # noqa: FBT001, FBT002 + message: str = "", ) -> HTMLResponse: """Get all latest entries from all feeds for a specific webhook. @@ -1419,6 +1775,12 @@ async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914 webhook_url: The webhook URL to get entries for. request: The request object. starting_after: The entry to start after. Used for pagination. + replace_from: Optional URL substring to find for bulk URL replacement preview. + replace_to: Optional replacement substring used in bulk URL replacement preview. + resolve_urls: Whether to resolve replaced URLs by following redirects. + force_update: Whether to allow overwriting existing target URLs during apply. + message: Optional status message shown in the UI. + reader: The Reader instance. Returns: HTMLResponse: The webhook entries page. @@ -1440,24 +1802,26 @@ async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914 if not webhook_name: raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") + hook_info: WebhookInfo = get_data_from_hook_url(hook_name=webhook_name, hook_url=clean_webhook_url) + # Get all feeds associated with this webhook all_feeds: list[Feed] = list(reader.get_feeds()) webhook_feeds: list[Feed] = [] for feed in all_feeds: - try: - feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) - if feed_webhook == clean_webhook_url: - webhook_feeds.append(feed) - except TagNotFoundError: - continue + feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) + if feed_webhook == clean_webhook_url: + webhook_feeds.append(feed) # Get all entries from all feeds for this webhook, sorted by published date all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)] - # Sort entries by published date (newest first) + # Sort entries by published date (newest first), with undated entries last. all_entries.sort( - key=lambda e: e.published or datetime.now(tz=UTC), + key=lambda e: ( + e.published is not None, + e.published or datetime.min.replace(tzinfo=UTC), + ), reverse=True, ) @@ -1490,7 +1854,16 @@ async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914 last_entry = paginated_entries[-1] # Create the html for the entries - html: str = create_html_for_feed(paginated_entries) + html: str = create_html_for_feed(reader=reader, entries=paginated_entries) + + mass_update_context = build_webhook_mass_update_context( + webhook_feeds=webhook_feeds, + all_feeds=all_feeds, + replace_from=replace_from, + replace_to=replace_to, + resolve_urls=resolve_urls, + force_update=force_update, + ) # Check if there are more entries available total_entries: int = len(all_entries) @@ -1498,18 +1871,155 @@ async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914 context = { "request": request, + "hook_info": hook_info, "webhook_name": webhook_name, "webhook_url": clean_webhook_url, + "webhook_feeds": webhook_feeds, "entries": paginated_entries, "html": html, "last_entry": last_entry, "is_show_more_entries_button_visible": is_show_more_entries_button_visible, "total_entries": total_entries, "feeds_count": len(webhook_feeds), + "message": urllib.parse.unquote(message) if message else "", + **mass_update_context, } return templates.TemplateResponse(request=request, name="webhook_entries.html", context=context) +@app.post("/bulk_change_feed_urls", response_class=HTMLResponse) +async def post_bulk_change_feed_urls( # noqa: C901, PLR0914, PLR0912, PLR0915 + webhook_url: Annotated[str, Form()], + replace_from: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], + replace_to: Annotated[str, Form()] = "", + resolve_urls: Annotated[bool, Form()] = True, # noqa: FBT002 + force_update: Annotated[bool, Form()] = False, # noqa: FBT002 +) -> RedirectResponse: + """Bulk-change feed URLs attached to a webhook. + + Args: + webhook_url: The webhook URL whose feeds should be updated. + replace_from: Text to find in each URL. + replace_to: Text to replace with. + resolve_urls: Whether to resolve resulting URLs via redirects. + force_update: Whether existing target feed URLs should be overwritten. + reader: The Reader instance. + + Returns: + RedirectResponse: Redirect to webhook detail with status message. + + Raises: + HTTPException: If webhook is missing or replace_from is empty. + """ + clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) + clean_replace_from: str = replace_from.strip() + clean_replace_to: str = replace_to.strip() + + if not clean_replace_from: + raise HTTPException(status_code=400, detail="replace_from cannot be empty") + + webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) + if not any(hook["url"] == clean_webhook_url for hook in webhooks): + raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") + + all_feeds: list[Feed] = list(reader.get_feeds()) + webhook_feeds: list[Feed] = [] + for feed in all_feeds: + feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) + if feed_webhook == clean_webhook_url: + webhook_feeds.append(feed) + + preview_rows: list[dict[str, str | bool | None]] = create_webhook_feed_url_preview( + webhook_feeds=webhook_feeds, + replace_from=clean_replace_from, + replace_to=clean_replace_to, + resolve_urls=resolve_urls, + force_update=force_update, + existing_feed_urls={feed.url for feed in all_feeds}, + ) + + changed_count: int = 0 + skipped_count: int = 0 + failed_count: int = 0 + conflict_count: int = 0 + force_overwrite_count: int = 0 + + for row in preview_rows: + if not row["has_match"]: + continue + + if row["resolution_error"] and not force_update: + skipped_count += 1 + continue + + if row["target_exists"] and not force_update: + conflict_count += 1 + skipped_count += 1 + continue + + old_url: str = str(row["old_url"]) + new_url: str = str(row["candidate_url"] if row["will_force_ignore_errors"] else row["resolved_url"]) + + if old_url == new_url: + skipped_count += 1 + continue + + if row["target_exists"] and force_update: + try: + reader.delete_feed(new_url) + force_overwrite_count += 1 + except FeedNotFoundError: + pass + except ReaderError: + failed_count += 1 + continue + + try: + reader.change_feed_url(old_url, new_url) + except FeedExistsError: + skipped_count += 1 + continue + except FeedNotFoundError: + skipped_count += 1 + continue + except ReaderError: + failed_count += 1 + continue + + try: + reader.update_feed(new_url) + except Exception: + logger.exception("Failed to update feed after URL change: %s", new_url) + + for entry in reader.get_entries(feed=new_url, read=False): + try: + reader.set_entry_read(entry, True) + except Exception: + logger.exception("Failed to mark entry as read after URL change: %s", entry.id) + + changed_count += 1 + + if changed_count > 0: + commit_state_change( + reader, + f"Bulk change {changed_count} feed URL(s) for webhook {clean_webhook_url}", + ) + + status_message: str = ( + f"Updated {changed_count} feed URL(s). " + f"Force overwrote {force_overwrite_count}. " + f"Conflicts {conflict_count}. " + f"Skipped {skipped_count}. " + f"Failed {failed_count}." + ) + redirect_url: str = ( + f"/webhook_entries?webhook_url={urllib.parse.quote(clean_webhook_url)}" + f"&message={urllib.parse.quote(status_message)}" + ) + return RedirectResponse(url=redirect_url, status_code=303) + + if __name__ == "__main__": sentry_sdk.init( dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744", diff --git a/discord_rss_bot/missing_tags.py b/discord_rss_bot/missing_tags.py deleted file mode 100644 index 589893e..0000000 --- a/discord_rss_bot/missing_tags.py +++ /dev/null @@ -1,109 +0,0 @@ -from __future__ import annotations - -from reader import Feed -from reader import Reader -from reader import TagNotFoundError - -from discord_rss_bot.settings import default_custom_embed -from discord_rss_bot.settings import default_custom_message - - -def add_custom_message(reader: Reader, feed: Feed) -> None: - """Add the custom message tag to the feed if it doesn't exist. - - Args: - reader: What Reader to use. - feed: The feed to add the tag to. - """ - try: - reader.get_tag(feed, "custom_message") - except TagNotFoundError: - reader.set_tag(feed.url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType] - reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType] - - -def add_has_custom_message(reader: Reader, feed: Feed) -> None: - """Add the has_custom_message tag to the feed if it doesn't exist. - - Args: - reader: What Reader to use. - feed: The feed to add the tag to. - """ - try: - reader.get_tag(feed, "has_custom_message") - except TagNotFoundError: - if reader.get_tag(feed, "custom_message") == default_custom_message: - reader.set_tag(feed.url, "has_custom_message", False) # pyright: ignore[reportArgumentType] - else: - reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType] - - -def add_if_embed(reader: Reader, feed: Feed) -> None: - """Add the if_embed tag to the feed if it doesn't exist. - - Args: - reader: What Reader to use. - feed: The feed to add the tag to. - """ - try: - reader.get_tag(feed, "if_embed") - except TagNotFoundError: - reader.set_tag(feed.url, "if_embed", True) # pyright: ignore[reportArgumentType] - - -def add_custom_embed(reader: Reader, feed: Feed) -> None: - """Add the custom embed tag to the feed if it doesn't exist. - - Args: - reader: What Reader to use. - feed: The feed to add the tag to. - """ - try: - reader.get_tag(feed, "embed") - except TagNotFoundError: - reader.set_tag(feed.url, "embed", default_custom_embed) # pyright: ignore[reportArgumentType] - reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType] - - -def add_has_custom_embed(reader: Reader, feed: Feed) -> None: - """Add the has_custom_embed tag to the feed if it doesn't exist. - - Args: - reader: What Reader to use. - feed: The feed to add the tag to. - """ - try: - reader.get_tag(feed, "has_custom_embed") - except TagNotFoundError: - if reader.get_tag(feed, "embed") == default_custom_embed: - reader.set_tag(feed.url, "has_custom_embed", False) # pyright: ignore[reportArgumentType] - else: - reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType] - - -def add_should_send_embed(reader: Reader, feed: Feed) -> None: - """Add the should_send_embed tag to the feed if it doesn't exist. - - Args: - reader: What Reader to use. - feed: The feed to add the tag to. - """ - try: - reader.get_tag(feed, "should_send_embed") - except TagNotFoundError: - reader.set_tag(feed.url, "should_send_embed", True) # pyright: ignore[reportArgumentType] - - -def add_missing_tags(reader: Reader) -> None: - """Add missing tags to feeds. - - Args: - reader: What Reader to use. - """ - for feed in reader.get_feeds(): - add_custom_message(reader, feed) - add_has_custom_message(reader, feed) - add_if_embed(reader, feed) - add_custom_embed(reader, feed) - add_has_custom_embed(reader, feed) - add_should_send_embed(reader, feed) diff --git a/discord_rss_bot/search.py b/discord_rss_bot/search.py index a39f304..85129ac 100644 --- a/discord_rss_bot/search.py +++ b/discord_rss_bot/search.py @@ -3,8 +3,6 @@ from __future__ import annotations import urllib.parse from typing import TYPE_CHECKING -from discord_rss_bot.settings import get_reader - if TYPE_CHECKING: from collections.abc import Iterable @@ -14,19 +12,16 @@ if TYPE_CHECKING: from reader import Reader -def create_search_context(query: str, custom_reader: Reader | None = None) -> dict: +def create_search_context(query: str, reader: Reader) -> dict: """Build context for search.html template. - If custom_reader is None, use the default reader from settings. - Args: query (str): The search query. - custom_reader (Reader | None): Optional custom Reader instance. + reader (Reader): Custom Reader instance. Returns: dict: Context dictionary for rendering the search results. """ - reader: Reader = get_reader() if custom_reader is None else custom_reader search_results: Iterable[EntrySearchResult] = reader.search_entries(query) results: list[dict] = [] diff --git a/discord_rss_bot/settings.py b/discord_rss_bot/settings.py index e91c3c0..194bf08 100644 --- a/discord_rss_bot/settings.py +++ b/discord_rss_bot/settings.py @@ -7,7 +7,6 @@ from pathlib import Path from platformdirs import user_data_dir from reader import Reader -from reader import TagNotFoundError from reader import make_reader if typing.TYPE_CHECKING: @@ -48,9 +47,7 @@ def get_reader(custom_location: Path | None = None) -> Reader: # https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig # Set the default update interval to 15 minutes if not already configured # Users can change this via the Settings page or per-feed in the feed page - try: - reader.get_tag((), ".reader.update") - except TagNotFoundError: + if reader.get_tag((), ".reader.update", None) is None: # Set default reader.set_tag((), ".reader.update", {"interval": 15}) diff --git a/discord_rss_bot/templates/_webhook_mass_update_preview.html b/discord_rss_bot/templates/_webhook_mass_update_preview.html new file mode 100644 index 0000000..a59e97b --- /dev/null +++ b/discord_rss_bot/templates/_webhook_mass_update_preview.html @@ -0,0 +1,73 @@ +{% if preview_rows %} +

+ {{ preview_change_count }} feed URL{{ 's' if preview_change_count != 1 else '' }} ready to update. +

+
+ Total: {{ preview_summary.total }} + Matched: {{ preview_summary.matched }} + Will update: {{ preview_summary.will_update }} + Conflicts: {{ preview_summary.conflicts }} + Force overwrite: {{ preview_summary.force_overwrite }} + Force ignore errors: {{ preview_summary.force_ignore_errors }} + Resolve errors: {{ preview_summary.resolve_errors }} + No change: {{ preview_summary.no_change }} + No match: {{ preview_summary.no_match }} +
+
+ + + + + + +
+
+ + + + + + + + + + {% for row in preview_rows %} + + + + + + {% endfor %} + +
Old URLNew URLStatus
+ {{ row.old_url }} + + {{ row.resolved_url if resolve_urls else row.candidate_url }} + + {% if not row.has_match %} + No match + {% elif row.will_force_ignore_errors %} + Will force update (ignore resolve error) + {% elif row.resolution_error %} + {{ row.resolution_error }} + {% elif row.will_force_overwrite %} + Will force overwrite + {% elif row.target_exists %} + Conflict: target URL exists + {% elif row.will_change %} + Will update + {% else %} + No change + {% endif %} +
+
+{% elif replace_from %} +

No preview rows found for that replacement pattern.

+{% endif %} diff --git a/discord_rss_bot/templates/base.html b/discord_rss_bot/templates/base.html index a8640dd..9146b35 100644 --- a/discord_rss_bot/templates/base.html +++ b/discord_rss_bot/templates/base.html @@ -1,13 +1,12 @@ - + content="Stay updated with the latest news and events with our easy-to-use RSS bot. Never miss a message or announcement again with real-time notifications directly to your Discord server." /> + content="discord, rss, bot, notifications, announcements, updates, real-time, server, messages, news, events, feed." /> @@ -18,19 +17,20 @@ {% block head %} {% endblock head %} - {% include "nav.html" %}
{% if messages %} - + {% endif %} - {% block content %} {% endblock content %}
@@ -41,18 +41,20 @@
+ - diff --git a/discord_rss_bot/templates/index.html b/discord_rss_bot/templates/index.html index 06b8578..341ec38 100644 --- a/discord_rss_bot/templates/index.html +++ b/discord_rss_bot/templates/index.html @@ -32,10 +32,10 @@ {% for hook_from_context in webhooks %}
diff --git a/discord_rss_bot/templates/webhook_entries.html b/discord_rss_bot/templates/webhook_entries.html index eb12487..f0bc970 100644 --- a/discord_rss_bot/templates/webhook_entries.html +++ b/discord_rss_bot/templates/webhook_entries.html @@ -1,20 +1,149 @@ {% extends "base.html" %} {% block title %} - | {{ webhook_name }} - Latest Entries + | {{ webhook_name }} {% endblock title %} {% block content %} + {% if message %}{% endif %}
- -

{{ webhook_name }} - Latest Entries ({{ total_entries }} total from {{ feeds_count }} feeds)

- -
-

- {{ webhook_url }} -

+
+
+

{{ webhook_name }}

+

+ {{ total_entries }} total from {{ feeds_count }} feed{{ 's' if feeds_count != 1 else '' }} +

+

+ {{ webhook_url }} +

+
+ +
+
+
+
+
+

Settings

+ +
+ + +
+ + +
+
+ +
+
+
+ + +
+
+

Mass update feed URLs

+

Replace part of feed URLs for all feeds attached to this webhook.

+
+ +
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ +
+
+
{% include "_webhook_mass_update_preview.html" %}
+
+
+
+
+

Attached feeds

+ {% if webhook_feeds %} + + {% else %} +

No feeds are attached to this webhook yet.

+ {% endif %} +
{# Rendered HTML content #} {% if entries %} +

Latest entries

{{ html|safe }}
{% if is_show_more_entries_button_visible and last_entry %} None: + +def pytest_addoption(parser: pytest.Parser) -> None: + """Register custom command-line options for optional integration tests.""" + parser.addoption( + "--run-real-git-backup-tests", + action="store_true", + default=False, + help="Run tests that push git backup state to a real repository.", + ) + + +def pytest_sessionstart(session: pytest.Session) -> None: """Isolate persistent app state per xdist worker to avoid cross-worker test interference.""" worker_id: str = os.environ.get("PYTEST_XDIST_WORKER", "gw0") worker_data_dir: Path = Path(tempfile.gettempdir()) / "discord-rss-bot-tests" / worker_id @@ -37,4 +51,10 @@ def pytest_configure() -> None: current_reader.close() get_reader: Any = getattr(settings_module, "get_reader", None) if callable(get_reader): - main_module.reader = get_reader() + get_reader() + + +def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None: + """Skip real git-repo push tests unless explicitly requested.""" + if config.getoption("--run-real-git-backup-tests"): + return diff --git a/tests/test_blacklist.py b/tests/test_blacklist.py index 354fdca..0c756ad 100644 --- a/tests/test_blacklist.py +++ b/tests/test_blacklist.py @@ -38,7 +38,7 @@ def test_has_black_tags() -> None: # Test feed without any blacklist tags assert_msg: str = "Feed should not have any blacklist tags" - assert feed_has_blacklist_tags(custom_reader=get_reader(), feed=feed) is False, assert_msg + assert feed_has_blacklist_tags(reader=get_reader(), feed=feed) is False, assert_msg check_if_has_tag(reader, feed, "blacklist_title") check_if_has_tag(reader, feed, "blacklist_summary") @@ -58,11 +58,11 @@ def test_has_black_tags() -> None: def check_if_has_tag(reader: Reader, feed: Feed, blacklist_name: str) -> None: reader.set_tag(feed, blacklist_name, "a") # pyright: ignore[reportArgumentType] assert_msg: str = f"Feed should have blacklist tags: {blacklist_name}" - assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is True, assert_msg + assert feed_has_blacklist_tags(reader=reader, feed=feed) is True, assert_msg asset_msg: str = f"Feed should not have any blacklist tags: {blacklist_name}" reader.delete_tag(feed, blacklist_name) - assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is False, asset_msg + assert feed_has_blacklist_tags(reader=reader, feed=feed) is False, asset_msg def test_should_be_skipped() -> None: diff --git a/tests/test_custom_filter.py b/tests/test_custom_filter.py index 5538608..9611698 100644 --- a/tests/test_custom_filter.py +++ b/tests/test_custom_filter.py @@ -45,39 +45,39 @@ def test_entry_is_whitelisted() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - custom_reader: Reader = get_reader(custom_location=str(custom_loc)) + reader: Reader = get_reader(custom_location=str(custom_loc)) # Add a feed to the database. - custom_reader.add_feed("https://lovinator.space/rss_test.xml") - custom_reader.update_feed("https://lovinator.space/rss_test.xml") + reader.add_feed("https://lovinator.space/rss_test.xml") + reader.update_feed("https://lovinator.space/rss_test.xml") # whitelist_title - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_whitelisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_whitelisted(entry, reader=reader) is True: assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}" break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title") + reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title") # whitelist_summary - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_whitelisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_whitelisted(entry, reader=reader) is True: assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}" break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary") + reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary") # whitelist_content - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_whitelisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_whitelisted(entry, reader=reader) is True: assert_msg = f"Expected:

ffdnfdnfdnfdnfdndfn

, Got: {entry.content[0].value}" assert entry.content[0].value == "

ffdnfdnfdnfdnfdndfn

", assert_msg break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content") + reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content") # Close the reader, so we can delete the directory. - custom_reader.close() + reader.close() def test_entry_is_blacklisted() -> None: @@ -87,36 +87,36 @@ def test_entry_is_blacklisted() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - custom_reader: Reader = get_reader(custom_location=str(custom_loc)) + reader: Reader = get_reader(custom_location=str(custom_loc)) # Add a feed to the database. - custom_reader.add_feed("https://lovinator.space/rss_test.xml") - custom_reader.update_feed("https://lovinator.space/rss_test.xml") + reader.add_feed("https://lovinator.space/rss_test.xml") + reader.update_feed("https://lovinator.space/rss_test.xml") # blacklist_title - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_blacklisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_blacklisted(entry, reader=reader) is True: assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}" break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title") + reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title") # blacklist_summary - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_blacklisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_blacklisted(entry, reader=reader) is True: assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}" break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary") + reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary") # blacklist_content - custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in custom_reader.get_entries(): - if entry_is_blacklisted(entry) is True: + reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in reader.get_entries(): + if entry_is_blacklisted(entry, reader=reader) is True: assert_msg = f"Expected:

ffdnfdnfdnfdnfdndfn

, Got: {entry.content[0].value}" assert entry.content[0].value == "

ffdnfdnfdnfdnfdndfn

", assert_msg break - custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content") + reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content") # Close the reader, so we can delete the directory. - custom_reader.close() + reader.close() diff --git a/tests/test_custom_message.py b/tests/test_custom_message.py index 6fc4d41..4b23f45 100644 --- a/tests/test_custom_message.py +++ b/tests/test_custom_message.py @@ -102,12 +102,10 @@ def test_format_entry_html_for_discord_does_not_preserve_invalid_timestamp_style @patch("discord_rss_bot.custom_message.get_custom_message") -@patch("discord_rss_bot.custom_message.get_reader") def test_replace_tags_in_text_message_preserves_timestamp_tags( - mock_get_reader: MagicMock, mock_get_custom_message: MagicMock, ) -> None: - mock_get_reader.return_value = MagicMock() + mock_reader = MagicMock() mock_get_custom_message.return_value = "{{entry_summary}}" summary_parts: list[str] = [ f"

Format {index}: ({timestamp_tag.replace('<', '<').replace('>', '>')})

" @@ -116,19 +114,17 @@ def test_replace_tags_in_text_message_preserves_timestamp_tags( entry_ns: SimpleNamespace = make_entry("".join(summary_parts)) entry: Entry = typing.cast("Entry", entry_ns) - rendered: str = replace_tags_in_text_message(entry) + rendered: str = replace_tags_in_text_message(entry, reader=mock_reader) for timestamp_tag in TIMESTAMP_FORMATS: assert timestamp_tag in rendered @patch("discord_rss_bot.custom_message.get_embed") -@patch("discord_rss_bot.custom_message.get_reader") def test_replace_tags_in_embed_preserves_timestamp_tags( - mock_get_reader: MagicMock, mock_get_embed: MagicMock, ) -> None: - mock_get_reader.return_value = MagicMock() + mock_reader = MagicMock() mock_get_embed.return_value = CustomEmbed(description="{{entry_summary}}") summary_parts: list[str] = [ f"

Format {index}: ({timestamp_tag.replace('<', '<').replace('>', '>')})

" @@ -138,7 +134,7 @@ def test_replace_tags_in_embed_preserves_timestamp_tags( entry: Entry = typing.cast("Entry", entry_ns) - embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry) + embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry, reader=mock_reader) for timestamp_tag in TIMESTAMP_FORMATS: assert timestamp_tag in embed.description diff --git a/tests/test_feeds.py b/tests/test_feeds.py index 2ffe47e..84e836c 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -18,7 +18,6 @@ from discord_rss_bot.feeds import send_entry_to_discord from discord_rss_bot.feeds import send_to_discord from discord_rss_bot.feeds import should_send_embed_check from discord_rss_bot.feeds import truncate_webhook_message -from discord_rss_bot.missing_tags import add_missing_tags def test_send_to_discord() -> None: @@ -35,8 +34,6 @@ def test_send_to_discord() -> None: # Add a feed to the reader. reader.add_feed("https://www.reddit.com/r/Python/.rss") - add_missing_tags(reader) - # Update the feed to get the entries. reader.update_feeds() @@ -58,7 +55,7 @@ def test_send_to_discord() -> None: assert reader.get_tag(feed, "webhook") == webhook_url, f"The webhook URL should be '{webhook_url}'." # Send the feed to Discord. - send_to_discord(custom_reader=reader, feed=feed, do_once=True) + send_to_discord(reader=reader, feed=feed, do_once=True) # Close the reader, so we can delete the directory. reader.close() @@ -191,7 +188,7 @@ def test_send_entry_to_discord_youtube_feed( mock_discord_webhook.return_value = mock_webhook # Call the function - send_entry_to_discord(mock_entry) + send_entry_to_discord(mock_entry, mock_reader) # Assertions mock_create_embed.assert_not_called() @@ -203,7 +200,7 @@ def test_send_entry_to_discord_youtube_feed( assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc" # Verify execute_webhook was called - mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry) + mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry, reader=mock_reader) def test_extract_domain_youtube_feed() -> None: diff --git a/tests/test_git_backup.py b/tests/test_git_backup.py index 0fa6f8e..183d178 100644 --- a/tests/test_git_backup.py +++ b/tests/test_git_backup.py @@ -304,102 +304,6 @@ def test_commit_state_change_no_push_when_remote_unset(monkeypatch: pytest.Monke assert not push_calls, "git push should NOT be called when GIT_BACKUP_REMOTE is not set" -@SKIP_IF_NO_GIT -def test_commit_state_change_e2e_push_to_bare_repo(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: - """End-to-end test: commit_state_change pushes to a real bare git repository.""" - git_executable: str | None = shutil.which("git") - assert git_executable is not None, "git executable not found" - - # Create a bare remote repository - bare_repo_path: Path = tmp_path / "remote.git" - subprocess.run([git_executable, "init", "--bare", str(bare_repo_path)], check=True, capture_output=True) # noqa: S603 - - # Configure backup with remote pointing to bare repo - backup_path: Path = tmp_path / "backup" - monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path)) - monkeypatch.setenv("GIT_BACKUP_REMOTE", str(bare_repo_path)) - - # Create mock reader with some state - mock_reader = MagicMock() - feed1 = MagicMock() - feed1.url = "https://example.com/feed.rss" - mock_reader.get_feeds.return_value = [feed1] - - def get_tag_side_effect( - feed_or_key: tuple | str, - tag: str | None = None, - default: str | None = None, - ) -> list[Any] | str | None: - if feed_or_key == (): - return [] - if tag == "webhook": - return "https://discord.com/api/webhooks/123/abc" - return default - - mock_reader.get_tag.side_effect = get_tag_side_effect - - # Perform backup with commit and push - commit_state_change(mock_reader, "Initial backup") - - # Verify commit exists in local backup repo - result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603 - [git_executable, "-C", str(backup_path), "log", "--oneline"], - capture_output=True, - text=True, - check=True, - ) - assert "Initial backup" in result.stdout - - # Verify origin remote is configured correctly - result = subprocess.run( # noqa: S603 - [git_executable, "-C", str(backup_path), "remote", "get-url", "origin"], - capture_output=True, - text=True, - check=True, - ) - assert result.stdout.strip() == str(bare_repo_path) - - # Verify commit was pushed to the bare remote - result = subprocess.run( # noqa: S603 - [git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"], - capture_output=True, - text=True, - check=True, - ) - assert "Initial backup" in result.stdout - - # Verify state.json content in the remote - result = subprocess.run( # noqa: S603 - [git_executable, "-C", str(bare_repo_path), "show", "master:state.json"], - capture_output=True, - text=True, - check=True, - ) - state_data: dict[str, Any] = json.loads(result.stdout) - assert state_data["feeds"][0]["url"] == "https://example.com/feed.rss" - assert state_data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc" - - # Perform a second backup to verify subsequent pushes work - feed2 = MagicMock() - feed2.url = "https://another.com/feed.xml" - mock_reader.get_feeds.return_value = [feed1, feed2] - - commit_state_change(mock_reader, "Add second feed") - - # Verify both commits are in the remote - result = subprocess.run( # noqa: S603 - [git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"], - capture_output=True, - text=True, - check=True, - ) - assert "Initial backup" in result.stdout - assert "Add second feed" in result.stdout - - -# Integration tests for embed-related endpoint backups - - client: TestClient = TestClient(app) test_webhook_name: str = "Test Backup Webhook" test_webhook_url: str = "https://discord.com/api/webhooks/999999999/testbackupwebhook" diff --git a/tests/test_main.py b/tests/test_main.py index dc3ecf5..f6396eb 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -4,13 +4,19 @@ import re import urllib.parse from dataclasses import dataclass from dataclasses import field +from datetime import UTC +from datetime import datetime from typing import TYPE_CHECKING from typing import cast +from unittest.mock import MagicMock +from unittest.mock import patch from fastapi.testclient import TestClient +import discord_rss_bot.main as main_module from discord_rss_bot.main import app from discord_rss_bot.main import create_html_for_feed +from discord_rss_bot.main import get_reader_dependency if TYPE_CHECKING: from pathlib import Path @@ -154,6 +160,9 @@ def test_get() -> None: response: Response = client.get(url="/webhooks") assert response.status_code == 200, f"/webhooks failed: {response.text}" + response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url}) + assert response.status_code == 200, f"/webhook_entries failed: {response.text}" + response: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)}) assert response.status_code == 200, f"/whitelist failed: {response.text}" @@ -299,6 +308,147 @@ def test_change_feed_url() -> None: client.post(url="/remove", data={"feed_url": new_feed_url}) +def test_change_feed_url_marks_entries_as_read() -> None: + """After changing a feed URL all entries on the new feed should be marked read to prevent resending.""" + new_feed_url = "https://lovinator.space/rss_test_small.xml" + + # Ensure feeds do not already exist. + client.post(url="/remove", data={"feed_url": feed_url}) + client.post(url="/remove", data={"feed_url": new_feed_url}) + + # Ensure webhook exists. + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url}) + + # Add the original feed. + response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + # Patch reader on the main module so we can observe calls. + mock_entry_a = MagicMock() + mock_entry_a.id = "entry-a" + mock_entry_b = MagicMock() + mock_entry_b.id = "entry-b" + + real_reader = main_module.get_reader_dependency() + + # Use a no-redirect client so the POST response is inspected directly; the + # redirect target (/feed?feed_url=…) would 404 because change_feed_url is mocked. + no_redirect_client = TestClient(app, follow_redirects=False) + + with ( + patch.object(real_reader, "get_entries", return_value=[mock_entry_a, mock_entry_b]) as mock_get_entries, + patch.object(real_reader, "set_entry_read") as mock_set_read, + patch.object(real_reader, "update_feed") as mock_update_feed, + patch.object(real_reader, "change_feed_url"), + ): + response = no_redirect_client.post( + url="/change_feed_url", + data={"old_feed_url": feed_url, "new_feed_url": new_feed_url}, + ) + assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}" + + # update_feed should have been called with the new URL. + mock_update_feed.assert_called_once_with(new_feed_url) + + # get_entries should have been called to fetch unread entries on the new URL. + mock_get_entries.assert_called_once_with(feed=new_feed_url, read=False) + + # Every returned entry should have been marked as read. + assert mock_set_read.call_count == 2, f"Expected 2 set_entry_read calls, got {mock_set_read.call_count}" + mock_set_read.assert_any_call(mock_entry_a, True) + mock_set_read.assert_any_call(mock_entry_b, True) + + # Cleanup. + client.post(url="/remove", data={"feed_url": feed_url}) + client.post(url="/remove", data={"feed_url": new_feed_url}) + + +def test_change_feed_url_empty_old_url_returns_400() -> None: + """Submitting an empty old_feed_url should return HTTP 400.""" + response: Response = client.post( + url="/change_feed_url", + data={"old_feed_url": " ", "new_feed_url": "https://example.com/feed.xml"}, + ) + assert response.status_code == 400, f"Expected 400 for empty old URL, got {response.status_code}" + + +def test_change_feed_url_empty_new_url_returns_400() -> None: + """Submitting a blank new_feed_url should return HTTP 400.""" + response: Response = client.post( + url="/change_feed_url", + data={"old_feed_url": feed_url, "new_feed_url": " "}, + ) + assert response.status_code == 400, f"Expected 400 for blank new URL, got {response.status_code}" + + +def test_change_feed_url_nonexistent_old_url_returns_404() -> None: + """Trying to rename a feed that does not exist should return HTTP 404.""" + non_existent = "https://does-not-exist.example.com/rss.xml" + # Make sure it really is absent. + client.post(url="/remove", data={"feed_url": non_existent}) + + response: Response = client.post( + url="/change_feed_url", + data={"old_feed_url": non_existent, "new_feed_url": "https://example.com/new.xml"}, + ) + assert response.status_code == 404, f"Expected 404 for non-existent feed, got {response.status_code}" + + +def test_change_feed_url_new_url_already_exists_returns_409() -> None: + """Changing to a URL that is already tracked should return HTTP 409.""" + second_feed_url = "https://lovinator.space/rss_test_small.xml" + + # Ensure both feeds are absent. + client.post(url="/remove", data={"feed_url": feed_url}) + client.post(url="/remove", data={"feed_url": second_feed_url}) + + # Ensure webhook exists. + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url}) + + # Add both feeds. + client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) + client.post(url="/add", data={"feed_url": second_feed_url, "webhook_dropdown": webhook_name}) + + # Try to rename one to the other. + response: Response = client.post( + url="/change_feed_url", + data={"old_feed_url": feed_url, "new_feed_url": second_feed_url}, + ) + assert response.status_code == 409, f"Expected 409 when new URL already exists, got {response.status_code}" + + # Cleanup. + client.post(url="/remove", data={"feed_url": feed_url}) + client.post(url="/remove", data={"feed_url": second_feed_url}) + + +def test_change_feed_url_same_url_redirects_without_error() -> None: + """Changing a feed's URL to itself should redirect cleanly without any error.""" + # Ensure webhook exists. + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url}) + + # Add the feed. + client.post(url="/remove", data={"feed_url": feed_url}) + response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + # Submit the same URL as both old and new. + response = client.post( + url="/change_feed_url", + data={"old_feed_url": feed_url, "new_feed_url": feed_url}, + ) + assert response.status_code == 200, f"Expected 200 redirect for same URL, got {response.status_code}" + + # Feed should still be accessible. + response = client.get(url="/feed", params={"feed_url": feed_url}) + assert response.status_code == 200, f"Feed should still exist after no-op URL change: {response.text}" + + # Cleanup. + client.post(url="/remove", data={"feed_url": feed_url}) + + def test_delete_webhook() -> None: """Test the /delete_webhook page.""" # Remove the feed if it already exists before we run the test. @@ -335,6 +485,110 @@ def test_update_feed_not_found() -> None: assert "Feed not found" in response.text +def test_post_entry_send_to_discord() -> None: + """Test that /post_entry sends an entry to Discord and redirects to the feed page. + + Regression test for the bug where the injected reader was not passed to + send_entry_to_discord, meaning the dependency-injected reader was silently ignored. + """ + # Ensure webhook and feed exist. + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + client.post(url="/remove", data={"feed_url": feed_url}) + response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + # Retrieve an entry from the feed to get a valid entry ID. + reader: main_module.Reader = main_module.get_reader_dependency() + entries: list[Entry] = list(reader.get_entries(feed=feed_url, limit=1)) + assert entries, "Feed should have at least one entry to send" + entry_to_send: main_module.Entry = entries[0] + encoded_id: str = urllib.parse.quote(entry_to_send.id) + + no_redirect_client = TestClient(app, follow_redirects=False) + + # Patch execute_webhook so no real HTTP requests are made to Discord. + with patch("discord_rss_bot.feeds.execute_webhook") as mock_execute: + response = no_redirect_client.get( + url="/post_entry", + params={"entry_id": encoded_id, "feed_url": urllib.parse.quote(feed_url)}, + ) + + assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}: {response.text}" + location: str = response.headers.get("location", "") + assert "feed?feed_url=" in location, f"Should redirect to feed page, got: {location}" + assert mock_execute.called, "execute_webhook should have been called to deliver the entry to Discord" + + # Cleanup. + client.post(url="/remove", data={"feed_url": feed_url}) + + +def test_post_entry_unknown_id_returns_404() -> None: + """Test that /post_entry returns 404 when the entry ID does not exist.""" + response: Response = client.get( + url="/post_entry", + params={"entry_id": "https://nonexistent.example.com/entry-that-does-not-exist"}, + ) + assert response.status_code == 404, f"Expected 404 for unknown entry, got {response.status_code}" + + +def test_post_entry_uses_feed_url_to_disambiguate_duplicate_ids() -> None: + """When IDs collide across feeds, /post_entry should pick the entry from provided feed_url.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + + @dataclass(slots=True) + class DummyEntry: + id: str + feed: DummyFeed + feed_url: str + + feed_a = "https://example.com/feed-a.xml" + feed_b = "https://example.com/feed-b.xml" + shared_id = "https://example.com/shared-entry-id" + + entry_a: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_a), feed_url=feed_a)) + entry_b: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_b), feed_url=feed_b)) + + class StubReader: + def get_entries(self, feed: str | None = None) -> list[Entry]: + if feed == feed_a: + return [entry_a] + if feed == feed_b: + return [entry_b] + return [entry_a, entry_b] + + selected_feed_urls: list[str] = [] + + def fake_send_entry_to_discord(entry: Entry, reader: object) -> None: + selected_feed_urls.append(entry.feed.url) + + app.dependency_overrides[get_reader_dependency] = StubReader + no_redirect_client = TestClient(app, follow_redirects=False) + + try: + with patch("discord_rss_bot.main.send_entry_to_discord", side_effect=fake_send_entry_to_discord): + response: Response = no_redirect_client.get( + url="/post_entry", + params={"entry_id": urllib.parse.quote(shared_id), "feed_url": urllib.parse.quote(feed_b)}, + ) + + assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}" + assert selected_feed_urls == [feed_b], f"Expected feed-b entry, got: {selected_feed_urls}" + + location = response.headers.get("location", "") + assert urllib.parse.quote(feed_b) in location, f"Expected redirect to feed-b page, got: {location}" + finally: + app.dependency_overrides = {} + + def test_navbar_backup_link_hidden_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None: """Test that the backup link is not shown in the navbar when GIT_BACKUP_PATH is not set.""" # Ensure GIT_BACKUP_PATH is not set @@ -573,11 +827,24 @@ def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyP original_feed_url="https://example.com/feed-b.xml", ) - monkeypatch.setattr("discord_rss_bot.main.replace_tags_in_text_message", lambda _entry: "Rendered content") - monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry: False) - monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry: False) + monkeypatch.setattr( + "discord_rss_bot.main.replace_tags_in_text_message", + lambda _entry, **_kwargs: "Rendered content", + ) + monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry, **_kwargs: False) + monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry, **_kwargs: False) - html = create_html_for_feed(cast("list[Entry]", [same_feed_entry, other_feed_entry]), selected_feed_url) + same_feed_entry_typed: Entry = cast("Entry", same_feed_entry) + other_feed_entry_typed: Entry = cast("Entry", other_feed_entry) + + html: str = create_html_for_feed( + reader=MagicMock(), + current_feed_url=selected_feed_url, + entries=[ + same_feed_entry_typed, + other_feed_entry_typed, + ], + ) assert "From another feed: https://example.com/feed-b.xml" in html assert "From another feed: https://example.com/feed-a.xml" not in html @@ -620,6 +887,32 @@ def test_webhook_entries_no_feeds() -> None: assert "No feeds found" in response.text or "Add feeds" in response.text, "Expected message about no feeds" +def test_webhook_entries_no_feeds_still_shows_webhook_settings() -> None: + """The webhook detail view should show settings/actions even with no attached feeds.""" + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + response = client.get( + url="/webhook_entries", + params={"webhook_url": webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" + assert "Settings" in response.text, "Expected settings card on webhook detail view" + assert "Modify Webhook" in response.text, "Expected modify form on webhook detail view" + assert "Delete Webhook" in response.text, "Expected delete action on webhook detail view" + assert "Back to dashboard" in response.text, "Expected dashboard navigation link" + assert "All webhooks" in response.text, "Expected all webhooks navigation link" + assert f'name="old_hook" value="{webhook_url}"' in response.text, "Expected old_hook hidden input" + assert f'value="/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}"' in response.text, ( + "Expected modify form to redirect back to the current webhook detail view" + ) + + def test_webhook_entries_with_feeds_no_entries() -> None: """Test webhook_entries endpoint when webhook has feeds but no entries yet.""" # Clean up and create fresh webhook @@ -681,6 +974,38 @@ def test_webhook_entries_with_entries() -> None: assert webhook_name in response.text, "Webhook name not found in response" # Should show entries (the feed has entries) assert "total from" in response.text, "Expected to see entry count" + assert "Modify Webhook" in response.text, "Expected webhook settings to be visible" + assert "Attached feeds" in response.text, "Expected attached feeds section to be visible" + + +def test_webhook_entries_shows_attached_feed_link() -> None: + """The webhook detail view should list attached feeds linking to their feed pages.""" + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + client.post(url="/remove", data={"feed_url": feed_url}) + response = client.post( + url="/add", + data={"feed_url": feed_url, "webhook_dropdown": webhook_name}, + ) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + response = client.get( + url="/webhook_entries", + params={"webhook_url": webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" + assert f"/feed?feed_url={urllib.parse.quote(feed_url)}" in response.text, ( + "Expected attached feed to link to its feed detail page" + ) + assert "Latest entries" in response.text, "Expected latest entries heading on webhook detail view" + + client.post(url="/remove", data={"feed_url": feed_url}) def test_webhook_entries_multiple_feeds() -> None: @@ -716,6 +1041,75 @@ def test_webhook_entries_multiple_feeds() -> None: client.post(url="/remove", data={"feed_url": feed_url}) +def test_webhook_entries_sort_newest_and_non_null_published_first() -> None: + """Webhook entries should be sorted newest-first with published=None entries placed last.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + title: str | None = None + updates_enabled: bool = True + last_exception: None = None + + @dataclass(slots=True) + class DummyEntry: + id: str + feed: DummyFeed + published: datetime | None + + dummy_feed = DummyFeed(url="https://example.com/feed.xml", title="Example Feed") + + # Intentionally unsorted input with two dated entries and two undated entries. + unsorted_entries: list[Entry] = [ + cast("Entry", DummyEntry(id="old", feed=dummy_feed, published=datetime(2024, 1, 1, tzinfo=UTC))), + cast("Entry", DummyEntry(id="none-1", feed=dummy_feed, published=None)), + cast("Entry", DummyEntry(id="new", feed=dummy_feed, published=datetime(2024, 2, 1, tzinfo=UTC))), + cast("Entry", DummyEntry(id="none-2", feed=dummy_feed, published=None)), + ] + + class StubReader: + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return [dummy_feed] + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return unsorted_entries + + observed_order: list[str] = [] + + def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str: + del reader, current_feed_url + observed_order.extend(entry.id for entry in entries) + return "" + + app.dependency_overrides[get_reader_dependency] = StubReader + try: + with ( + patch( + "discord_rss_bot.main.get_data_from_hook_url", + return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url), + ), + patch("discord_rss_bot.main.create_html_for_feed", side_effect=capture_entries), + ): + response: Response = client.get( + url="/webhook_entries", + params={"webhook_url": webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" + assert observed_order == ["new", "old", "none-1", "none-2"], ( + "Expected newest published entries first and published=None entries last" + ) + finally: + app.dependency_overrides = {} + + def test_webhook_entries_pagination() -> None: """Test webhook_entries endpoint pagination functionality.""" # Clean up and create webhook @@ -783,3 +1177,445 @@ def test_webhook_entries_url_encoding() -> None: # Clean up client.post(url="/remove", data={"feed_url": feed_url}) + + +def test_dashboard_webhook_name_links_to_webhook_detail() -> None: + """Webhook names on the dashboard should open the webhook detail view.""" + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + client.post(url="/remove", data={"feed_url": feed_url}) + response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + response = client.get(url="/") + assert response.status_code == 200, f"Failed to get /: {response.text}" + + expected_link = f"/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}" + assert expected_link in response.text, "Expected dashboard webhook link to point to the webhook detail view" + + client.post(url="/remove", data={"feed_url": feed_url}) + + +def test_modify_webhook_redirects_back_to_webhook_detail() -> None: + """Webhook updates from the detail view should redirect back to that view with the new URL.""" + original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz" + new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token" + + client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) + client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) + + +def test_modify_webhook_triggers_git_backup_commit() -> None: + """Modifying a webhook URL should record a state change for git backup.""" + original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz" + new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token" + + client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) + client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) + + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": original_webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + no_redirect_client = TestClient(app, follow_redirects=False) + with patch("discord_rss_bot.main.commit_state_change") as mock_commit_state_change: + response = no_redirect_client.post( + url="/modify_webhook", + data={ + "old_hook": original_webhook_url, + "new_hook": new_webhook_url, + "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}", + }, + ) + + assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}" + assert mock_commit_state_change.call_count == 1, "Expected webhook modification to trigger git backup commit" + + client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) + + response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": original_webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + no_redirect_client = TestClient(app, follow_redirects=False) + response = no_redirect_client.post( + url="/modify_webhook", + data={ + "old_hook": original_webhook_url, + "new_hook": new_webhook_url, + "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}", + }, + ) + + assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}" + assert response.headers["location"] == (f"/webhook_entries?webhook_url={urllib.parse.quote(new_webhook_url)}"), ( + f"Unexpected redirect location: {response.headers['location']}" + ) + + client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) + + +def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None: + """Preview should list old->new feed URLs for webhook bulk replacement.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + title: str | None = None + updates_enabled: bool = True + last_exception: None = None + + class StubReader: + def __init__(self) -> None: + self._feeds: list[DummyFeed] = [ + DummyFeed(url="https://old.example.com/rss/a.xml", title="A"), + DummyFeed(url="https://old.example.com/rss/b.xml", title="B"), + DummyFeed(url="https://unchanged.example.com/rss/c.xml", title="C"), + ] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + if resource.startswith("https://old.example.com"): + return webhook_url + if resource.startswith("https://unchanged.example.com"): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return [] + + app.dependency_overrides[get_reader_dependency] = StubReader + try: + with ( + patch( + "discord_rss_bot.main.get_data_from_hook_url", + return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url), + ), + patch( + "discord_rss_bot.main.resolve_final_feed_url", + side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), + ), + ): + response: Response = client.get( + url="/webhook_entries", + params={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + }, + ) + + assert response.status_code == 200, f"Failed to get preview: {response.text}" + assert "Mass update feed URLs" in response.text + assert "old.example.com/rss/a.xml" in response.text + assert "new.example.com/rss/a.xml" in response.text + assert "Will update" in response.text + assert "Matched: 2" in response.text + assert "Will update: 2" in response.text + finally: + app.dependency_overrides = {} + + +def test_bulk_change_feed_urls_updates_matching_feeds() -> None: + """Mass updater should change all matching feed URLs for a webhook.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + + class StubReader: + def __init__(self) -> None: + self._feeds = [ + DummyFeed(url="https://old.example.com/rss/a.xml"), + DummyFeed(url="https://old.example.com/rss/b.xml"), + DummyFeed(url="https://unchanged.example.com/rss/c.xml"), + ] + self.change_calls: list[tuple[str, str]] = [] + self.updated_feeds: list[str] = [] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + def change_feed_url(self, old_url: str, new_url: str) -> None: + self.change_calls.append((old_url, new_url)) + + def update_feed(self, feed_url: str) -> None: + self.updated_feeds.append(feed_url) + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return [] + + def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 + return + + stub_reader = StubReader() + app.dependency_overrides[get_reader_dependency] = lambda: stub_reader + no_redirect_client = TestClient(app, follow_redirects=False) + + try: + with patch( + "discord_rss_bot.main.resolve_final_feed_url", + side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), + ): + response: Response = no_redirect_client.post( + url="/bulk_change_feed_urls", + data={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + }, + ) + + assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}" + assert "Updated%202%20feed%20URL%28s%29" in response.headers.get("location", "") + assert sorted(stub_reader.change_calls) == sorted([ + ("https://old.example.com/rss/a.xml", "https://new.example.com/rss/a.xml"), + ("https://old.example.com/rss/b.xml", "https://new.example.com/rss/b.xml"), + ]) + assert sorted(stub_reader.updated_feeds) == sorted([ + "https://new.example.com/rss/a.xml", + "https://new.example.com/rss/b.xml", + ]) + finally: + app.dependency_overrides = {} + + +def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None: + """HTMX preview endpoint should render only the mass-update preview fragment.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + title: str | None = None + updates_enabled: bool = True + last_exception: None = None + + class StubReader: + def __init__(self) -> None: + self._feeds: list[DummyFeed] = [ + DummyFeed(url="https://old.example.com/rss/a.xml", title="A"), + DummyFeed(url="https://old.example.com/rss/b.xml", title="B"), + ] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + app.dependency_overrides[get_reader_dependency] = StubReader + try: + with patch( + "discord_rss_bot.main.resolve_final_feed_url", + side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), + ): + response: Response = client.get( + url="/webhook_entries_mass_update_preview", + params={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + }, + ) + + assert response.status_code == 200, f"Failed to get HTMX preview fragment: {response.text}" + assert "Will update: 2" in response.text + assert " None: # noqa: C901 + """Force update should overwrite conflicting target URLs instead of skipping them.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + + class StubReader: + def __init__(self) -> None: + self._feeds = [ + DummyFeed(url="https://old.example.com/rss/a.xml"), + DummyFeed(url="https://new.example.com/rss/a.xml"), + ] + self.delete_calls: list[str] = [] + self.change_calls: list[tuple[str, str]] = [] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + def delete_feed(self, feed_url: str) -> None: + self.delete_calls.append(feed_url) + + def change_feed_url(self, old_url: str, new_url: str) -> None: + self.change_calls.append((old_url, new_url)) + + def update_feed(self, _feed_url: str) -> None: + return + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return [] + + def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 + return + + stub_reader = StubReader() + app.dependency_overrides[get_reader_dependency] = lambda: stub_reader + no_redirect_client = TestClient(app, follow_redirects=False) + + try: + with patch( + "discord_rss_bot.main.resolve_final_feed_url", + side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), + ): + response: Response = no_redirect_client.post( + url="/bulk_change_feed_urls", + data={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + "force_update": "true", + }, + ) + + assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}" + assert stub_reader.delete_calls == ["https://new.example.com/rss/a.xml"] + assert stub_reader.change_calls == [ + ( + "https://old.example.com/rss/a.xml", + "https://new.example.com/rss/a.xml", + ), + ] + assert "Force%20overwrote%201" in response.headers.get("location", "") + finally: + app.dependency_overrides = {} + + +def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None: + """Force update should proceed even when URL resolution returns an error (e.g. HTTP 404).""" + + @dataclass(slots=True) + class DummyFeed: + url: str + + class StubReader: + def __init__(self) -> None: + self._feeds = [ + DummyFeed(url="https://old.example.com/rss/a.xml"), + ] + self.change_calls: list[tuple[str, str]] = [] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + def change_feed_url(self, old_url: str, new_url: str) -> None: + self.change_calls.append((old_url, new_url)) + + def update_feed(self, _feed_url: str) -> None: + return + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return [] + + def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 + return + + stub_reader = StubReader() + app.dependency_overrides[get_reader_dependency] = lambda: stub_reader + no_redirect_client = TestClient(app, follow_redirects=False) + + try: + with patch( + "discord_rss_bot.main.resolve_final_feed_url", + return_value=("https://new.example.com/rss/a.xml", "HTTP 404"), + ): + response: Response = no_redirect_client.post( + url="/bulk_change_feed_urls", + data={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + "force_update": "true", + }, + ) + + assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}" + assert stub_reader.change_calls == [ + ( + "https://old.example.com/rss/a.xml", + "https://new.example.com/rss/a.xml", + ), + ] + location = response.headers.get("location", "") + assert "Updated%201%20feed%20URL%28s%29" in location + assert "Failed%200" in location + finally: + app.dependency_overrides = {} + + +def test_reader_dependency_override_is_used() -> None: + """Reader should be injectable and overridable via FastAPI dependency overrides.""" + + class StubReader: + def get_tag(self, _resource: str, _key: str, default: str | None = None) -> str | None: + """Stub get_tag that always returns the default value. + + Args: + _resource: Ignored. + _key: Ignored. + default: The value to return. + + Returns: + The default value, simulating a missing tag. + """ + return default + + app.dependency_overrides[get_reader_dependency] = StubReader + try: + response: Response = client.get(url="/add") + assert response.status_code == 200, f"Expected /add to render with overridden reader: {response.text}" + finally: + app.dependency_overrides = {} diff --git a/tests/test_search.py b/tests/test_search.py index ada1464..77681cf 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -46,7 +46,7 @@ def test_create_search_context() -> None: reader.update_search() # Create the search context. - context: dict = create_search_context("test", custom_reader=reader) + context: dict = create_search_context("test", reader=reader) assert context is not None, f"The context should not be None. Got: {context}" # Close the reader, so we can delete the directory. diff --git a/tests/test_settings.py b/tests/test_settings.py index 5a54094..bcab720 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -22,12 +22,12 @@ def test_reader() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - custom_reader: Reader = get_reader(custom_location=str(custom_loc)) - assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(custom_reader)}'." - assert isinstance(custom_reader, Reader), assert_msg + reader: Reader = get_reader(custom_location=str(custom_loc)) + assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(reader)}'." + assert isinstance(reader, Reader), assert_msg # Close the reader, so we can delete the directory. - custom_reader.close() + reader.close() def test_data_dir() -> None: @@ -49,16 +49,16 @@ def test_get_webhook_for_entry() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - custom_reader: Reader = get_reader(custom_location=str(custom_loc)) + reader: Reader = get_reader(custom_location=str(custom_loc)) # Add a feed to the database. - custom_reader.add_feed("https://www.reddit.com/r/movies.rss") - custom_reader.update_feed("https://www.reddit.com/r/movies.rss") + reader.add_feed("https://www.reddit.com/r/movies.rss") + reader.update_feed("https://www.reddit.com/r/movies.rss") # Add a webhook to the database. - custom_reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType] - our_tag = custom_reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType] + reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType] + our_tag = reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType] assert our_tag == "https://example.com", f"The tag should be 'https://example.com'. But it was '{our_tag}'." # Close the reader, so we can delete the directory. - custom_reader.close() + reader.close() diff --git a/tests/test_whitelist.py b/tests/test_whitelist.py index 462d652..6e911fe 100644 --- a/tests/test_whitelist.py +++ b/tests/test_whitelist.py @@ -37,7 +37,7 @@ def test_has_white_tags() -> None: reader.update_feeds() # Test feed without any whitelist tags - assert has_white_tags(custom_reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags" + assert has_white_tags(reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags" check_if_has_tag(reader, feed, "whitelist_title") check_if_has_tag(reader, feed, "whitelist_summary") @@ -56,9 +56,9 @@ def test_has_white_tags() -> None: def check_if_has_tag(reader: Reader, feed: Feed, whitelist_name: str) -> None: reader.set_tag(feed, whitelist_name, "a") # pyright: ignore[reportArgumentType] - assert has_white_tags(custom_reader=reader, feed=feed) is True, "Feed should have whitelist tags" + assert has_white_tags(reader=reader, feed=feed) is True, "Feed should have whitelist tags" reader.delete_tag(feed, whitelist_name) - assert has_white_tags(custom_reader=reader, feed=feed) is False, "Feed should not have any whitelist tags" + assert has_white_tags(reader=reader, feed=feed) is False, "Feed should not have any whitelist tags" def test_should_be_sent() -> None: