diff --git a/discord_rss_bot/custom_filters.py b/discord_rss_bot/custom_filters.py index fd9461c..7d8fe83 100644 --- a/discord_rss_bot/custom_filters.py +++ b/discord_rss_bot/custom_filters.py @@ -8,11 +8,15 @@ from discord_rss_bot.filter.blacklist import entry_should_be_skipped from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags from discord_rss_bot.filter.whitelist import has_white_tags from discord_rss_bot.filter.whitelist import should_be_sent +from discord_rss_bot.settings import get_reader if TYPE_CHECKING: from reader import Entry from reader import Reader +# Our reader +reader: Reader = get_reader() + @lru_cache def encode_url(url_to_quote: str) -> str: @@ -30,12 +34,11 @@ def encode_url(url_to_quote: str) -> str: return urllib.parse.quote(string=url_to_quote) if url_to_quote else "" -def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool: +def entry_is_whitelisted(entry_to_check: Entry) -> bool: """Check if the entry is whitelisted. Args: entry_to_check: The feed to check. - reader: Custom Reader instance. Returns: bool: True if the feed is whitelisted, False otherwise. @@ -44,12 +47,11 @@ def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool: return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check)) -def entry_is_blacklisted(entry_to_check: Entry, reader: Reader) -> bool: +def entry_is_blacklisted(entry_to_check: Entry) -> bool: """Check if the entry is blacklisted. Args: entry_to_check: The feed to check. - reader: Custom Reader instance. Returns: bool: True if the feed is blacklisted, False otherwise. diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index 1626e39..b84b30f 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -5,18 +5,17 @@ import json import logging import re from dataclasses import dataclass -from typing import TYPE_CHECKING from bs4 import BeautifulSoup from bs4 import Tag from markdownify import markdownify +from reader import Entry +from reader import Feed +from reader import Reader +from reader import TagNotFoundError from discord_rss_bot.is_url_valid import is_url_valid - -if TYPE_CHECKING: - from reader import Entry - from reader import Feed - from reader import Reader +from discord_rss_bot.settings import get_reader logger: logging.Logger = logging.getLogger(__name__) @@ -117,18 +116,18 @@ def format_entry_html_for_discord(text: str) -> str: return _restore_discord_timestamp_tags(formatted_text, replacements) -def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str: +def replace_tags_in_text_message(entry: Entry) -> str: """Replace tags in custom_message. Args: entry: The entry to get the tags from. - reader: Custom Reader instance. Returns: Returns the custom_message with the tags replaced. """ feed: Feed = entry.feed - custom_message: str = get_custom_message(feed=feed, reader=reader) + custom_reader: Reader = get_reader() + custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader) content = "" if entry.content: @@ -230,18 +229,18 @@ def get_first_image(summary: str | None, content: str | None) -> str: return "" -def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed: +def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed: """Replace tags in embed. Args: feed: The feed to get the tags from. entry: The entry to get the tags from. - reader: Custom Reader instance. Returns: Returns the embed with the tags replaced. """ - embed: CustomEmbed = get_embed(feed=feed, reader=reader) + custom_reader: Reader = get_reader() + embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader) content = "" if entry.content: @@ -332,29 +331,31 @@ def _replace_embed_tags(embed: CustomEmbed, template: str, replace_with: str) -> embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with) -def get_custom_message(reader: Reader, feed: Feed) -> str: +def get_custom_message(custom_reader: Reader, feed: Feed) -> str: """Get custom_message tag from feed. Args: - reader: What Reader to use. + custom_reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the custom_message tag. """ try: - custom_message: str = str(reader.get_tag(feed, "custom_message", "")) + custom_message: str = str(custom_reader.get_tag(feed, "custom_message")) + except TagNotFoundError: + custom_message = "" except ValueError: custom_message = "" return custom_message -def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None: +def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None: """Set embed tag in feed. Args: - reader: What Reader to use. + custom_reader: What Reader to use. feed: The feed to set the tag in. embed: The embed to set. """ @@ -370,20 +371,20 @@ def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None: "footer_text": embed.footer_text, "footer_icon_url": embed.footer_icon_url, } - reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] + custom_reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] -def get_embed(reader: Reader, feed: Feed) -> CustomEmbed: +def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed: """Get embed tag from feed. Args: - reader: What Reader to use. + custom_reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the embed tag. """ - embed = reader.get_tag(feed, "embed", "") + embed = custom_reader.get_tag(feed, "embed", "") if embed: if not isinstance(embed, str): diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 5510516..8e6ca63 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -23,6 +23,7 @@ from reader import FeedNotFoundError from reader import Reader from reader import ReaderError from reader import StorageError +from reader import TagNotFoundError from discord_rss_bot.custom_message import CustomEmbed from discord_rss_bot.custom_message import get_custom_message @@ -36,6 +37,7 @@ from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url from discord_rss_bot.hoyolab_api import fetch_hoyolab_post from discord_rss_bot.hoyolab_api import is_c3kay_feed from discord_rss_bot.is_url_valid import is_url_valid +from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.settings import default_custom_message from discord_rss_bot.settings import get_reader @@ -96,23 +98,26 @@ def extract_domain(url: str) -> str: # noqa: PLR0911 return "Other" -def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: C901 +def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: C901, PLR0912 """Send a single entry to Discord. Args: entry: The entry to send to Discord. - reader: The reader to use. + custom_reader: The reader to use. If None, the default reader will be used. Returns: str | None: The error message if there was an error, otherwise None. """ + # Get the default reader if we didn't get a custom one. + reader: Reader = get_reader() if custom_reader is None else custom_reader + # Get the webhook URL for the entry. webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) if not webhook_url: return "No webhook URL found." # If https://discord.com/quests/ is in the URL, send a separate message with the URL. - send_discord_quest_notification(entry, webhook_url, reader=reader) + send_discord_quest_notification(entry, webhook_url) # Check if this is a c3kay feed if is_c3kay_feed(entry.feed.url): @@ -123,7 +128,7 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry, reader=reader) + execute_webhook(webhook, entry) return None logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", @@ -137,14 +142,17 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: # Try to get the custom message for the feed. If the user has none, we will use the default message. # This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()" if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 - webhook_message: str = replace_tags_in_text_message(entry=entry, reader=reader) + webhook_message: str = replace_tags_in_text_message(entry=entry) if not webhook_message: webhook_message = "No message found." # Create the webhook. try: - should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) + should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) + except TagNotFoundError: + logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url) + should_send_embed = True except StorageError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True @@ -154,15 +162,15 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: should_send_embed = False if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry, reader=reader) + webhook = create_embed_webhook(webhook_url, entry) else: webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) - execute_webhook(webhook, entry, reader=reader) + execute_webhook(webhook, entry) return None -def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Reader) -> None: +def send_discord_quest_notification(entry: Entry, webhook_url: str) -> None: """Send a separate message to Discord if the entry is a quest notification.""" quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+") @@ -174,7 +182,7 @@ def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Read content=quest_url, rate_limit_retry=True, ) - execute_webhook(webhook, entry, reader=reader) + execute_webhook(webhook, entry) # Iterate through the content of the entry for content in entry.content: @@ -232,17 +240,12 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None: discord_embed.set_title(embed_title) if embed_title else None -def create_embed_webhook( # noqa: C901 - webhook_url: str, - entry: Entry, - reader: Reader, -) -> DiscordWebhook: +def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: # noqa: C901 """Create a webhook with an embed. Args: webhook_url (str): The webhook URL. entry (Entry): The entry to send to Discord. - reader (Reader): The Reader instance to use for getting embed data. Returns: DiscordWebhook: The webhook with the embed. @@ -251,7 +254,7 @@ def create_embed_webhook( # noqa: C901 feed: Feed = entry.feed # Get the embed data from the database. - custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader) + custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry) discord_embed: DiscordEmbed = DiscordEmbed() @@ -313,14 +316,13 @@ def get_webhook_url(reader: Reader, entry: Entry) -> str: str: The webhook URL. """ try: - webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) + webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook")) + except TagNotFoundError: + logger.exception("No webhook URL found for feed: %s", entry.feed.url) + return "" except StorageError: logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url) return "" - - if not webhook_url: - logger.error("No webhook URL found for feed: %s", entry.feed.url) - return "" return webhook_url @@ -339,53 +341,53 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None: logger.exception("Error setting entry to read: %s", entry.id) -def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 +def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 """Send entries to Discord. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. Args: - reader: If we should use a custom reader instead of the default one. + custom_reader: If we should use a custom reader instead of the default one. feed: The feed to send to Discord. do_once: If we should only send one entry. This is used in the test. """ logger.info("Starting to send entries to Discord.") # Get the default reader if we didn't get a custom one. - effective_reader: Reader = get_reader() if reader is None else reader + reader: Reader = get_reader() if custom_reader is None else custom_reader # Check for new entries for every feed. - effective_reader.update_feeds( + reader.update_feeds( scheduled=True, workers=os.cpu_count() or 1, ) # Loop through the unread entries. - entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False) + entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False) for entry in entries: - set_entry_as_read(effective_reader, entry) + set_entry_as_read(reader, entry) if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1): logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url) continue - webhook_url: str = get_webhook_url(effective_reader, entry) + webhook_url: str = get_webhook_url(reader, entry) if not webhook_url: logger.info("No webhook URL found for feed: %s", entry.feed.url) continue - should_send_embed: bool = should_send_embed_check(effective_reader, entry) + should_send_embed: bool = should_send_embed_check(reader, entry) # Youtube feeds only need to send the link if is_youtube_feed(entry.feed.url): should_send_embed = False if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader) + webhook = create_embed_webhook(webhook_url, entry) else: # If the user has set the custom message to an empty string, we will use the default message, otherwise we # will use the custom message. - if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901 - webhook_message = replace_tags_in_text_message(entry, reader=effective_reader) + if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 + webhook_message = replace_tags_in_text_message(entry) else: webhook_message: str = str(default_custom_message) @@ -395,12 +397,12 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) # Check if the entry is blacklisted, and if it is, we will skip it. - if entry_should_be_skipped(effective_reader, entry): + if entry_should_be_skipped(reader, entry): logger.info("Entry was blacklisted: %s", entry.id) continue # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. - if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry): + if has_white_tags(reader, entry.feed) and not should_be_sent(reader, entry): logger.info("Entry was not whitelisted: %s", entry.id) continue @@ -413,7 +415,7 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry, reader=effective_reader) + execute_webhook(webhook, entry) return logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", @@ -423,7 +425,7 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) # Send the entry to Discord as it is not blacklisted or feed has a whitelist. - execute_webhook(webhook, entry, reader=effective_reader) + execute_webhook(webhook, entry) # If we only want to send one entry, we will break the loop. This is used when testing this function. if do_once: @@ -431,15 +433,16 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d break -def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None: +def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None: """Execute the webhook. Args: webhook (DiscordWebhook): The webhook to execute. entry (Entry): The entry to send to Discord. - reader (Reader): The Reader instance to use for checking feed status. """ + reader: Reader = get_reader() + # If the feed has been paused or deleted, we will not send the entry to Discord. entry_feed: Feed = entry.feed if entry_feed.updates_enabled is False: @@ -490,7 +493,10 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool: return False try: - should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) + should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) + except TagNotFoundError: + logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url) + should_send_embed = True except ReaderError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True @@ -545,7 +551,9 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: reader.add_feed(clean_feed_url) except FeedExistsError: # Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new. - if not reader.get_tag(clean_feed_url, "webhook", ""): + try: + reader.get_tag(clean_feed_url, "webhook") + except TagNotFoundError: reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType] except ReaderError as e: raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e @@ -572,3 +580,5 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # Update the full-text search index so our new feed is searchable. reader.update_search() + + add_missing_tags(reader) diff --git a/discord_rss_bot/filter/blacklist.py b/discord_rss_bot/filter/blacklist.py index 8260993..95c0716 100644 --- a/discord_rss_bot/filter/blacklist.py +++ b/discord_rss_bot/filter/blacklist.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from reader import Reader -def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: +def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: """Return True if the feed has blacklist tags. The following tags are checked: @@ -25,21 +25,21 @@ def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: - regex_blacklist_title Args: - reader: The reader. + custom_reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. """ - blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() - blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() + blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() - regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() - regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() + regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() + regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() return bool( blacklist_title @@ -53,11 +53,11 @@ def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: ) -def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the blacklist. Args: - reader: The reader. + custom_reader: The reader. entry: The entry to check. Returns: @@ -65,15 +65,15 @@ def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0 """ feed = entry.feed - blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() - blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() + blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() - regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() - regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() + regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() + regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() # TODO(TheLovinator): Also add support for entry_text and more. # Check regular blacklist diff --git a/discord_rss_bot/filter/whitelist.py b/discord_rss_bot/filter/whitelist.py index bb5303d..9c198c4 100644 --- a/discord_rss_bot/filter/whitelist.py +++ b/discord_rss_bot/filter/whitelist.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from reader import Reader -def has_white_tags(reader: Reader, feed: Feed) -> bool: +def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: """Return True if the feed has whitelist tags. The following tags are checked: @@ -25,21 +25,21 @@ def has_white_tags(reader: Reader, feed: Feed) -> bool: - whitelist_title Args: - reader: The reader. + custom_reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. """ - whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() - regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() + regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() return bool( whitelist_title @@ -53,11 +53,11 @@ def has_white_tags(reader: Reader, feed: Feed) -> bool: ) -def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the whitelist. Args: - reader: The reader. + custom_reader: The reader. entry: The entry to check. Returns: @@ -65,16 +65,16 @@ def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """ feed: Feed = entry.feed # Regular whitelist tags - whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() # Regex whitelist tags - regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() + regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() # Check regular whitelist if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): diff --git a/discord_rss_bot/git_backup.py b/discord_rss_bot/git_backup.py index 49528ec..b226489 100644 --- a/discord_rss_bot/git_backup.py +++ b/discord_rss_bot/git_backup.py @@ -30,6 +30,8 @@ from pathlib import Path from typing import TYPE_CHECKING from typing import Any +from reader import TagNotFoundError + if TYPE_CHECKING: from reader import Reader @@ -174,15 +176,21 @@ def export_state(reader: Reader, backup_path: Path) -> None: logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url) feeds_state.append(feed_data) - webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list( - reader.get_tag((), "webhooks", []), - ) + try: + webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list( + reader.get_tag((), "webhooks", []), + ) + except TagNotFoundError: + webhooks = [] # Export global update interval if set global_update_interval: dict[str, Any] | None = None - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict): - global_update_interval = global_update_config + try: + global_update_config = reader.get_tag((), ".reader.update", None) + if isinstance(global_update_config, dict): + global_update_interval = global_update_config + except TagNotFoundError: + pass state: dict = {"feeds": feeds_state, "webhooks": webhooks} if global_update_interval is not None: diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index ca3894c..7c5e7ac 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -19,7 +19,6 @@ import httpx import sentry_sdk import uvicorn from apscheduler.schedulers.asyncio import AsyncIOScheduler -from fastapi import Depends from fastapi import FastAPI from fastapi import Form from fastapi import HTTPException @@ -54,7 +53,7 @@ from discord_rss_bot.feeds import send_entry_to_discord from discord_rss_bot.feeds import send_to_discord from discord_rss_bot.git_backup import commit_state_change from discord_rss_bot.git_backup import get_backup_path -from discord_rss_bot.is_url_valid import is_url_valid +from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.search import create_search_context from discord_rss_bot.settings import get_reader @@ -101,16 +100,7 @@ LOGGING_CONFIG: dict[str, Any] = { logging.config.dictConfig(LOGGING_CONFIG) logger: logging.Logger = logging.getLogger(__name__) - - -def get_reader_dependency() -> Reader: - """Provide the app Reader instance as a FastAPI dependency. - - Returns: - Reader: The shared Reader instance. - """ - return get_reader() - +reader: Reader = get_reader() # Time constants for relative time formatting SECONDS_PER_MINUTE = 60 @@ -156,7 +146,7 @@ def relative_time(dt: datetime | None) -> str: @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None]: """Lifespan function for the FastAPI app.""" - reader: Reader = get_reader() + add_missing_tags(reader) scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone=UTC) scheduler.add_job( func=send_to_discord, @@ -180,6 +170,8 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template # Add the filters to the Jinja2 environment so they can be used in html templates. templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else "" +templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted +templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted templates.env.filters["discord_markdown"] = markdownify templates.env.filters["relative_time"] = relative_time templates.env.globals["get_backup_path"] = get_backup_path @@ -189,14 +181,12 @@ templates.env.globals["get_backup_path"] = get_backup_path async def post_add_webhook( webhook_name: Annotated[str, Form()], webhook_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Add a feed to the database. Args: webhook_name: The name of the webhook. webhook_url: The url of the webhook. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the index page. @@ -229,15 +219,11 @@ async def post_add_webhook( @app.post("/delete_webhook") -async def post_delete_webhook( - webhook_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectResponse: """Delete a webhook from the database. Args: webhook_url: The url of the webhook. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the index page. @@ -280,14 +266,12 @@ async def post_delete_webhook( async def post_create_feed( feed_url: Annotated[str, Form()], webhook_dropdown: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Add a feed to the database. Args: feed_url: The feed to add. webhook_dropdown: The webhook to use. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -299,15 +283,11 @@ async def post_create_feed( @app.post("/pause") -async def post_pause_feed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_pause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Pause a feed. Args: feed_url: The feed to pause. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -318,15 +298,11 @@ async def post_pause_feed( @app.post("/unpause") -async def post_unpause_feed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_unpause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Unpause a feed. Args: feed_url: The Feed to unpause. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -338,7 +314,6 @@ async def post_unpause_feed( @app.post("/whitelist") async def post_set_whitelist( - reader: Annotated[Reader, Depends(get_reader_dependency)], whitelist_title: Annotated[str, Form()] = "", whitelist_summary: Annotated[str, Form()] = "", whitelist_content: Annotated[str, Form()] = "", @@ -361,7 +336,6 @@ async def post_set_whitelist( regex_whitelist_content: Whitelisted regex for when checking the content. regex_whitelist_author: Whitelisted regex for when checking the author. feed_url: The feed we should set the whitelist for. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -382,17 +356,12 @@ async def post_set_whitelist( @app.get("/whitelist", response_class=HTMLResponse) -async def get_whitelist( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_whitelist(feed_url: str, request: Request): """Get the whitelist. Args: feed_url: What feed we should get the whitelist for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The whitelist page. @@ -426,7 +395,6 @@ async def get_whitelist( @app.post("/blacklist") async def post_set_blacklist( - reader: Annotated[Reader, Depends(get_reader_dependency)], blacklist_title: Annotated[str, Form()] = "", blacklist_summary: Annotated[str, Form()] = "", blacklist_content: Annotated[str, Form()] = "", @@ -452,7 +420,6 @@ async def post_set_blacklist( regex_blacklist_content: Blacklisted regex for when checking the content. regex_blacklist_author: Blacklisted regex for when checking the author. feed_url: What feed we should set the blacklist for. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -471,17 +438,12 @@ async def post_set_blacklist( @app.get("/blacklist", response_class=HTMLResponse) -async def get_blacklist( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_blacklist(feed_url: str, request: Request): """Get the blacklist. Args: feed_url: What feed we should get the blacklist for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The blacklist page. @@ -515,7 +477,6 @@ async def get_blacklist( @app.post("/custom") async def post_set_custom( feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], custom_message: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set the custom message, this is used when sending the message. @@ -523,7 +484,6 @@ async def post_set_custom( Args: custom_message: The custom message. feed_url: The feed we should set the custom message for. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -545,17 +505,12 @@ async def post_set_custom( @app.get("/custom", response_class=HTMLResponse) -async def get_custom( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_custom(feed_url: str, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The custom message page. @@ -576,17 +531,12 @@ async def get_custom( @app.get("/embed", response_class=HTMLResponse) -async def get_embed_page( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_embed_page(feed_url: str, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The embed page. @@ -622,7 +572,6 @@ async def get_embed_page( @app.post("/embed", response_class=HTMLResponse) async def post_embed( feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], title: Annotated[str, Form()] = "", description: Annotated[str, Form()] = "", color: Annotated[str, Form()] = "", @@ -648,7 +597,7 @@ async def post_embed( author_icon_url: The author icon url of the embed. footer_text: The footer text of the embed. footer_icon_url: The footer icon url of the embed. - reader: The Reader instance. + Returns: RedirectResponse: Redirect to the embed page. @@ -676,15 +625,11 @@ async def post_embed( @app.post("/use_embed") -async def post_use_embed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Use embed instead of text. Args: feed_url: The feed to change. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -696,15 +641,11 @@ async def post_use_embed( @app.post("/use_text") -async def post_use_text( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Use text instead of embed. Args: feed_url: The feed to change. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -718,7 +659,6 @@ async def post_use_text( @app.post("/set_update_interval") async def post_set_update_interval( feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], interval_minutes: Annotated[int | None, Form()] = None, redirect_to: Annotated[str, Form()] = "", ) -> RedirectResponse: @@ -728,7 +668,6 @@ async def post_set_update_interval( feed_url: The feed to change. interval_minutes: The update interval in minutes (None to reset to global default). redirect_to: Optional redirect URL (defaults to feed page). - reader: The Reader instance. Returns: RedirectResponse: Redirect to the specified page or feed page. @@ -764,14 +703,12 @@ async def post_set_update_interval( async def post_change_feed_url( old_feed_url: Annotated[str, Form()], new_feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Change the URL for an existing feed. Args: old_feed_url: Current feed URL. new_feed_url: New feed URL to change to. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page for the resulting URL. @@ -797,19 +734,6 @@ async def post_change_feed_url( except ReaderError as e: raise HTTPException(status_code=400, detail=f"Failed to change feed URL: {e}") from e - # Update the feed with the new URL so we can discover what entries it returns. - # Then mark all unread entries as read so the scheduler doesn't resend them. - try: - reader.update_feed(clean_new_feed_url) - except Exception: - logger.exception("Failed to update feed after URL change: %s", clean_new_feed_url) - - for entry in reader.get_entries(feed=clean_new_feed_url, read=False): - try: - reader.set_entry_read(entry, True) - except Exception: - logger.exception("Failed to mark entry as read after URL change: %s", entry.id) - commit_state_change(reader, f"Change feed URL from {clean_old_feed_url} to {clean_new_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_new_feed_url)}", status_code=303) @@ -817,7 +741,6 @@ async def post_change_feed_url( @app.post("/reset_update_interval") async def post_reset_update_interval( feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], redirect_to: Annotated[str, Form()] = "", ) -> RedirectResponse: """Reset the update interval for a feed to use the global default. @@ -825,7 +748,6 @@ async def post_reset_update_interval( Args: feed_url: The feed to change. redirect_to: Optional redirect URL (defaults to feed page). - reader: The Reader instance. Returns: RedirectResponse: Redirect to the specified page or feed page. @@ -852,15 +774,11 @@ async def post_reset_update_interval( @app.post("/set_global_update_interval") -async def post_set_global_update_interval( - interval_minutes: Annotated[int, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_set_global_update_interval(interval_minutes: Annotated[int, Form()]) -> RedirectResponse: """Set the global default update interval. Args: interval_minutes: The update interval in minutes. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the settings page. @@ -874,15 +792,11 @@ async def post_set_global_update_interval( @app.get("/add", response_class=HTMLResponse) -def get_add( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +def get_add(request: Request): """Page for adding a new feed. Args: request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The add feed page. @@ -895,19 +809,13 @@ def get_add( @app.get("/feed", response_class=HTMLResponse) -async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - starting_after: str = "", -): +async def get_feed(feed_url: str, request: Request, starting_after: str = ""): # noqa: C901, PLR0912, PLR0914, PLR0915 """Get a feed by URL. Args: feed_url: The feed to add. request: The request object. starting_after: The entry to start after. Used for pagination. - reader: The Reader instance. Returns: HTMLResponse: The feed page. @@ -937,22 +845,28 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 except EntryNotFoundError as e: current_entries = list(reader.get_entries(feed=clean_feed_url)) msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}" - html: str = create_html_for_feed(reader=reader, entries=current_entries, current_feed_url=clean_feed_url) + html: str = create_html_for_feed(current_entries, clean_feed_url) # Get feed and global intervals for error case too feed_interval: int | None = None - feed_update_config = reader.get_tag(feed, ".reader.update", None) - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value + try: + feed_update_config = reader.get_tag(feed, ".reader.update") + if isinstance(feed_update_config, dict) and "interval" in feed_update_config: + interval_value = feed_update_config["interval"] + if isinstance(interval_value, int): + feed_interval = interval_value + except TagNotFoundError: + pass global_interval: int = 60 - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value + try: + global_update_config = reader.get_tag((), ".reader.update") + if isinstance(global_update_config, dict) and "interval" in global_update_config: + interval_value = global_update_config["interval"] + if isinstance(interval_value, int): + global_interval = interval_value + except TagNotFoundError: + pass context = { "request": request, @@ -987,25 +901,36 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 last_entry = entries[-1] # Create the html for the entries. - html: str = create_html_for_feed(reader=reader, entries=entries, current_feed_url=clean_feed_url) + html: str = create_html_for_feed(entries, clean_feed_url) - should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True)) + try: + should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) + except TagNotFoundError: + add_missing_tags(reader) + should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) # Get the update interval for this feed feed_interval: int | None = None - feed_update_config = reader.get_tag(feed, ".reader.update", None) - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value + try: + feed_update_config = reader.get_tag(feed, ".reader.update") + if isinstance(feed_update_config, dict) and "interval" in feed_update_config: + interval_value = feed_update_config["interval"] + if isinstance(interval_value, int): + feed_interval = interval_value + except TagNotFoundError: + # No custom interval set for this feed, will use global default + pass # Get the global default update interval global_interval: int = 60 # Default to 60 minutes if not set - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value + try: + global_update_config = reader.get_tag((), ".reader.update") + if isinstance(global_update_config, dict) and "interval" in global_update_config: + interval_value = global_update_config["interval"] + if isinstance(interval_value, int): + global_interval = interval_value + except TagNotFoundError: + pass context = { "request": request, @@ -1023,15 +948,10 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 return templates.TemplateResponse(request=request, name="feed.html", context=context) -def create_html_for_feed( # noqa: C901, PLR0914 - reader: Reader, - entries: Iterable[Entry], - current_feed_url: str = "", -) -> str: +def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: C901, PLR0914 """Create HTML for the search results. Args: - reader: The Reader instance to use. entries: The entries to create HTML for. current_feed_url: The feed URL currently being viewed in /feed. @@ -1049,19 +969,17 @@ def create_html_for_feed( # noqa: C901, PLR0914 first_image = get_first_image(summary, content) - text: str = replace_tags_in_text_message(entry, reader=reader) or ( - "
No content available.
" - ) + text: str = replace_tags_in_text_message(entry) or "
No content available.
" published = "" if entry.published: published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S") blacklisted: str = "" - if entry_is_blacklisted(entry, reader=reader): + if entry_is_blacklisted(entry): blacklisted = "Blacklisted" whitelisted: str = "" - if entry_is_whitelisted(entry, reader=reader): + if entry_is_whitelisted(entry): whitelisted = "Whitelisted" source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url @@ -1081,11 +999,7 @@ def create_html_for_feed( # noqa: C901, PLR0914 ) entry_id: str = urllib.parse.quote(entry.id) - encoded_source_feed_url: str = urllib.parse.quote(source_feed_url) - to_discord_html: str = ( - f"" - "Send to Discord" - ) + to_discord_html: str = f"Send to Discord" # Check if this is a YouTube feed entry and the entry has a link is_youtube_feed = "youtube.com/feeds/videos.xml" in entry.feed.url @@ -1156,7 +1070,6 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: hook_name (str): The webhook name. hook_url (str): The webhook URL. - Returns: WebhookInfo: The webhook username, avatar, guild id, etc. """ @@ -1178,37 +1091,39 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: @app.get("/settings", response_class=HTMLResponse) -async def get_settings( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_settings(request: Request): """Settings page. Args: request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The settings page. """ # Get the global default update interval global_interval: int = 60 # Default to 60 minutes if not set - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value + try: + global_update_config = reader.get_tag((), ".reader.update") + if isinstance(global_update_config, dict) and "interval" in global_update_config: + interval_value = global_update_config["interval"] + if isinstance(interval_value, int): + global_interval = interval_value + except TagNotFoundError: + pass # Get all feeds with their intervals feeds: Iterable[Feed] = reader.get_feeds() feed_intervals = [] for feed in feeds: feed_interval: int | None = None - feed_update_config = reader.get_tag(feed, ".reader.update", None) - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value + try: + feed_update_config = reader.get_tag(feed, ".reader.update") + if isinstance(feed_update_config, dict) and "interval" in feed_update_config: + interval_value = feed_update_config["interval"] + if isinstance(interval_value, int): + feed_interval = interval_value + except TagNotFoundError: + pass feed_intervals.append({ "feed": feed, @@ -1226,15 +1141,11 @@ async def get_settings( @app.get("/webhooks", response_class=HTMLResponse) -async def get_webhooks( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_webhooks(request: Request): """Page for adding a new webhook. Args: request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The add webhook page. @@ -1255,65 +1166,54 @@ async def get_webhooks( @app.get("/", response_class=HTMLResponse) -def get_index( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - message: str = "", -): +def get_index(request: Request, message: str = ""): """This is the root of the website. Args: request: The request object. message: Optional message to display to the user. - reader: The Reader instance. Returns: HTMLResponse: The index page. """ - return templates.TemplateResponse( - request=request, - name="index.html", - context=make_context_index(request, message, reader), - ) + return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request, message)) -def make_context_index(request: Request, message: str = "", reader: Reader | None = None): +def make_context_index(request: Request, message: str = ""): """Create the needed context for the index page. Args: request: The request object. message: Optional message to display to the user. - reader: The Reader instance. Returns: dict: The context for the index page. """ - effective_reader: Reader = reader or get_reader_dependency() - hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(effective_reader.get_tag((), "webhooks", []))) + hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) - feed_list: list[dict[str, JSONType | Feed | str]] = [] - broken_feeds: list[Feed] = [] - feeds_without_attached_webhook: list[Feed] = [] + feed_list = [] + broken_feeds = [] + feeds_without_attached_webhook = [] # Get all feeds and organize them - feeds: Iterable[Feed] = effective_reader.get_feeds() + feeds: Iterable[Feed] = reader.get_feeds() for feed in feeds: - webhook: str = str(effective_reader.get_tag(feed.url, "webhook", "")) - if not webhook: + try: + webhook = reader.get_tag(feed.url, "webhook") + feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)}) + except TagNotFoundError: broken_feeds.append(feed) continue - feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)}) - - webhook_list: list[str] = [hook["url"] for hook in hooks] + webhook_list = [hook["url"] for hook in hooks] if webhook not in webhook_list: feeds_without_attached_webhook.append(feed) return { "request": request, "feeds": feed_list, - "feed_count": effective_reader.get_feed_counts(), - "entry_count": effective_reader.get_entry_counts(), + "feed_count": reader.get_feed_counts(), + "entry_count": reader.get_entry_counts(), "webhooks": hooks, "broken_feeds": broken_feeds, "feeds_without_attached_webhook": feeds_without_attached_webhook, @@ -1322,15 +1222,12 @@ def make_context_index(request: Request, message: str = "", reader: Reader | Non @app.post("/remove", response_class=HTMLResponse) -async def remove_feed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def remove_feed(feed_url: Annotated[str, Form()]): """Get a feed by URL. Args: feed_url: The feed to add. - reader: The Reader instance. + Returns: RedirectResponse: Redirect to the index page. @@ -1349,17 +1246,13 @@ async def remove_feed( @app.get("/update", response_class=HTMLResponse) -async def update_feed( - request: Request, - feed_url: str, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def update_feed(request: Request, feed_url: str): """Update a feed. Args: request: The request object. feed_url: The feed URL to update. - reader: The Reader instance. + Returns: RedirectResponse: Redirect to the feed page. @@ -1377,15 +1270,11 @@ async def update_feed( @app.post("/backup") -async def manual_backup( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def manual_backup(request: Request) -> RedirectResponse: """Manually trigger a git backup of the current state. Args: request: The request object. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the index page with a success or error message. @@ -1408,81 +1297,51 @@ async def manual_backup( @app.get("/search", response_class=HTMLResponse) -async def search( - request: Request, - query: str, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def search(request: Request, query: str): """Get entries matching a full-text search query. Args: query: The query to search for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The search page. """ reader.update_search() - context = create_search_context(query, reader=reader) + context = create_search_context(query) return templates.TemplateResponse(request=request, name="search.html", context={"request": request, **context}) @app.get("/post_entry", response_class=HTMLResponse) -async def post_entry( - entry_id: str, - reader: Annotated[Reader, Depends(get_reader_dependency)], - feed_url: str = "", -): +async def post_entry(entry_id: str): """Send single entry to Discord. Args: entry_id: The entry to send. - feed_url: Optional feed URL used to disambiguate entries with identical IDs. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. """ unquoted_entry_id: str = urllib.parse.unquote(entry_id) - clean_feed_url: str = urllib.parse.unquote(feed_url.strip()) if feed_url else "" - - # Prefer feed-scoped lookup when feed_url is provided. This avoids ambiguity when - # multiple feeds contain entries with the same ID. - entry: Entry | None = None - if clean_feed_url: - entry = next( - (entry for entry in reader.get_entries(feed=clean_feed_url) if entry.id == unquoted_entry_id), - None, - ) - else: - entry = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None) - + entry: Entry | None = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None) if entry is None: return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.") - if result := send_entry_to_discord(entry=entry, reader=reader): + if result := send_entry_to_discord(entry=entry): return result # Redirect to the feed page. - redirect_feed_url: str = entry.feed.url.strip() - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(redirect_feed_url)}", status_code=303) + clean_feed_url: str = entry.feed.url.strip() + return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/modify_webhook", response_class=HTMLResponse) -def modify_webhook( - old_hook: Annotated[str, Form()], - new_hook: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - redirect_to: Annotated[str, Form()] = "", -): +def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Form()]): """Modify a webhook. Args: old_hook: The webhook to modify. new_hook: The new webhook. - redirect_to: Optional redirect URL after the update. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the webhook page. @@ -1497,20 +1356,15 @@ def modify_webhook( # Webhooks are stored as a list of dictionaries. # Example: [{"name": "webhook_name", "url": "webhook_url"}] webhooks = cast("list[dict[str, str]]", webhooks) - old_hook_clean: str = old_hook.strip() - new_hook_clean: str = new_hook.strip() - webhook_modified: bool = False for hook in webhooks: - if hook["url"] in old_hook_clean: - hook["url"] = new_hook_clean + if hook["url"] in old_hook.strip(): + hook["url"] = new_hook.strip() # Check if it has been modified. - if hook["url"] != new_hook_clean: + if hook["url"] != new_hook.strip(): raise HTTPException(status_code=500, detail="Webhook could not be modified") - webhook_modified = True - # Add our new list of webhooks to the database. reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] @@ -1518,21 +1372,16 @@ def modify_webhook( # matches the old one. feeds: Iterable[Feed] = reader.get_feeds() for feed in feeds: - webhook: str = str(reader.get_tag(feed, "webhook", "")) + try: + webhook = reader.get_tag(feed, "webhook") + except TagNotFoundError: + continue - if webhook == old_hook_clean: - reader.set_tag(feed.url, "webhook", new_hook_clean) # pyright: ignore[reportArgumentType] + if webhook == old_hook.strip(): + reader.set_tag(feed.url, "webhook", new_hook.strip()) # pyright: ignore[reportArgumentType] - if webhook_modified and old_hook_clean != new_hook_clean: - commit_state_change(reader, f"Modify webhook URL from {old_hook_clean} to {new_hook_clean}") - - redirect_url: str = redirect_to.strip() or "/webhooks" - if redirect_to: - redirect_url = redirect_url.replace(urllib.parse.quote(old_hook_clean), urllib.parse.quote(new_hook_clean)) - redirect_url = redirect_url.replace(old_hook_clean, new_hook_clean) - - # Redirect to the requested page. - return RedirectResponse(url=redirect_url, status_code=303) + # Redirect to the webhook page. + return RedirectResponse(url="/webhooks", status_code=303) def extract_youtube_video_id(url: str) -> str | None: @@ -1558,216 +1407,11 @@ def extract_youtube_video_id(url: str) -> str | None: return None -def resolve_final_feed_url(url: str) -> tuple[str, str | None]: - """Resolve a feed URL by following redirects. - - Args: - url: The feed URL to resolve. - - Returns: - tuple[str, str | None]: A tuple with (resolved_url, error_message). - error_message is None when resolution succeeded. - """ - clean_url: str = url.strip() - if not clean_url: - return "", "URL is empty" - - if not is_url_valid(clean_url): - return clean_url, "URL is invalid" - - try: - response: Response = httpx.get(clean_url, follow_redirects=True, timeout=10.0) - except httpx.HTTPError as e: - return clean_url, str(e) - - if not response.is_success: - return clean_url, f"HTTP {response.status_code}" - - return str(response.url), None - - -def create_webhook_feed_url_preview( - webhook_feeds: list[Feed], - replace_from: str, - replace_to: str, - resolve_urls: bool, # noqa: FBT001 - force_update: bool = False, # noqa: FBT001, FBT002 - existing_feed_urls: set[str] | None = None, -) -> list[dict[str, str | bool | None]]: - """Create preview rows for bulk feed URL replacement. - - Args: - webhook_feeds: Feeds attached to a webhook. - replace_from: Text to replace in each URL. - replace_to: Replacement text. - resolve_urls: Whether to resolve resulting URLs via HTTP redirects. - force_update: Whether conflicts should be marked as force-overwritable. - existing_feed_urls: Optional set of all tracked feed URLs used for conflict detection. - - Returns: - list[dict[str, str | bool | None]]: Rows used in the preview table. - """ - known_feed_urls: set[str] = existing_feed_urls or {feed.url for feed in webhook_feeds} - preview_rows: list[dict[str, str | bool | None]] = [] - for feed in webhook_feeds: - old_url: str = feed.url - has_match: bool = bool(replace_from and replace_from in old_url) - - candidate_url: str = old_url - if has_match: - candidate_url = old_url.replace(replace_from, replace_to) - - resolved_url: str = candidate_url - resolution_error: str | None = None - if has_match and candidate_url != old_url and resolve_urls: - resolved_url, resolution_error = resolve_final_feed_url(candidate_url) - - will_force_ignore_errors: bool = bool( - force_update and bool(resolution_error) and has_match and old_url != candidate_url, - ) - - target_exists: bool = bool( - has_match and not resolution_error and resolved_url != old_url and resolved_url in known_feed_urls, - ) - will_force_overwrite: bool = bool(target_exists and force_update) - will_change: bool = bool( - has_match - and old_url != (candidate_url if will_force_ignore_errors else resolved_url) - and (not target_exists or will_force_overwrite) - and (not resolution_error or will_force_ignore_errors), - ) - - preview_rows.append({ - "old_url": old_url, - "candidate_url": candidate_url, - "resolved_url": resolved_url, - "has_match": has_match, - "will_change": will_change, - "target_exists": target_exists, - "will_force_overwrite": will_force_overwrite, - "will_force_ignore_errors": will_force_ignore_errors, - "resolution_error": resolution_error, - }) - - return preview_rows - - -def build_webhook_mass_update_context( - webhook_feeds: list[Feed], - all_feeds: list[Feed], - replace_from: str, - replace_to: str, - resolve_urls: bool, # noqa: FBT001 - force_update: bool = False, # noqa: FBT001, FBT002 -) -> dict[str, str | bool | int | list[dict[str, str | bool | None]] | dict[str, int]]: - """Build context data used by the webhook mass URL update preview UI. - - Args: - webhook_feeds: Feeds attached to the selected webhook. - all_feeds: All tracked feeds. - replace_from: Text to replace in URLs. - replace_to: Replacement text. - resolve_urls: Whether to resolve resulting URLs. - force_update: Whether to allow overwriting existing target URLs. - - Returns: - dict[str, ...]: Context values for rendering preview controls and table. - """ - clean_replace_from: str = replace_from.strip() - clean_replace_to: str = replace_to.strip() - - preview_rows: list[dict[str, str | bool | None]] = [] - if clean_replace_from: - preview_rows = create_webhook_feed_url_preview( - webhook_feeds=webhook_feeds, - replace_from=clean_replace_from, - replace_to=clean_replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - existing_feed_urls={feed.url for feed in all_feeds}, - ) - - preview_summary: dict[str, int] = { - "total": len(preview_rows), - "matched": sum(1 for row in preview_rows if row["has_match"]), - "will_update": sum(1 for row in preview_rows if row["will_change"]), - "conflicts": sum(1 for row in preview_rows if row["target_exists"] and not row["will_force_overwrite"]), - "force_overwrite": sum(1 for row in preview_rows if row["will_force_overwrite"]), - "force_ignore_errors": sum(1 for row in preview_rows if row["will_force_ignore_errors"]), - "resolve_errors": sum(1 for row in preview_rows if row["resolution_error"]), - } - preview_summary["no_match"] = preview_summary["total"] - preview_summary["matched"] - preview_summary["no_change"] = sum( - 1 for row in preview_rows if row["has_match"] and not row["resolution_error"] and not row["will_change"] - ) - - return { - "replace_from": clean_replace_from, - "replace_to": clean_replace_to, - "resolve_urls": resolve_urls, - "force_update": force_update, - "preview_rows": preview_rows, - "preview_summary": preview_summary, - "preview_change_count": preview_summary["will_update"], - } - - -@app.get("/webhook_entries_mass_update_preview", response_class=HTMLResponse) -async def get_webhook_entries_mass_update_preview( - webhook_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - replace_from: str = "", - replace_to: str = "", - resolve_urls: bool = True, # noqa: FBT001, FBT002 - force_update: bool = False, # noqa: FBT001, FBT002 -) -> HTMLResponse: - """Render the mass-update preview fragment for a webhook using HTMX. - - Args: - webhook_url: Webhook URL whose feeds are being updated. - request: The request object. - reader: The Reader instance. - replace_from: Text to find in URLs. - replace_to: Replacement text. - resolve_urls: Whether to resolve resulting URLs. - force_update: Whether to allow overwriting existing target URLs. - - Returns: - HTMLResponse: Rendered partial template containing summary + preview table. - """ - clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) - all_feeds: list[Feed] = list(reader.get_feeds()) - webhook_feeds: list[Feed] = [ - feed for feed in all_feeds if str(reader.get_tag(feed.url, "webhook", "")) == clean_webhook_url - ] - - context = { - "request": request, - "webhook_url": clean_webhook_url, - **build_webhook_mass_update_context( - webhook_feeds=webhook_feeds, - all_feeds=all_feeds, - replace_from=replace_from, - replace_to=replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - ), - } - return templates.TemplateResponse(request=request, name="_webhook_mass_update_preview.html", context=context) - - @app.get("/webhook_entries", response_class=HTMLResponse) -async def get_webhook_entries( # noqa: C901, PLR0914 +async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914 webhook_url: str, request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], starting_after: str = "", - replace_from: str = "", - replace_to: str = "", - resolve_urls: bool = True, # noqa: FBT001, FBT002 - force_update: bool = False, # noqa: FBT001, FBT002 - message: str = "", ) -> HTMLResponse: """Get all latest entries from all feeds for a specific webhook. @@ -1775,12 +1419,6 @@ async def get_webhook_entries( # noqa: C901, PLR0914 webhook_url: The webhook URL to get entries for. request: The request object. starting_after: The entry to start after. Used for pagination. - replace_from: Optional URL substring to find for bulk URL replacement preview. - replace_to: Optional replacement substring used in bulk URL replacement preview. - resolve_urls: Whether to resolve replaced URLs by following redirects. - force_update: Whether to allow overwriting existing target URLs during apply. - message: Optional status message shown in the UI. - reader: The Reader instance. Returns: HTMLResponse: The webhook entries page. @@ -1802,26 +1440,24 @@ async def get_webhook_entries( # noqa: C901, PLR0914 if not webhook_name: raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") - hook_info: WebhookInfo = get_data_from_hook_url(hook_name=webhook_name, hook_url=clean_webhook_url) - # Get all feeds associated with this webhook all_feeds: list[Feed] = list(reader.get_feeds()) webhook_feeds: list[Feed] = [] for feed in all_feeds: - feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) - if feed_webhook == clean_webhook_url: - webhook_feeds.append(feed) + try: + feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) + if feed_webhook == clean_webhook_url: + webhook_feeds.append(feed) + except TagNotFoundError: + continue # Get all entries from all feeds for this webhook, sorted by published date all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)] - # Sort entries by published date (newest first), with undated entries last. + # Sort entries by published date (newest first) all_entries.sort( - key=lambda e: ( - e.published is not None, - e.published or datetime.min.replace(tzinfo=UTC), - ), + key=lambda e: e.published or datetime.now(tz=UTC), reverse=True, ) @@ -1854,16 +1490,7 @@ async def get_webhook_entries( # noqa: C901, PLR0914 last_entry = paginated_entries[-1] # Create the html for the entries - html: str = create_html_for_feed(reader=reader, entries=paginated_entries) - - mass_update_context = build_webhook_mass_update_context( - webhook_feeds=webhook_feeds, - all_feeds=all_feeds, - replace_from=replace_from, - replace_to=replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - ) + html: str = create_html_for_feed(paginated_entries) # Check if there are more entries available total_entries: int = len(all_entries) @@ -1871,155 +1498,18 @@ async def get_webhook_entries( # noqa: C901, PLR0914 context = { "request": request, - "hook_info": hook_info, "webhook_name": webhook_name, "webhook_url": clean_webhook_url, - "webhook_feeds": webhook_feeds, "entries": paginated_entries, "html": html, "last_entry": last_entry, "is_show_more_entries_button_visible": is_show_more_entries_button_visible, "total_entries": total_entries, "feeds_count": len(webhook_feeds), - "message": urllib.parse.unquote(message) if message else "", - **mass_update_context, } return templates.TemplateResponse(request=request, name="webhook_entries.html", context=context) -@app.post("/bulk_change_feed_urls", response_class=HTMLResponse) -async def post_bulk_change_feed_urls( # noqa: C901, PLR0914, PLR0912, PLR0915 - webhook_url: Annotated[str, Form()], - replace_from: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - replace_to: Annotated[str, Form()] = "", - resolve_urls: Annotated[bool, Form()] = True, # noqa: FBT002 - force_update: Annotated[bool, Form()] = False, # noqa: FBT002 -) -> RedirectResponse: - """Bulk-change feed URLs attached to a webhook. - - Args: - webhook_url: The webhook URL whose feeds should be updated. - replace_from: Text to find in each URL. - replace_to: Text to replace with. - resolve_urls: Whether to resolve resulting URLs via redirects. - force_update: Whether existing target feed URLs should be overwritten. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to webhook detail with status message. - - Raises: - HTTPException: If webhook is missing or replace_from is empty. - """ - clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) - clean_replace_from: str = replace_from.strip() - clean_replace_to: str = replace_to.strip() - - if not clean_replace_from: - raise HTTPException(status_code=400, detail="replace_from cannot be empty") - - webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) - if not any(hook["url"] == clean_webhook_url for hook in webhooks): - raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") - - all_feeds: list[Feed] = list(reader.get_feeds()) - webhook_feeds: list[Feed] = [] - for feed in all_feeds: - feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) - if feed_webhook == clean_webhook_url: - webhook_feeds.append(feed) - - preview_rows: list[dict[str, str | bool | None]] = create_webhook_feed_url_preview( - webhook_feeds=webhook_feeds, - replace_from=clean_replace_from, - replace_to=clean_replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - existing_feed_urls={feed.url for feed in all_feeds}, - ) - - changed_count: int = 0 - skipped_count: int = 0 - failed_count: int = 0 - conflict_count: int = 0 - force_overwrite_count: int = 0 - - for row in preview_rows: - if not row["has_match"]: - continue - - if row["resolution_error"] and not force_update: - skipped_count += 1 - continue - - if row["target_exists"] and not force_update: - conflict_count += 1 - skipped_count += 1 - continue - - old_url: str = str(row["old_url"]) - new_url: str = str(row["candidate_url"] if row["will_force_ignore_errors"] else row["resolved_url"]) - - if old_url == new_url: - skipped_count += 1 - continue - - if row["target_exists"] and force_update: - try: - reader.delete_feed(new_url) - force_overwrite_count += 1 - except FeedNotFoundError: - pass - except ReaderError: - failed_count += 1 - continue - - try: - reader.change_feed_url(old_url, new_url) - except FeedExistsError: - skipped_count += 1 - continue - except FeedNotFoundError: - skipped_count += 1 - continue - except ReaderError: - failed_count += 1 - continue - - try: - reader.update_feed(new_url) - except Exception: - logger.exception("Failed to update feed after URL change: %s", new_url) - - for entry in reader.get_entries(feed=new_url, read=False): - try: - reader.set_entry_read(entry, True) - except Exception: - logger.exception("Failed to mark entry as read after URL change: %s", entry.id) - - changed_count += 1 - - if changed_count > 0: - commit_state_change( - reader, - f"Bulk change {changed_count} feed URL(s) for webhook {clean_webhook_url}", - ) - - status_message: str = ( - f"Updated {changed_count} feed URL(s). " - f"Force overwrote {force_overwrite_count}. " - f"Conflicts {conflict_count}. " - f"Skipped {skipped_count}. " - f"Failed {failed_count}." - ) - redirect_url: str = ( - f"/webhook_entries?webhook_url={urllib.parse.quote(clean_webhook_url)}" - f"&message={urllib.parse.quote(status_message)}" - ) - return RedirectResponse(url=redirect_url, status_code=303) - - if __name__ == "__main__": sentry_sdk.init( dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744", diff --git a/discord_rss_bot/missing_tags.py b/discord_rss_bot/missing_tags.py new file mode 100644 index 0000000..589893e --- /dev/null +++ b/discord_rss_bot/missing_tags.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +from reader import Feed +from reader import Reader +from reader import TagNotFoundError + +from discord_rss_bot.settings import default_custom_embed +from discord_rss_bot.settings import default_custom_message + + +def add_custom_message(reader: Reader, feed: Feed) -> None: + """Add the custom message tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "custom_message") + except TagNotFoundError: + reader.set_tag(feed.url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType] + reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType] + + +def add_has_custom_message(reader: Reader, feed: Feed) -> None: + """Add the has_custom_message tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "has_custom_message") + except TagNotFoundError: + if reader.get_tag(feed, "custom_message") == default_custom_message: + reader.set_tag(feed.url, "has_custom_message", False) # pyright: ignore[reportArgumentType] + else: + reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType] + + +def add_if_embed(reader: Reader, feed: Feed) -> None: + """Add the if_embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "if_embed") + except TagNotFoundError: + reader.set_tag(feed.url, "if_embed", True) # pyright: ignore[reportArgumentType] + + +def add_custom_embed(reader: Reader, feed: Feed) -> None: + """Add the custom embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "embed") + except TagNotFoundError: + reader.set_tag(feed.url, "embed", default_custom_embed) # pyright: ignore[reportArgumentType] + reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType] + + +def add_has_custom_embed(reader: Reader, feed: Feed) -> None: + """Add the has_custom_embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "has_custom_embed") + except TagNotFoundError: + if reader.get_tag(feed, "embed") == default_custom_embed: + reader.set_tag(feed.url, "has_custom_embed", False) # pyright: ignore[reportArgumentType] + else: + reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType] + + +def add_should_send_embed(reader: Reader, feed: Feed) -> None: + """Add the should_send_embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "should_send_embed") + except TagNotFoundError: + reader.set_tag(feed.url, "should_send_embed", True) # pyright: ignore[reportArgumentType] + + +def add_missing_tags(reader: Reader) -> None: + """Add missing tags to feeds. + + Args: + reader: What Reader to use. + """ + for feed in reader.get_feeds(): + add_custom_message(reader, feed) + add_has_custom_message(reader, feed) + add_if_embed(reader, feed) + add_custom_embed(reader, feed) + add_has_custom_embed(reader, feed) + add_should_send_embed(reader, feed) diff --git a/discord_rss_bot/search.py b/discord_rss_bot/search.py index 85129ac..a39f304 100644 --- a/discord_rss_bot/search.py +++ b/discord_rss_bot/search.py @@ -3,6 +3,8 @@ from __future__ import annotations import urllib.parse from typing import TYPE_CHECKING +from discord_rss_bot.settings import get_reader + if TYPE_CHECKING: from collections.abc import Iterable @@ -12,16 +14,19 @@ if TYPE_CHECKING: from reader import Reader -def create_search_context(query: str, reader: Reader) -> dict: +def create_search_context(query: str, custom_reader: Reader | None = None) -> dict: """Build context for search.html template. + If custom_reader is None, use the default reader from settings. + Args: query (str): The search query. - reader (Reader): Custom Reader instance. + custom_reader (Reader | None): Optional custom Reader instance. Returns: dict: Context dictionary for rendering the search results. """ + reader: Reader = get_reader() if custom_reader is None else custom_reader search_results: Iterable[EntrySearchResult] = reader.search_entries(query) results: list[dict] = [] diff --git a/discord_rss_bot/settings.py b/discord_rss_bot/settings.py index 194bf08..e91c3c0 100644 --- a/discord_rss_bot/settings.py +++ b/discord_rss_bot/settings.py @@ -7,6 +7,7 @@ from pathlib import Path from platformdirs import user_data_dir from reader import Reader +from reader import TagNotFoundError from reader import make_reader if typing.TYPE_CHECKING: @@ -47,7 +48,9 @@ def get_reader(custom_location: Path | None = None) -> Reader: # https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig # Set the default update interval to 15 minutes if not already configured # Users can change this via the Settings page or per-feed in the feed page - if reader.get_tag((), ".reader.update", None) is None: + try: + reader.get_tag((), ".reader.update") + except TagNotFoundError: # Set default reader.set_tag((), ".reader.update", {"interval": 15}) diff --git a/discord_rss_bot/templates/_webhook_mass_update_preview.html b/discord_rss_bot/templates/_webhook_mass_update_preview.html deleted file mode 100644 index a59e97b..0000000 --- a/discord_rss_bot/templates/_webhook_mass_update_preview.html +++ /dev/null @@ -1,73 +0,0 @@ -{% if preview_rows %} -

- {{ preview_change_count }} feed URL{{ 's' if preview_change_count != 1 else '' }} ready to update. -

-
- Total: {{ preview_summary.total }} - Matched: {{ preview_summary.matched }} - Will update: {{ preview_summary.will_update }} - Conflicts: {{ preview_summary.conflicts }} - Force overwrite: {{ preview_summary.force_overwrite }} - Force ignore errors: {{ preview_summary.force_ignore_errors }} - Resolve errors: {{ preview_summary.resolve_errors }} - No change: {{ preview_summary.no_change }} - No match: {{ preview_summary.no_match }} -
-
- - - - - - -
-
- - - - - - - - - - {% for row in preview_rows %} - - - - - - {% endfor %} - -
Old URLNew URLStatus
- {{ row.old_url }} - - {{ row.resolved_url if resolve_urls else row.candidate_url }} - - {% if not row.has_match %} - No match - {% elif row.will_force_ignore_errors %} - Will force update (ignore resolve error) - {% elif row.resolution_error %} - {{ row.resolution_error }} - {% elif row.will_force_overwrite %} - Will force overwrite - {% elif row.target_exists %} - Conflict: target URL exists - {% elif row.will_change %} - Will update - {% else %} - No change - {% endif %} -
-
-{% elif replace_from %} -

No preview rows found for that replacement pattern.

-{% endif %} diff --git a/discord_rss_bot/templates/base.html b/discord_rss_bot/templates/base.html index 9146b35..a8640dd 100644 --- a/discord_rss_bot/templates/base.html +++ b/discord_rss_bot/templates/base.html @@ -1,12 +1,13 @@ + + content="Stay updated with the latest news and events with our easy-to-use RSS bot. Never miss a message or announcement again with real-time notifications directly to your Discord server." /> + content="discord, rss, bot, notifications, announcements, updates, real-time, server, messages, news, events, feed." /> @@ -17,20 +18,19 @@ {% block head %} {% endblock head %} + {% include "nav.html" %}
{% if messages %} - + {% endif %} + {% block content %} {% endblock content %}
@@ -41,20 +41,18 @@
- + diff --git a/discord_rss_bot/templates/index.html b/discord_rss_bot/templates/index.html index 341ec38..06b8578 100644 --- a/discord_rss_bot/templates/index.html +++ b/discord_rss_bot/templates/index.html @@ -32,10 +32,10 @@ {% for hook_from_context in webhooks %}
-

{{ hook_from_context.name }}

- Settings - + {{ hook_from_context.name }} + + View Latest Entries
diff --git a/discord_rss_bot/templates/webhook_entries.html b/discord_rss_bot/templates/webhook_entries.html index f0bc970..eb12487 100644 --- a/discord_rss_bot/templates/webhook_entries.html +++ b/discord_rss_bot/templates/webhook_entries.html @@ -1,149 +1,20 @@ {% extends "base.html" %} {% block title %} - | {{ webhook_name }} + | {{ webhook_name }} - Latest Entries {% endblock title %} {% block content %} - {% if message %}{% endif %}
-
-
-

{{ webhook_name }}

-

- {{ total_entries }} total from {{ feeds_count }} feed{{ 's' if feeds_count != 1 else '' }} -

-

- {{ webhook_url }} -

-
- -
-
-
-
-
-

Settings

- -
- - -
- - -
-
- -
-
-
- - -
-
-

Mass update feed URLs

-

Replace part of feed URLs for all feeds attached to this webhook.

-
- -
- - -
-
- - -
-
- - -
-
- - -
-
- -
-
-
{% include "_webhook_mass_update_preview.html" %}
-
-
-
-
-

Attached feeds

- {% if webhook_feeds %} - - {% else %} -

No feeds are attached to this webhook yet.

- {% endif %} -
+ +

{{ webhook_name }} - Latest Entries ({{ total_entries }} total from {{ feeds_count }} feeds)

+ +
+

+ {{ webhook_url }} +

{# Rendered HTML content #} {% if entries %} -

Latest entries

{{ html|safe }}
{% if is_show_more_entries_button_visible and last_entry %} None: - """Register custom command-line options for optional integration tests.""" - parser.addoption( - "--run-real-git-backup-tests", - action="store_true", - default=False, - help="Run tests that push git backup state to a real repository.", - ) - - -def pytest_sessionstart(session: pytest.Session) -> None: +def pytest_configure() -> None: """Isolate persistent app state per xdist worker to avoid cross-worker test interference.""" worker_id: str = os.environ.get("PYTEST_XDIST_WORKER", "gw0") worker_data_dir: Path = Path(tempfile.gettempdir()) / "discord-rss-bot-tests" / worker_id @@ -51,10 +37,4 @@ def pytest_sessionstart(session: pytest.Session) -> None: current_reader.close() get_reader: Any = getattr(settings_module, "get_reader", None) if callable(get_reader): - get_reader() - - -def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None: - """Skip real git-repo push tests unless explicitly requested.""" - if config.getoption("--run-real-git-backup-tests"): - return + main_module.reader = get_reader() diff --git a/tests/test_blacklist.py b/tests/test_blacklist.py index 0c756ad..354fdca 100644 --- a/tests/test_blacklist.py +++ b/tests/test_blacklist.py @@ -38,7 +38,7 @@ def test_has_black_tags() -> None: # Test feed without any blacklist tags assert_msg: str = "Feed should not have any blacklist tags" - assert feed_has_blacklist_tags(reader=get_reader(), feed=feed) is False, assert_msg + assert feed_has_blacklist_tags(custom_reader=get_reader(), feed=feed) is False, assert_msg check_if_has_tag(reader, feed, "blacklist_title") check_if_has_tag(reader, feed, "blacklist_summary") @@ -58,11 +58,11 @@ def test_has_black_tags() -> None: def check_if_has_tag(reader: Reader, feed: Feed, blacklist_name: str) -> None: reader.set_tag(feed, blacklist_name, "a") # pyright: ignore[reportArgumentType] assert_msg: str = f"Feed should have blacklist tags: {blacklist_name}" - assert feed_has_blacklist_tags(reader=reader, feed=feed) is True, assert_msg + assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is True, assert_msg asset_msg: str = f"Feed should not have any blacklist tags: {blacklist_name}" reader.delete_tag(feed, blacklist_name) - assert feed_has_blacklist_tags(reader=reader, feed=feed) is False, asset_msg + assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is False, asset_msg def test_should_be_skipped() -> None: diff --git a/tests/test_custom_filter.py b/tests/test_custom_filter.py index 9611698..5538608 100644 --- a/tests/test_custom_filter.py +++ b/tests/test_custom_filter.py @@ -45,39 +45,39 @@ def test_entry_is_whitelisted() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - reader: Reader = get_reader(custom_location=str(custom_loc)) + custom_reader: Reader = get_reader(custom_location=str(custom_loc)) # Add a feed to the database. - reader.add_feed("https://lovinator.space/rss_test.xml") - reader.update_feed("https://lovinator.space/rss_test.xml") + custom_reader.add_feed("https://lovinator.space/rss_test.xml") + custom_reader.update_feed("https://lovinator.space/rss_test.xml") # whitelist_title - reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in reader.get_entries(): - if entry_is_whitelisted(entry, reader=reader) is True: + custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in custom_reader.get_entries(): + if entry_is_whitelisted(entry) is True: assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}" break - reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title") + custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title") # whitelist_summary - reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in reader.get_entries(): - if entry_is_whitelisted(entry, reader=reader) is True: + custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in custom_reader.get_entries(): + if entry_is_whitelisted(entry) is True: assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}" break - reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary") + custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary") # whitelist_content - reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in reader.get_entries(): - if entry_is_whitelisted(entry, reader=reader) is True: + custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in custom_reader.get_entries(): + if entry_is_whitelisted(entry) is True: assert_msg = f"Expected:

ffdnfdnfdnfdnfdndfn

, Got: {entry.content[0].value}" assert entry.content[0].value == "

ffdnfdnfdnfdnfdndfn

", assert_msg break - reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content") + custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content") # Close the reader, so we can delete the directory. - reader.close() + custom_reader.close() def test_entry_is_blacklisted() -> None: @@ -87,36 +87,36 @@ def test_entry_is_blacklisted() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - reader: Reader = get_reader(custom_location=str(custom_loc)) + custom_reader: Reader = get_reader(custom_location=str(custom_loc)) # Add a feed to the database. - reader.add_feed("https://lovinator.space/rss_test.xml") - reader.update_feed("https://lovinator.space/rss_test.xml") + custom_reader.add_feed("https://lovinator.space/rss_test.xml") + custom_reader.update_feed("https://lovinator.space/rss_test.xml") # blacklist_title - reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in reader.get_entries(): - if entry_is_blacklisted(entry, reader=reader) is True: + custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in custom_reader.get_entries(): + if entry_is_blacklisted(entry) is True: assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}" break - reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title") + custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title") # blacklist_summary - reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in reader.get_entries(): - if entry_is_blacklisted(entry, reader=reader) is True: + custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in custom_reader.get_entries(): + if entry_is_blacklisted(entry) is True: assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}" break - reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary") + custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary") # blacklist_content - reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] - for entry in reader.get_entries(): - if entry_is_blacklisted(entry, reader=reader) is True: + custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] + for entry in custom_reader.get_entries(): + if entry_is_blacklisted(entry) is True: assert_msg = f"Expected:

ffdnfdnfdnfdnfdndfn

, Got: {entry.content[0].value}" assert entry.content[0].value == "

ffdnfdnfdnfdnfdndfn

", assert_msg break - reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content") + custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content") # Close the reader, so we can delete the directory. - reader.close() + custom_reader.close() diff --git a/tests/test_custom_message.py b/tests/test_custom_message.py index 4b23f45..6fc4d41 100644 --- a/tests/test_custom_message.py +++ b/tests/test_custom_message.py @@ -102,10 +102,12 @@ def test_format_entry_html_for_discord_does_not_preserve_invalid_timestamp_style @patch("discord_rss_bot.custom_message.get_custom_message") +@patch("discord_rss_bot.custom_message.get_reader") def test_replace_tags_in_text_message_preserves_timestamp_tags( + mock_get_reader: MagicMock, mock_get_custom_message: MagicMock, ) -> None: - mock_reader = MagicMock() + mock_get_reader.return_value = MagicMock() mock_get_custom_message.return_value = "{{entry_summary}}" summary_parts: list[str] = [ f"

Format {index}: ({timestamp_tag.replace('<', '<').replace('>', '>')})

" @@ -114,17 +116,19 @@ def test_replace_tags_in_text_message_preserves_timestamp_tags( entry_ns: SimpleNamespace = make_entry("".join(summary_parts)) entry: Entry = typing.cast("Entry", entry_ns) - rendered: str = replace_tags_in_text_message(entry, reader=mock_reader) + rendered: str = replace_tags_in_text_message(entry) for timestamp_tag in TIMESTAMP_FORMATS: assert timestamp_tag in rendered @patch("discord_rss_bot.custom_message.get_embed") +@patch("discord_rss_bot.custom_message.get_reader") def test_replace_tags_in_embed_preserves_timestamp_tags( + mock_get_reader: MagicMock, mock_get_embed: MagicMock, ) -> None: - mock_reader = MagicMock() + mock_get_reader.return_value = MagicMock() mock_get_embed.return_value = CustomEmbed(description="{{entry_summary}}") summary_parts: list[str] = [ f"

Format {index}: ({timestamp_tag.replace('<', '<').replace('>', '>')})

" @@ -134,7 +138,7 @@ def test_replace_tags_in_embed_preserves_timestamp_tags( entry: Entry = typing.cast("Entry", entry_ns) - embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry, reader=mock_reader) + embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry) for timestamp_tag in TIMESTAMP_FORMATS: assert timestamp_tag in embed.description diff --git a/tests/test_feeds.py b/tests/test_feeds.py index 84e836c..2ffe47e 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -18,6 +18,7 @@ from discord_rss_bot.feeds import send_entry_to_discord from discord_rss_bot.feeds import send_to_discord from discord_rss_bot.feeds import should_send_embed_check from discord_rss_bot.feeds import truncate_webhook_message +from discord_rss_bot.missing_tags import add_missing_tags def test_send_to_discord() -> None: @@ -34,6 +35,8 @@ def test_send_to_discord() -> None: # Add a feed to the reader. reader.add_feed("https://www.reddit.com/r/Python/.rss") + add_missing_tags(reader) + # Update the feed to get the entries. reader.update_feeds() @@ -55,7 +58,7 @@ def test_send_to_discord() -> None: assert reader.get_tag(feed, "webhook") == webhook_url, f"The webhook URL should be '{webhook_url}'." # Send the feed to Discord. - send_to_discord(reader=reader, feed=feed, do_once=True) + send_to_discord(custom_reader=reader, feed=feed, do_once=True) # Close the reader, so we can delete the directory. reader.close() @@ -188,7 +191,7 @@ def test_send_entry_to_discord_youtube_feed( mock_discord_webhook.return_value = mock_webhook # Call the function - send_entry_to_discord(mock_entry, mock_reader) + send_entry_to_discord(mock_entry) # Assertions mock_create_embed.assert_not_called() @@ -200,7 +203,7 @@ def test_send_entry_to_discord_youtube_feed( assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc" # Verify execute_webhook was called - mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry, reader=mock_reader) + mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry) def test_extract_domain_youtube_feed() -> None: diff --git a/tests/test_git_backup.py b/tests/test_git_backup.py index 183d178..0fa6f8e 100644 --- a/tests/test_git_backup.py +++ b/tests/test_git_backup.py @@ -304,6 +304,102 @@ def test_commit_state_change_no_push_when_remote_unset(monkeypatch: pytest.Monke assert not push_calls, "git push should NOT be called when GIT_BACKUP_REMOTE is not set" +@SKIP_IF_NO_GIT +def test_commit_state_change_e2e_push_to_bare_repo(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """End-to-end test: commit_state_change pushes to a real bare git repository.""" + git_executable: str | None = shutil.which("git") + assert git_executable is not None, "git executable not found" + + # Create a bare remote repository + bare_repo_path: Path = tmp_path / "remote.git" + subprocess.run([git_executable, "init", "--bare", str(bare_repo_path)], check=True, capture_output=True) # noqa: S603 + + # Configure backup with remote pointing to bare repo + backup_path: Path = tmp_path / "backup" + monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path)) + monkeypatch.setenv("GIT_BACKUP_REMOTE", str(bare_repo_path)) + + # Create mock reader with some state + mock_reader = MagicMock() + feed1 = MagicMock() + feed1.url = "https://example.com/feed.rss" + mock_reader.get_feeds.return_value = [feed1] + + def get_tag_side_effect( + feed_or_key: tuple | str, + tag: str | None = None, + default: str | None = None, + ) -> list[Any] | str | None: + if feed_or_key == (): + return [] + if tag == "webhook": + return "https://discord.com/api/webhooks/123/abc" + return default + + mock_reader.get_tag.side_effect = get_tag_side_effect + + # Perform backup with commit and push + commit_state_change(mock_reader, "Initial backup") + + # Verify commit exists in local backup repo + result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603 + [git_executable, "-C", str(backup_path), "log", "--oneline"], + capture_output=True, + text=True, + check=True, + ) + assert "Initial backup" in result.stdout + + # Verify origin remote is configured correctly + result = subprocess.run( # noqa: S603 + [git_executable, "-C", str(backup_path), "remote", "get-url", "origin"], + capture_output=True, + text=True, + check=True, + ) + assert result.stdout.strip() == str(bare_repo_path) + + # Verify commit was pushed to the bare remote + result = subprocess.run( # noqa: S603 + [git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"], + capture_output=True, + text=True, + check=True, + ) + assert "Initial backup" in result.stdout + + # Verify state.json content in the remote + result = subprocess.run( # noqa: S603 + [git_executable, "-C", str(bare_repo_path), "show", "master:state.json"], + capture_output=True, + text=True, + check=True, + ) + state_data: dict[str, Any] = json.loads(result.stdout) + assert state_data["feeds"][0]["url"] == "https://example.com/feed.rss" + assert state_data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc" + + # Perform a second backup to verify subsequent pushes work + feed2 = MagicMock() + feed2.url = "https://another.com/feed.xml" + mock_reader.get_feeds.return_value = [feed1, feed2] + + commit_state_change(mock_reader, "Add second feed") + + # Verify both commits are in the remote + result = subprocess.run( # noqa: S603 + [git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"], + capture_output=True, + text=True, + check=True, + ) + assert "Initial backup" in result.stdout + assert "Add second feed" in result.stdout + + +# Integration tests for embed-related endpoint backups + + client: TestClient = TestClient(app) test_webhook_name: str = "Test Backup Webhook" test_webhook_url: str = "https://discord.com/api/webhooks/999999999/testbackupwebhook" diff --git a/tests/test_main.py b/tests/test_main.py index f6396eb..dc3ecf5 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -4,19 +4,13 @@ import re import urllib.parse from dataclasses import dataclass from dataclasses import field -from datetime import UTC -from datetime import datetime from typing import TYPE_CHECKING from typing import cast -from unittest.mock import MagicMock -from unittest.mock import patch from fastapi.testclient import TestClient -import discord_rss_bot.main as main_module from discord_rss_bot.main import app from discord_rss_bot.main import create_html_for_feed -from discord_rss_bot.main import get_reader_dependency if TYPE_CHECKING: from pathlib import Path @@ -160,9 +154,6 @@ def test_get() -> None: response: Response = client.get(url="/webhooks") assert response.status_code == 200, f"/webhooks failed: {response.text}" - response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url}) - assert response.status_code == 200, f"/webhook_entries failed: {response.text}" - response: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)}) assert response.status_code == 200, f"/whitelist failed: {response.text}" @@ -308,147 +299,6 @@ def test_change_feed_url() -> None: client.post(url="/remove", data={"feed_url": new_feed_url}) -def test_change_feed_url_marks_entries_as_read() -> None: - """After changing a feed URL all entries on the new feed should be marked read to prevent resending.""" - new_feed_url = "https://lovinator.space/rss_test_small.xml" - - # Ensure feeds do not already exist. - client.post(url="/remove", data={"feed_url": feed_url}) - client.post(url="/remove", data={"feed_url": new_feed_url}) - - # Ensure webhook exists. - client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) - client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url}) - - # Add the original feed. - response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) - assert response.status_code == 200, f"Failed to add feed: {response.text}" - - # Patch reader on the main module so we can observe calls. - mock_entry_a = MagicMock() - mock_entry_a.id = "entry-a" - mock_entry_b = MagicMock() - mock_entry_b.id = "entry-b" - - real_reader = main_module.get_reader_dependency() - - # Use a no-redirect client so the POST response is inspected directly; the - # redirect target (/feed?feed_url=…) would 404 because change_feed_url is mocked. - no_redirect_client = TestClient(app, follow_redirects=False) - - with ( - patch.object(real_reader, "get_entries", return_value=[mock_entry_a, mock_entry_b]) as mock_get_entries, - patch.object(real_reader, "set_entry_read") as mock_set_read, - patch.object(real_reader, "update_feed") as mock_update_feed, - patch.object(real_reader, "change_feed_url"), - ): - response = no_redirect_client.post( - url="/change_feed_url", - data={"old_feed_url": feed_url, "new_feed_url": new_feed_url}, - ) - assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}" - - # update_feed should have been called with the new URL. - mock_update_feed.assert_called_once_with(new_feed_url) - - # get_entries should have been called to fetch unread entries on the new URL. - mock_get_entries.assert_called_once_with(feed=new_feed_url, read=False) - - # Every returned entry should have been marked as read. - assert mock_set_read.call_count == 2, f"Expected 2 set_entry_read calls, got {mock_set_read.call_count}" - mock_set_read.assert_any_call(mock_entry_a, True) - mock_set_read.assert_any_call(mock_entry_b, True) - - # Cleanup. - client.post(url="/remove", data={"feed_url": feed_url}) - client.post(url="/remove", data={"feed_url": new_feed_url}) - - -def test_change_feed_url_empty_old_url_returns_400() -> None: - """Submitting an empty old_feed_url should return HTTP 400.""" - response: Response = client.post( - url="/change_feed_url", - data={"old_feed_url": " ", "new_feed_url": "https://example.com/feed.xml"}, - ) - assert response.status_code == 400, f"Expected 400 for empty old URL, got {response.status_code}" - - -def test_change_feed_url_empty_new_url_returns_400() -> None: - """Submitting a blank new_feed_url should return HTTP 400.""" - response: Response = client.post( - url="/change_feed_url", - data={"old_feed_url": feed_url, "new_feed_url": " "}, - ) - assert response.status_code == 400, f"Expected 400 for blank new URL, got {response.status_code}" - - -def test_change_feed_url_nonexistent_old_url_returns_404() -> None: - """Trying to rename a feed that does not exist should return HTTP 404.""" - non_existent = "https://does-not-exist.example.com/rss.xml" - # Make sure it really is absent. - client.post(url="/remove", data={"feed_url": non_existent}) - - response: Response = client.post( - url="/change_feed_url", - data={"old_feed_url": non_existent, "new_feed_url": "https://example.com/new.xml"}, - ) - assert response.status_code == 404, f"Expected 404 for non-existent feed, got {response.status_code}" - - -def test_change_feed_url_new_url_already_exists_returns_409() -> None: - """Changing to a URL that is already tracked should return HTTP 409.""" - second_feed_url = "https://lovinator.space/rss_test_small.xml" - - # Ensure both feeds are absent. - client.post(url="/remove", data={"feed_url": feed_url}) - client.post(url="/remove", data={"feed_url": second_feed_url}) - - # Ensure webhook exists. - client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) - client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url}) - - # Add both feeds. - client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) - client.post(url="/add", data={"feed_url": second_feed_url, "webhook_dropdown": webhook_name}) - - # Try to rename one to the other. - response: Response = client.post( - url="/change_feed_url", - data={"old_feed_url": feed_url, "new_feed_url": second_feed_url}, - ) - assert response.status_code == 409, f"Expected 409 when new URL already exists, got {response.status_code}" - - # Cleanup. - client.post(url="/remove", data={"feed_url": feed_url}) - client.post(url="/remove", data={"feed_url": second_feed_url}) - - -def test_change_feed_url_same_url_redirects_without_error() -> None: - """Changing a feed's URL to itself should redirect cleanly without any error.""" - # Ensure webhook exists. - client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) - client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url}) - - # Add the feed. - client.post(url="/remove", data={"feed_url": feed_url}) - response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) - assert response.status_code == 200, f"Failed to add feed: {response.text}" - - # Submit the same URL as both old and new. - response = client.post( - url="/change_feed_url", - data={"old_feed_url": feed_url, "new_feed_url": feed_url}, - ) - assert response.status_code == 200, f"Expected 200 redirect for same URL, got {response.status_code}" - - # Feed should still be accessible. - response = client.get(url="/feed", params={"feed_url": feed_url}) - assert response.status_code == 200, f"Feed should still exist after no-op URL change: {response.text}" - - # Cleanup. - client.post(url="/remove", data={"feed_url": feed_url}) - - def test_delete_webhook() -> None: """Test the /delete_webhook page.""" # Remove the feed if it already exists before we run the test. @@ -485,110 +335,6 @@ def test_update_feed_not_found() -> None: assert "Feed not found" in response.text -def test_post_entry_send_to_discord() -> None: - """Test that /post_entry sends an entry to Discord and redirects to the feed page. - - Regression test for the bug where the injected reader was not passed to - send_entry_to_discord, meaning the dependency-injected reader was silently ignored. - """ - # Ensure webhook and feed exist. - client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) - response: Response = client.post( - url="/add_webhook", - data={"webhook_name": webhook_name, "webhook_url": webhook_url}, - ) - assert response.status_code == 200, f"Failed to add webhook: {response.text}" - - client.post(url="/remove", data={"feed_url": feed_url}) - response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) - assert response.status_code == 200, f"Failed to add feed: {response.text}" - - # Retrieve an entry from the feed to get a valid entry ID. - reader: main_module.Reader = main_module.get_reader_dependency() - entries: list[Entry] = list(reader.get_entries(feed=feed_url, limit=1)) - assert entries, "Feed should have at least one entry to send" - entry_to_send: main_module.Entry = entries[0] - encoded_id: str = urllib.parse.quote(entry_to_send.id) - - no_redirect_client = TestClient(app, follow_redirects=False) - - # Patch execute_webhook so no real HTTP requests are made to Discord. - with patch("discord_rss_bot.feeds.execute_webhook") as mock_execute: - response = no_redirect_client.get( - url="/post_entry", - params={"entry_id": encoded_id, "feed_url": urllib.parse.quote(feed_url)}, - ) - - assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}: {response.text}" - location: str = response.headers.get("location", "") - assert "feed?feed_url=" in location, f"Should redirect to feed page, got: {location}" - assert mock_execute.called, "execute_webhook should have been called to deliver the entry to Discord" - - # Cleanup. - client.post(url="/remove", data={"feed_url": feed_url}) - - -def test_post_entry_unknown_id_returns_404() -> None: - """Test that /post_entry returns 404 when the entry ID does not exist.""" - response: Response = client.get( - url="/post_entry", - params={"entry_id": "https://nonexistent.example.com/entry-that-does-not-exist"}, - ) - assert response.status_code == 404, f"Expected 404 for unknown entry, got {response.status_code}" - - -def test_post_entry_uses_feed_url_to_disambiguate_duplicate_ids() -> None: - """When IDs collide across feeds, /post_entry should pick the entry from provided feed_url.""" - - @dataclass(slots=True) - class DummyFeed: - url: str - - @dataclass(slots=True) - class DummyEntry: - id: str - feed: DummyFeed - feed_url: str - - feed_a = "https://example.com/feed-a.xml" - feed_b = "https://example.com/feed-b.xml" - shared_id = "https://example.com/shared-entry-id" - - entry_a: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_a), feed_url=feed_a)) - entry_b: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_b), feed_url=feed_b)) - - class StubReader: - def get_entries(self, feed: str | None = None) -> list[Entry]: - if feed == feed_a: - return [entry_a] - if feed == feed_b: - return [entry_b] - return [entry_a, entry_b] - - selected_feed_urls: list[str] = [] - - def fake_send_entry_to_discord(entry: Entry, reader: object) -> None: - selected_feed_urls.append(entry.feed.url) - - app.dependency_overrides[get_reader_dependency] = StubReader - no_redirect_client = TestClient(app, follow_redirects=False) - - try: - with patch("discord_rss_bot.main.send_entry_to_discord", side_effect=fake_send_entry_to_discord): - response: Response = no_redirect_client.get( - url="/post_entry", - params={"entry_id": urllib.parse.quote(shared_id), "feed_url": urllib.parse.quote(feed_b)}, - ) - - assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}" - assert selected_feed_urls == [feed_b], f"Expected feed-b entry, got: {selected_feed_urls}" - - location = response.headers.get("location", "") - assert urllib.parse.quote(feed_b) in location, f"Expected redirect to feed-b page, got: {location}" - finally: - app.dependency_overrides = {} - - def test_navbar_backup_link_hidden_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None: """Test that the backup link is not shown in the navbar when GIT_BACKUP_PATH is not set.""" # Ensure GIT_BACKUP_PATH is not set @@ -827,24 +573,11 @@ def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyP original_feed_url="https://example.com/feed-b.xml", ) - monkeypatch.setattr( - "discord_rss_bot.main.replace_tags_in_text_message", - lambda _entry, **_kwargs: "Rendered content", - ) - monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry, **_kwargs: False) - monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry, **_kwargs: False) + monkeypatch.setattr("discord_rss_bot.main.replace_tags_in_text_message", lambda _entry: "Rendered content") + monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry: False) + monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry: False) - same_feed_entry_typed: Entry = cast("Entry", same_feed_entry) - other_feed_entry_typed: Entry = cast("Entry", other_feed_entry) - - html: str = create_html_for_feed( - reader=MagicMock(), - current_feed_url=selected_feed_url, - entries=[ - same_feed_entry_typed, - other_feed_entry_typed, - ], - ) + html = create_html_for_feed(cast("list[Entry]", [same_feed_entry, other_feed_entry]), selected_feed_url) assert "From another feed: https://example.com/feed-b.xml" in html assert "From another feed: https://example.com/feed-a.xml" not in html @@ -887,32 +620,6 @@ def test_webhook_entries_no_feeds() -> None: assert "No feeds found" in response.text or "Add feeds" in response.text, "Expected message about no feeds" -def test_webhook_entries_no_feeds_still_shows_webhook_settings() -> None: - """The webhook detail view should show settings/actions even with no attached feeds.""" - client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) - response: Response = client.post( - url="/add_webhook", - data={"webhook_name": webhook_name, "webhook_url": webhook_url}, - ) - assert response.status_code == 200, f"Failed to add webhook: {response.text}" - - response = client.get( - url="/webhook_entries", - params={"webhook_url": webhook_url}, - ) - - assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" - assert "Settings" in response.text, "Expected settings card on webhook detail view" - assert "Modify Webhook" in response.text, "Expected modify form on webhook detail view" - assert "Delete Webhook" in response.text, "Expected delete action on webhook detail view" - assert "Back to dashboard" in response.text, "Expected dashboard navigation link" - assert "All webhooks" in response.text, "Expected all webhooks navigation link" - assert f'name="old_hook" value="{webhook_url}"' in response.text, "Expected old_hook hidden input" - assert f'value="/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}"' in response.text, ( - "Expected modify form to redirect back to the current webhook detail view" - ) - - def test_webhook_entries_with_feeds_no_entries() -> None: """Test webhook_entries endpoint when webhook has feeds but no entries yet.""" # Clean up and create fresh webhook @@ -974,38 +681,6 @@ def test_webhook_entries_with_entries() -> None: assert webhook_name in response.text, "Webhook name not found in response" # Should show entries (the feed has entries) assert "total from" in response.text, "Expected to see entry count" - assert "Modify Webhook" in response.text, "Expected webhook settings to be visible" - assert "Attached feeds" in response.text, "Expected attached feeds section to be visible" - - -def test_webhook_entries_shows_attached_feed_link() -> None: - """The webhook detail view should list attached feeds linking to their feed pages.""" - client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) - response: Response = client.post( - url="/add_webhook", - data={"webhook_name": webhook_name, "webhook_url": webhook_url}, - ) - assert response.status_code == 200, f"Failed to add webhook: {response.text}" - - client.post(url="/remove", data={"feed_url": feed_url}) - response = client.post( - url="/add", - data={"feed_url": feed_url, "webhook_dropdown": webhook_name}, - ) - assert response.status_code == 200, f"Failed to add feed: {response.text}" - - response = client.get( - url="/webhook_entries", - params={"webhook_url": webhook_url}, - ) - - assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" - assert f"/feed?feed_url={urllib.parse.quote(feed_url)}" in response.text, ( - "Expected attached feed to link to its feed detail page" - ) - assert "Latest entries" in response.text, "Expected latest entries heading on webhook detail view" - - client.post(url="/remove", data={"feed_url": feed_url}) def test_webhook_entries_multiple_feeds() -> None: @@ -1041,75 +716,6 @@ def test_webhook_entries_multiple_feeds() -> None: client.post(url="/remove", data={"feed_url": feed_url}) -def test_webhook_entries_sort_newest_and_non_null_published_first() -> None: - """Webhook entries should be sorted newest-first with published=None entries placed last.""" - - @dataclass(slots=True) - class DummyFeed: - url: str - title: str | None = None - updates_enabled: bool = True - last_exception: None = None - - @dataclass(slots=True) - class DummyEntry: - id: str - feed: DummyFeed - published: datetime | None - - dummy_feed = DummyFeed(url="https://example.com/feed.xml", title="Example Feed") - - # Intentionally unsorted input with two dated entries and two undated entries. - unsorted_entries: list[Entry] = [ - cast("Entry", DummyEntry(id="old", feed=dummy_feed, published=datetime(2024, 1, 1, tzinfo=UTC))), - cast("Entry", DummyEntry(id="none-1", feed=dummy_feed, published=None)), - cast("Entry", DummyEntry(id="new", feed=dummy_feed, published=datetime(2024, 2, 1, tzinfo=UTC))), - cast("Entry", DummyEntry(id="none-2", feed=dummy_feed, published=None)), - ] - - class StubReader: - def get_tag(self, resource: object, key: str, default: object = None) -> object: - if resource == () and key == "webhooks": - return [{"name": webhook_name, "url": webhook_url}] - if key == "webhook" and isinstance(resource, str): - return webhook_url - return default - - def get_feeds(self) -> list[DummyFeed]: - return [dummy_feed] - - def get_entries(self, **_kwargs: object) -> list[Entry]: - return unsorted_entries - - observed_order: list[str] = [] - - def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str: - del reader, current_feed_url - observed_order.extend(entry.id for entry in entries) - return "" - - app.dependency_overrides[get_reader_dependency] = StubReader - try: - with ( - patch( - "discord_rss_bot.main.get_data_from_hook_url", - return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url), - ), - patch("discord_rss_bot.main.create_html_for_feed", side_effect=capture_entries), - ): - response: Response = client.get( - url="/webhook_entries", - params={"webhook_url": webhook_url}, - ) - - assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" - assert observed_order == ["new", "old", "none-1", "none-2"], ( - "Expected newest published entries first and published=None entries last" - ) - finally: - app.dependency_overrides = {} - - def test_webhook_entries_pagination() -> None: """Test webhook_entries endpoint pagination functionality.""" # Clean up and create webhook @@ -1177,445 +783,3 @@ def test_webhook_entries_url_encoding() -> None: # Clean up client.post(url="/remove", data={"feed_url": feed_url}) - - -def test_dashboard_webhook_name_links_to_webhook_detail() -> None: - """Webhook names on the dashboard should open the webhook detail view.""" - client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) - response: Response = client.post( - url="/add_webhook", - data={"webhook_name": webhook_name, "webhook_url": webhook_url}, - ) - assert response.status_code == 200, f"Failed to add webhook: {response.text}" - - client.post(url="/remove", data={"feed_url": feed_url}) - response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name}) - assert response.status_code == 200, f"Failed to add feed: {response.text}" - - response = client.get(url="/") - assert response.status_code == 200, f"Failed to get /: {response.text}" - - expected_link = f"/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}" - assert expected_link in response.text, "Expected dashboard webhook link to point to the webhook detail view" - - client.post(url="/remove", data={"feed_url": feed_url}) - - -def test_modify_webhook_redirects_back_to_webhook_detail() -> None: - """Webhook updates from the detail view should redirect back to that view with the new URL.""" - original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz" - new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token" - - client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) - client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) - - -def test_modify_webhook_triggers_git_backup_commit() -> None: - """Modifying a webhook URL should record a state change for git backup.""" - original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz" - new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token" - - client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) - client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) - - response: Response = client.post( - url="/add_webhook", - data={"webhook_name": webhook_name, "webhook_url": original_webhook_url}, - ) - assert response.status_code == 200, f"Failed to add webhook: {response.text}" - - no_redirect_client = TestClient(app, follow_redirects=False) - with patch("discord_rss_bot.main.commit_state_change") as mock_commit_state_change: - response = no_redirect_client.post( - url="/modify_webhook", - data={ - "old_hook": original_webhook_url, - "new_hook": new_webhook_url, - "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}", - }, - ) - - assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}" - assert mock_commit_state_change.call_count == 1, "Expected webhook modification to trigger git backup commit" - - client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) - - response = client.post( - url="/add_webhook", - data={"webhook_name": webhook_name, "webhook_url": original_webhook_url}, - ) - assert response.status_code == 200, f"Failed to add webhook: {response.text}" - - no_redirect_client = TestClient(app, follow_redirects=False) - response = no_redirect_client.post( - url="/modify_webhook", - data={ - "old_hook": original_webhook_url, - "new_hook": new_webhook_url, - "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}", - }, - ) - - assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}" - assert response.headers["location"] == (f"/webhook_entries?webhook_url={urllib.parse.quote(new_webhook_url)}"), ( - f"Unexpected redirect location: {response.headers['location']}" - ) - - client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) - - -def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None: - """Preview should list old->new feed URLs for webhook bulk replacement.""" - - @dataclass(slots=True) - class DummyFeed: - url: str - title: str | None = None - updates_enabled: bool = True - last_exception: None = None - - class StubReader: - def __init__(self) -> None: - self._feeds: list[DummyFeed] = [ - DummyFeed(url="https://old.example.com/rss/a.xml", title="A"), - DummyFeed(url="https://old.example.com/rss/b.xml", title="B"), - DummyFeed(url="https://unchanged.example.com/rss/c.xml", title="C"), - ] - - def get_tag(self, resource: object, key: str, default: object = None) -> object: - if resource == () and key == "webhooks": - return [{"name": webhook_name, "url": webhook_url}] - if key == "webhook" and isinstance(resource, str): - if resource.startswith("https://old.example.com"): - return webhook_url - if resource.startswith("https://unchanged.example.com"): - return webhook_url - return default - - def get_feeds(self) -> list[DummyFeed]: - return self._feeds - - def get_entries(self, **_kwargs: object) -> list[Entry]: - return [] - - app.dependency_overrides[get_reader_dependency] = StubReader - try: - with ( - patch( - "discord_rss_bot.main.get_data_from_hook_url", - return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url), - ), - patch( - "discord_rss_bot.main.resolve_final_feed_url", - side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), - ), - ): - response: Response = client.get( - url="/webhook_entries", - params={ - "webhook_url": webhook_url, - "replace_from": "old.example.com", - "replace_to": "new.example.com", - "resolve_urls": "true", - }, - ) - - assert response.status_code == 200, f"Failed to get preview: {response.text}" - assert "Mass update feed URLs" in response.text - assert "old.example.com/rss/a.xml" in response.text - assert "new.example.com/rss/a.xml" in response.text - assert "Will update" in response.text - assert "Matched: 2" in response.text - assert "Will update: 2" in response.text - finally: - app.dependency_overrides = {} - - -def test_bulk_change_feed_urls_updates_matching_feeds() -> None: - """Mass updater should change all matching feed URLs for a webhook.""" - - @dataclass(slots=True) - class DummyFeed: - url: str - - class StubReader: - def __init__(self) -> None: - self._feeds = [ - DummyFeed(url="https://old.example.com/rss/a.xml"), - DummyFeed(url="https://old.example.com/rss/b.xml"), - DummyFeed(url="https://unchanged.example.com/rss/c.xml"), - ] - self.change_calls: list[tuple[str, str]] = [] - self.updated_feeds: list[str] = [] - - def get_tag(self, resource: object, key: str, default: object = None) -> object: - if resource == () and key == "webhooks": - return [{"name": webhook_name, "url": webhook_url}] - if key == "webhook" and isinstance(resource, str): - return webhook_url - return default - - def get_feeds(self) -> list[DummyFeed]: - return self._feeds - - def change_feed_url(self, old_url: str, new_url: str) -> None: - self.change_calls.append((old_url, new_url)) - - def update_feed(self, feed_url: str) -> None: - self.updated_feeds.append(feed_url) - - def get_entries(self, **_kwargs: object) -> list[Entry]: - return [] - - def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 - return - - stub_reader = StubReader() - app.dependency_overrides[get_reader_dependency] = lambda: stub_reader - no_redirect_client = TestClient(app, follow_redirects=False) - - try: - with patch( - "discord_rss_bot.main.resolve_final_feed_url", - side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), - ): - response: Response = no_redirect_client.post( - url="/bulk_change_feed_urls", - data={ - "webhook_url": webhook_url, - "replace_from": "old.example.com", - "replace_to": "new.example.com", - "resolve_urls": "true", - }, - ) - - assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}" - assert "Updated%202%20feed%20URL%28s%29" in response.headers.get("location", "") - assert sorted(stub_reader.change_calls) == sorted([ - ("https://old.example.com/rss/a.xml", "https://new.example.com/rss/a.xml"), - ("https://old.example.com/rss/b.xml", "https://new.example.com/rss/b.xml"), - ]) - assert sorted(stub_reader.updated_feeds) == sorted([ - "https://new.example.com/rss/a.xml", - "https://new.example.com/rss/b.xml", - ]) - finally: - app.dependency_overrides = {} - - -def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None: - """HTMX preview endpoint should render only the mass-update preview fragment.""" - - @dataclass(slots=True) - class DummyFeed: - url: str - title: str | None = None - updates_enabled: bool = True - last_exception: None = None - - class StubReader: - def __init__(self) -> None: - self._feeds: list[DummyFeed] = [ - DummyFeed(url="https://old.example.com/rss/a.xml", title="A"), - DummyFeed(url="https://old.example.com/rss/b.xml", title="B"), - ] - - def get_tag(self, resource: object, key: str, default: object = None) -> object: - if key == "webhook" and isinstance(resource, str): - return webhook_url - return default - - def get_feeds(self) -> list[DummyFeed]: - return self._feeds - - app.dependency_overrides[get_reader_dependency] = StubReader - try: - with patch( - "discord_rss_bot.main.resolve_final_feed_url", - side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), - ): - response: Response = client.get( - url="/webhook_entries_mass_update_preview", - params={ - "webhook_url": webhook_url, - "replace_from": "old.example.com", - "replace_to": "new.example.com", - "resolve_urls": "true", - }, - ) - - assert response.status_code == 200, f"Failed to get HTMX preview fragment: {response.text}" - assert "Will update: 2" in response.text - assert " None: # noqa: C901 - """Force update should overwrite conflicting target URLs instead of skipping them.""" - - @dataclass(slots=True) - class DummyFeed: - url: str - - class StubReader: - def __init__(self) -> None: - self._feeds = [ - DummyFeed(url="https://old.example.com/rss/a.xml"), - DummyFeed(url="https://new.example.com/rss/a.xml"), - ] - self.delete_calls: list[str] = [] - self.change_calls: list[tuple[str, str]] = [] - - def get_tag(self, resource: object, key: str, default: object = None) -> object: - if resource == () and key == "webhooks": - return [{"name": webhook_name, "url": webhook_url}] - if key == "webhook" and isinstance(resource, str): - return webhook_url - return default - - def get_feeds(self) -> list[DummyFeed]: - return self._feeds - - def delete_feed(self, feed_url: str) -> None: - self.delete_calls.append(feed_url) - - def change_feed_url(self, old_url: str, new_url: str) -> None: - self.change_calls.append((old_url, new_url)) - - def update_feed(self, _feed_url: str) -> None: - return - - def get_entries(self, **_kwargs: object) -> list[Entry]: - return [] - - def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 - return - - stub_reader = StubReader() - app.dependency_overrides[get_reader_dependency] = lambda: stub_reader - no_redirect_client = TestClient(app, follow_redirects=False) - - try: - with patch( - "discord_rss_bot.main.resolve_final_feed_url", - side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), - ): - response: Response = no_redirect_client.post( - url="/bulk_change_feed_urls", - data={ - "webhook_url": webhook_url, - "replace_from": "old.example.com", - "replace_to": "new.example.com", - "resolve_urls": "true", - "force_update": "true", - }, - ) - - assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}" - assert stub_reader.delete_calls == ["https://new.example.com/rss/a.xml"] - assert stub_reader.change_calls == [ - ( - "https://old.example.com/rss/a.xml", - "https://new.example.com/rss/a.xml", - ), - ] - assert "Force%20overwrote%201" in response.headers.get("location", "") - finally: - app.dependency_overrides = {} - - -def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None: - """Force update should proceed even when URL resolution returns an error (e.g. HTTP 404).""" - - @dataclass(slots=True) - class DummyFeed: - url: str - - class StubReader: - def __init__(self) -> None: - self._feeds = [ - DummyFeed(url="https://old.example.com/rss/a.xml"), - ] - self.change_calls: list[tuple[str, str]] = [] - - def get_tag(self, resource: object, key: str, default: object = None) -> object: - if resource == () and key == "webhooks": - return [{"name": webhook_name, "url": webhook_url}] - if key == "webhook" and isinstance(resource, str): - return webhook_url - return default - - def get_feeds(self) -> list[DummyFeed]: - return self._feeds - - def change_feed_url(self, old_url: str, new_url: str) -> None: - self.change_calls.append((old_url, new_url)) - - def update_feed(self, _feed_url: str) -> None: - return - - def get_entries(self, **_kwargs: object) -> list[Entry]: - return [] - - def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 - return - - stub_reader = StubReader() - app.dependency_overrides[get_reader_dependency] = lambda: stub_reader - no_redirect_client = TestClient(app, follow_redirects=False) - - try: - with patch( - "discord_rss_bot.main.resolve_final_feed_url", - return_value=("https://new.example.com/rss/a.xml", "HTTP 404"), - ): - response: Response = no_redirect_client.post( - url="/bulk_change_feed_urls", - data={ - "webhook_url": webhook_url, - "replace_from": "old.example.com", - "replace_to": "new.example.com", - "resolve_urls": "true", - "force_update": "true", - }, - ) - - assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}" - assert stub_reader.change_calls == [ - ( - "https://old.example.com/rss/a.xml", - "https://new.example.com/rss/a.xml", - ), - ] - location = response.headers.get("location", "") - assert "Updated%201%20feed%20URL%28s%29" in location - assert "Failed%200" in location - finally: - app.dependency_overrides = {} - - -def test_reader_dependency_override_is_used() -> None: - """Reader should be injectable and overridable via FastAPI dependency overrides.""" - - class StubReader: - def get_tag(self, _resource: str, _key: str, default: str | None = None) -> str | None: - """Stub get_tag that always returns the default value. - - Args: - _resource: Ignored. - _key: Ignored. - default: The value to return. - - Returns: - The default value, simulating a missing tag. - """ - return default - - app.dependency_overrides[get_reader_dependency] = StubReader - try: - response: Response = client.get(url="/add") - assert response.status_code == 200, f"Expected /add to render with overridden reader: {response.text}" - finally: - app.dependency_overrides = {} diff --git a/tests/test_search.py b/tests/test_search.py index 77681cf..ada1464 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -46,7 +46,7 @@ def test_create_search_context() -> None: reader.update_search() # Create the search context. - context: dict = create_search_context("test", reader=reader) + context: dict = create_search_context("test", custom_reader=reader) assert context is not None, f"The context should not be None. Got: {context}" # Close the reader, so we can delete the directory. diff --git a/tests/test_settings.py b/tests/test_settings.py index bcab720..5a54094 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -22,12 +22,12 @@ def test_reader() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - reader: Reader = get_reader(custom_location=str(custom_loc)) - assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(reader)}'." - assert isinstance(reader, Reader), assert_msg + custom_reader: Reader = get_reader(custom_location=str(custom_loc)) + assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(custom_reader)}'." + assert isinstance(custom_reader, Reader), assert_msg # Close the reader, so we can delete the directory. - reader.close() + custom_reader.close() def test_data_dir() -> None: @@ -49,16 +49,16 @@ def test_get_webhook_for_entry() -> None: Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") - reader: Reader = get_reader(custom_location=str(custom_loc)) + custom_reader: Reader = get_reader(custom_location=str(custom_loc)) # Add a feed to the database. - reader.add_feed("https://www.reddit.com/r/movies.rss") - reader.update_feed("https://www.reddit.com/r/movies.rss") + custom_reader.add_feed("https://www.reddit.com/r/movies.rss") + custom_reader.update_feed("https://www.reddit.com/r/movies.rss") # Add a webhook to the database. - reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType] - our_tag = reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType] + custom_reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType] + our_tag = custom_reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType] assert our_tag == "https://example.com", f"The tag should be 'https://example.com'. But it was '{our_tag}'." # Close the reader, so we can delete the directory. - reader.close() + custom_reader.close() diff --git a/tests/test_whitelist.py b/tests/test_whitelist.py index 6e911fe..462d652 100644 --- a/tests/test_whitelist.py +++ b/tests/test_whitelist.py @@ -37,7 +37,7 @@ def test_has_white_tags() -> None: reader.update_feeds() # Test feed without any whitelist tags - assert has_white_tags(reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags" + assert has_white_tags(custom_reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags" check_if_has_tag(reader, feed, "whitelist_title") check_if_has_tag(reader, feed, "whitelist_summary") @@ -56,9 +56,9 @@ def test_has_white_tags() -> None: def check_if_has_tag(reader: Reader, feed: Feed, whitelist_name: str) -> None: reader.set_tag(feed, whitelist_name, "a") # pyright: ignore[reportArgumentType] - assert has_white_tags(reader=reader, feed=feed) is True, "Feed should have whitelist tags" + assert has_white_tags(custom_reader=reader, feed=feed) is True, "Feed should have whitelist tags" reader.delete_tag(feed, whitelist_name) - assert has_white_tags(reader=reader, feed=feed) is False, "Feed should not have any whitelist tags" + assert has_white_tags(custom_reader=reader, feed=feed) is False, "Feed should not have any whitelist tags" def test_should_be_sent() -> None: