Compare commits

..

10 commits

24 changed files with 1902 additions and 573 deletions

View file

@ -8,15 +8,11 @@ from discord_rss_bot.filter.blacklist import entry_should_be_skipped
from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags
from discord_rss_bot.filter.whitelist import has_white_tags
from discord_rss_bot.filter.whitelist import should_be_sent
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
from reader import Entry
from reader import Reader
# Our reader
reader: Reader = get_reader()
@lru_cache
def encode_url(url_to_quote: str) -> str:
@ -34,11 +30,12 @@ def encode_url(url_to_quote: str) -> str:
return urllib.parse.quote(string=url_to_quote) if url_to_quote else ""
def entry_is_whitelisted(entry_to_check: Entry) -> bool:
def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool:
"""Check if the entry is whitelisted.
Args:
entry_to_check: The feed to check.
reader: Custom Reader instance.
Returns:
bool: True if the feed is whitelisted, False otherwise.
@ -47,11 +44,12 @@ def entry_is_whitelisted(entry_to_check: Entry) -> bool:
return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check))
def entry_is_blacklisted(entry_to_check: Entry) -> bool:
def entry_is_blacklisted(entry_to_check: Entry, reader: Reader) -> bool:
"""Check if the entry is blacklisted.
Args:
entry_to_check: The feed to check.
reader: Custom Reader instance.
Returns:
bool: True if the feed is blacklisted, False otherwise.

View file

@ -5,17 +5,18 @@ import json
import logging
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING
from bs4 import BeautifulSoup
from bs4 import Tag
from markdownify import markdownify
from reader import Entry
from reader import Feed
from reader import Reader
from reader import TagNotFoundError
from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
from reader import Entry
from reader import Feed
from reader import Reader
logger: logging.Logger = logging.getLogger(__name__)
@ -116,18 +117,18 @@ def format_entry_html_for_discord(text: str) -> str:
return _restore_discord_timestamp_tags(formatted_text, replacements)
def replace_tags_in_text_message(entry: Entry) -> str:
def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str:
"""Replace tags in custom_message.
Args:
entry: The entry to get the tags from.
reader: Custom Reader instance.
Returns:
Returns the custom_message with the tags replaced.
"""
feed: Feed = entry.feed
custom_reader: Reader = get_reader()
custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader)
custom_message: str = get_custom_message(feed=feed, reader=reader)
content = ""
if entry.content:
@ -229,18 +230,18 @@ def get_first_image(summary: str | None, content: str | None) -> str:
return ""
def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed:
def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed:
"""Replace tags in embed.
Args:
feed: The feed to get the tags from.
entry: The entry to get the tags from.
reader: Custom Reader instance.
Returns:
Returns the embed with the tags replaced.
"""
custom_reader: Reader = get_reader()
embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader)
embed: CustomEmbed = get_embed(feed=feed, reader=reader)
content = ""
if entry.content:
@ -331,31 +332,29 @@ def _replace_embed_tags(embed: CustomEmbed, template: str, replace_with: str) ->
embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with)
def get_custom_message(custom_reader: Reader, feed: Feed) -> str:
def get_custom_message(reader: Reader, feed: Feed) -> str:
"""Get custom_message tag from feed.
Args:
custom_reader: What Reader to use.
reader: What Reader to use.
feed: The feed to get the tag from.
Returns:
Returns the contents from the custom_message tag.
"""
try:
custom_message: str = str(custom_reader.get_tag(feed, "custom_message"))
except TagNotFoundError:
custom_message = ""
custom_message: str = str(reader.get_tag(feed, "custom_message", ""))
except ValueError:
custom_message = ""
return custom_message
def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None:
def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None:
"""Set embed tag in feed.
Args:
custom_reader: What Reader to use.
reader: What Reader to use.
feed: The feed to set the tag in.
embed: The embed to set.
"""
@ -371,20 +370,20 @@ def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None:
"footer_text": embed.footer_text,
"footer_icon_url": embed.footer_icon_url,
}
custom_reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType]
reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType]
def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed:
def get_embed(reader: Reader, feed: Feed) -> CustomEmbed:
"""Get embed tag from feed.
Args:
custom_reader: What Reader to use.
reader: What Reader to use.
feed: The feed to get the tag from.
Returns:
Returns the contents from the embed tag.
"""
embed = custom_reader.get_tag(feed, "embed", "")
embed = reader.get_tag(feed, "embed", "")
if embed:
if not isinstance(embed, str):

View file

@ -23,7 +23,6 @@ from reader import FeedNotFoundError
from reader import Reader
from reader import ReaderError
from reader import StorageError
from reader import TagNotFoundError
from discord_rss_bot.custom_message import CustomEmbed
from discord_rss_bot.custom_message import get_custom_message
@ -37,7 +36,6 @@ from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url
from discord_rss_bot.hoyolab_api import fetch_hoyolab_post
from discord_rss_bot.hoyolab_api import is_c3kay_feed
from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.settings import default_custom_message
from discord_rss_bot.settings import get_reader
@ -98,26 +96,23 @@ def extract_domain(url: str) -> str: # noqa: PLR0911
return "Other"
def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: C901, PLR0912
def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: C901
"""Send a single entry to Discord.
Args:
entry: The entry to send to Discord.
custom_reader: The reader to use. If None, the default reader will be used.
reader: The reader to use.
Returns:
str | None: The error message if there was an error, otherwise None.
"""
# Get the default reader if we didn't get a custom one.
reader: Reader = get_reader() if custom_reader is None else custom_reader
# Get the webhook URL for the entry.
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", ""))
if not webhook_url:
return "No webhook URL found."
# If https://discord.com/quests/<quest_id> is in the URL, send a separate message with the URL.
send_discord_quest_notification(entry, webhook_url)
send_discord_quest_notification(entry, webhook_url, reader=reader)
# Check if this is a c3kay feed
if is_c3kay_feed(entry.feed.url):
@ -128,7 +123,7 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry)
execute_webhook(webhook, entry, reader=reader)
return None
logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
@ -142,17 +137,14 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
# Try to get the custom message for the feed. If the user has none, we will use the default message.
# This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()"
if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901
webhook_message: str = replace_tags_in_text_message(entry=entry)
webhook_message: str = replace_tags_in_text_message(entry=entry, reader=reader)
if not webhook_message:
webhook_message = "No message found."
# Create the webhook.
try:
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
except TagNotFoundError:
logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url)
should_send_embed = True
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True))
except StorageError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True
@ -162,15 +154,15 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
should_send_embed = False
if should_send_embed:
webhook = create_embed_webhook(webhook_url, entry)
webhook = create_embed_webhook(webhook_url, entry, reader=reader)
else:
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
execute_webhook(webhook, entry)
execute_webhook(webhook, entry, reader=reader)
return None
def send_discord_quest_notification(entry: Entry, webhook_url: str) -> None:
def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Reader) -> None:
"""Send a separate message to Discord if the entry is a quest notification."""
quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+")
@ -182,7 +174,7 @@ def send_discord_quest_notification(entry: Entry, webhook_url: str) -> None:
content=quest_url,
rate_limit_retry=True,
)
execute_webhook(webhook, entry)
execute_webhook(webhook, entry, reader=reader)
# Iterate through the content of the entry
for content in entry.content:
@ -240,12 +232,17 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
discord_embed.set_title(embed_title) if embed_title else None
def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: # noqa: C901
def create_embed_webhook( # noqa: C901
webhook_url: str,
entry: Entry,
reader: Reader,
) -> DiscordWebhook:
"""Create a webhook with an embed.
Args:
webhook_url (str): The webhook URL.
entry (Entry): The entry to send to Discord.
reader (Reader): The Reader instance to use for getting embed data.
Returns:
DiscordWebhook: The webhook with the embed.
@ -254,7 +251,7 @@ def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: # n
feed: Feed = entry.feed
# Get the embed data from the database.
custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry)
custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader)
discord_embed: DiscordEmbed = DiscordEmbed()
@ -316,13 +313,14 @@ def get_webhook_url(reader: Reader, entry: Entry) -> str:
str: The webhook URL.
"""
try:
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook"))
except TagNotFoundError:
logger.exception("No webhook URL found for feed: %s", entry.feed.url)
return ""
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", ""))
except StorageError:
logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url)
return ""
if not webhook_url:
logger.error("No webhook URL found for feed: %s", entry.feed.url)
return ""
return webhook_url
@ -341,53 +339,53 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None:
logger.exception("Error setting entry to read: %s", entry.id)
def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912
def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912
"""Send entries to Discord.
If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time.
Args:
custom_reader: If we should use a custom reader instead of the default one.
reader: If we should use a custom reader instead of the default one.
feed: The feed to send to Discord.
do_once: If we should only send one entry. This is used in the test.
"""
logger.info("Starting to send entries to Discord.")
# Get the default reader if we didn't get a custom one.
reader: Reader = get_reader() if custom_reader is None else custom_reader
effective_reader: Reader = get_reader() if reader is None else reader
# Check for new entries for every feed.
reader.update_feeds(
effective_reader.update_feeds(
scheduled=True,
workers=os.cpu_count() or 1,
)
# Loop through the unread entries.
entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False)
entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False)
for entry in entries:
set_entry_as_read(reader, entry)
set_entry_as_read(effective_reader, entry)
if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1):
logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url)
continue
webhook_url: str = get_webhook_url(reader, entry)
webhook_url: str = get_webhook_url(effective_reader, entry)
if not webhook_url:
logger.info("No webhook URL found for feed: %s", entry.feed.url)
continue
should_send_embed: bool = should_send_embed_check(reader, entry)
should_send_embed: bool = should_send_embed_check(effective_reader, entry)
# Youtube feeds only need to send the link
if is_youtube_feed(entry.feed.url):
should_send_embed = False
if should_send_embed:
webhook = create_embed_webhook(webhook_url, entry)
webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader)
else:
# If the user has set the custom message to an empty string, we will use the default message, otherwise we
# will use the custom message.
if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901
webhook_message = replace_tags_in_text_message(entry)
if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901
webhook_message = replace_tags_in_text_message(entry, reader=effective_reader)
else:
webhook_message: str = str(default_custom_message)
@ -397,12 +395,12 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
# Check if the entry is blacklisted, and if it is, we will skip it.
if entry_should_be_skipped(reader, entry):
if entry_should_be_skipped(effective_reader, entry):
logger.info("Entry was blacklisted: %s", entry.id)
continue
# Check if the feed has a whitelist, and if it does, check if the entry is whitelisted.
if has_white_tags(reader, entry.feed) and not should_be_sent(reader, entry):
if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry):
logger.info("Entry was not whitelisted: %s", entry.id)
continue
@ -415,7 +413,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry)
execute_webhook(webhook, entry, reader=effective_reader)
return
logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
@ -425,7 +423,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
# Send the entry to Discord as it is not blacklisted or feed has a whitelist.
execute_webhook(webhook, entry)
execute_webhook(webhook, entry, reader=effective_reader)
# If we only want to send one entry, we will break the loop. This is used when testing this function.
if do_once:
@ -433,16 +431,15 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
break
def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None:
def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None:
"""Execute the webhook.
Args:
webhook (DiscordWebhook): The webhook to execute.
entry (Entry): The entry to send to Discord.
reader (Reader): The Reader instance to use for checking feed status.
"""
reader: Reader = get_reader()
# If the feed has been paused or deleted, we will not send the entry to Discord.
entry_feed: Feed = entry.feed
if entry_feed.updates_enabled is False:
@ -493,10 +490,7 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
return False
try:
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
except TagNotFoundError:
logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url)
should_send_embed = True
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True))
except ReaderError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True
@ -551,9 +545,7 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
reader.add_feed(clean_feed_url)
except FeedExistsError:
# Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new.
try:
reader.get_tag(clean_feed_url, "webhook")
except TagNotFoundError:
if not reader.get_tag(clean_feed_url, "webhook", ""):
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
except ReaderError as e:
raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e
@ -580,5 +572,3 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
# Update the full-text search index so our new feed is searchable.
reader.update_search()
add_missing_tags(reader)

View file

@ -11,7 +11,7 @@ if TYPE_CHECKING:
from reader import Reader
def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool:
"""Return True if the feed has blacklist tags.
The following tags are checked:
@ -25,21 +25,21 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
- regex_blacklist_title
Args:
custom_reader: The reader.
reader: The reader.
feed: The feed to check.
Returns:
bool: If the feed has any of the tags.
"""
blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip()
blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip()
blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip()
blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip()
blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip()
blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip()
blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip()
blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip()
regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip()
regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip()
regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip()
regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip()
regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip()
regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip()
return bool(
blacklist_title
@ -53,11 +53,11 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
)
def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
"""Return True if the entry is in the blacklist.
Args:
custom_reader: The reader.
reader: The reader.
entry: The entry to check.
Returns:
@ -65,15 +65,15 @@ def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noq
"""
feed = entry.feed
blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip()
blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip()
blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip()
blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip()
blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip()
blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip()
blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip()
blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip()
regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip()
regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip()
regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip()
regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip()
regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip()
regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip()
# TODO(TheLovinator): Also add support for entry_text and more.
# Check regular blacklist

View file

@ -11,7 +11,7 @@ if TYPE_CHECKING:
from reader import Reader
def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
def has_white_tags(reader: Reader, feed: Feed) -> bool:
"""Return True if the feed has whitelist tags.
The following tags are checked:
@ -25,21 +25,21 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
- whitelist_title
Args:
custom_reader: The reader.
reader: The reader.
feed: The feed to check.
Returns:
bool: If the feed has any of the tags.
"""
whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip()
whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip()
whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip()
whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip()
whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip()
whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip()
whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip()
whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip()
regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip()
regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip()
regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip()
regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip()
regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip()
regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip()
return bool(
whitelist_title
@ -53,11 +53,11 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
)
def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
"""Return True if the entry is in the whitelist.
Args:
custom_reader: The reader.
reader: The reader.
entry: The entry to check.
Returns:
@ -65,16 +65,16 @@ def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR091
"""
feed: Feed = entry.feed
# Regular whitelist tags
whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip()
whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip()
whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip()
whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip()
whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip()
whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip()
whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip()
whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip()
# Regex whitelist tags
regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip()
regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip()
regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip()
regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip()
regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip()
regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip()
# Check regular whitelist
if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title):

View file

@ -30,8 +30,6 @@ from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from reader import TagNotFoundError
if TYPE_CHECKING:
from reader import Reader
@ -176,21 +174,15 @@ def export_state(reader: Reader, backup_path: Path) -> None:
logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url)
feeds_state.append(feed_data)
try:
webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list(
reader.get_tag((), "webhooks", []),
)
except TagNotFoundError:
webhooks = []
# Export global update interval if set
global_update_interval: dict[str, Any] | None = None
try:
global_update_config = reader.get_tag((), ".reader.update", None)
if isinstance(global_update_config, dict):
global_update_interval = global_update_config
except TagNotFoundError:
pass
state: dict = {"feeds": feeds_state, "webhooks": webhooks}
if global_update_interval is not None:

View file

@ -19,6 +19,7 @@ import httpx
import sentry_sdk
import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Depends
from fastapi import FastAPI
from fastapi import Form
from fastapi import HTTPException
@ -53,7 +54,7 @@ from discord_rss_bot.feeds import send_entry_to_discord
from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.git_backup import commit_state_change
from discord_rss_bot.git_backup import get_backup_path
from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.search import create_search_context
from discord_rss_bot.settings import get_reader
@ -100,7 +101,16 @@ LOGGING_CONFIG: dict[str, Any] = {
logging.config.dictConfig(LOGGING_CONFIG)
logger: logging.Logger = logging.getLogger(__name__)
reader: Reader = get_reader()
def get_reader_dependency() -> Reader:
"""Provide the app Reader instance as a FastAPI dependency.
Returns:
Reader: The shared Reader instance.
"""
return get_reader()
# Time constants for relative time formatting
SECONDS_PER_MINUTE = 60
@ -146,7 +156,7 @@ def relative_time(dt: datetime | None) -> str:
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
"""Lifespan function for the FastAPI app."""
add_missing_tags(reader)
reader: Reader = get_reader()
scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone=UTC)
scheduler.add_job(
func=send_to_discord,
@ -170,8 +180,6 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template
# Add the filters to the Jinja2 environment so they can be used in html templates.
templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else ""
templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted
templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted
templates.env.filters["discord_markdown"] = markdownify
templates.env.filters["relative_time"] = relative_time
templates.env.globals["get_backup_path"] = get_backup_path
@ -181,12 +189,14 @@ templates.env.globals["get_backup_path"] = get_backup_path
async def post_add_webhook(
webhook_name: Annotated[str, Form()],
webhook_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Add a feed to the database.
Args:
webhook_name: The name of the webhook.
webhook_url: The url of the webhook.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the index page.
@ -219,11 +229,15 @@ async def post_add_webhook(
@app.post("/delete_webhook")
async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectResponse:
async def post_delete_webhook(
webhook_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Delete a webhook from the database.
Args:
webhook_url: The url of the webhook.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the index page.
@ -266,12 +280,14 @@ async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectRe
async def post_create_feed(
feed_url: Annotated[str, Form()],
webhook_dropdown: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Add a feed to the database.
Args:
feed_url: The feed to add.
webhook_dropdown: The webhook to use.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@ -283,11 +299,15 @@ async def post_create_feed(
@app.post("/pause")
async def post_pause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
async def post_pause_feed(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Pause a feed.
Args:
feed_url: The feed to pause.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@ -298,11 +318,15 @@ async def post_pause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
@app.post("/unpause")
async def post_unpause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
async def post_unpause_feed(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Unpause a feed.
Args:
feed_url: The Feed to unpause.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@ -314,6 +338,7 @@ async def post_unpause_feed(feed_url: Annotated[str, Form()]) -> RedirectRespons
@app.post("/whitelist")
async def post_set_whitelist(
reader: Annotated[Reader, Depends(get_reader_dependency)],
whitelist_title: Annotated[str, Form()] = "",
whitelist_summary: Annotated[str, Form()] = "",
whitelist_content: Annotated[str, Form()] = "",
@ -336,6 +361,7 @@ async def post_set_whitelist(
regex_whitelist_content: Whitelisted regex for when checking the content.
regex_whitelist_author: Whitelisted regex for when checking the author.
feed_url: The feed we should set the whitelist for.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@ -356,12 +382,17 @@ async def post_set_whitelist(
@app.get("/whitelist", response_class=HTMLResponse)
async def get_whitelist(feed_url: str, request: Request):
async def get_whitelist(
feed_url: str,
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Get the whitelist.
Args:
feed_url: What feed we should get the whitelist for.
request: The request object.
reader: The Reader instance.
Returns:
HTMLResponse: The whitelist page.
@ -395,6 +426,7 @@ async def get_whitelist(feed_url: str, request: Request):
@app.post("/blacklist")
async def post_set_blacklist(
reader: Annotated[Reader, Depends(get_reader_dependency)],
blacklist_title: Annotated[str, Form()] = "",
blacklist_summary: Annotated[str, Form()] = "",
blacklist_content: Annotated[str, Form()] = "",
@ -420,6 +452,7 @@ async def post_set_blacklist(
regex_blacklist_content: Blacklisted regex for when checking the content.
regex_blacklist_author: Blacklisted regex for when checking the author.
feed_url: What feed we should set the blacklist for.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@ -438,12 +471,17 @@ async def post_set_blacklist(
@app.get("/blacklist", response_class=HTMLResponse)
async def get_blacklist(feed_url: str, request: Request):
async def get_blacklist(
feed_url: str,
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Get the blacklist.
Args:
feed_url: What feed we should get the blacklist for.
request: The request object.
reader: The Reader instance.
Returns:
HTMLResponse: The blacklist page.
@ -477,6 +515,7 @@ async def get_blacklist(feed_url: str, request: Request):
@app.post("/custom")
async def post_set_custom(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
custom_message: Annotated[str, Form()] = "",
) -> RedirectResponse:
"""Set the custom message, this is used when sending the message.
@ -484,6 +523,7 @@ async def post_set_custom(
Args:
custom_message: The custom message.
feed_url: The feed we should set the custom message for.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@ -505,12 +545,17 @@ async def post_set_custom(
@app.get("/custom", response_class=HTMLResponse)
async def get_custom(feed_url: str, request: Request):
async def get_custom(
feed_url: str,
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Get the custom message. This is used when sending the message to Discord.
Args:
feed_url: What feed we should get the custom message for.
request: The request object.
reader: The Reader instance.
Returns:
HTMLResponse: The custom message page.
@ -531,12 +576,17 @@ async def get_custom(feed_url: str, request: Request):
@app.get("/embed", response_class=HTMLResponse)
async def get_embed_page(feed_url: str, request: Request):
async def get_embed_page(
feed_url: str,
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Get the custom message. This is used when sending the message to Discord.
Args:
feed_url: What feed we should get the custom message for.
request: The request object.
reader: The Reader instance.
Returns:
HTMLResponse: The embed page.
@ -572,6 +622,7 @@ async def get_embed_page(feed_url: str, request: Request):
@app.post("/embed", response_class=HTMLResponse)
async def post_embed(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
title: Annotated[str, Form()] = "",
description: Annotated[str, Form()] = "",
color: Annotated[str, Form()] = "",
@ -597,7 +648,7 @@ async def post_embed(
author_icon_url: The author icon url of the embed.
footer_text: The footer text of the embed.
footer_icon_url: The footer icon url of the embed.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the embed page.
@ -625,11 +676,15 @@ async def post_embed(
@app.post("/use_embed")
async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
async def post_use_embed(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Use embed instead of text.
Args:
feed_url: The feed to change.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@ -641,11 +696,15 @@ async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
@app.post("/use_text")
async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse:
async def post_use_text(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Use text instead of embed.
Args:
feed_url: The feed to change.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@ -659,6 +718,7 @@ async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse:
@app.post("/set_update_interval")
async def post_set_update_interval(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
interval_minutes: Annotated[int | None, Form()] = None,
redirect_to: Annotated[str, Form()] = "",
) -> RedirectResponse:
@ -668,6 +728,7 @@ async def post_set_update_interval(
feed_url: The feed to change.
interval_minutes: The update interval in minutes (None to reset to global default).
redirect_to: Optional redirect URL (defaults to feed page).
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the specified page or feed page.
@ -703,12 +764,14 @@ async def post_set_update_interval(
async def post_change_feed_url(
old_feed_url: Annotated[str, Form()],
new_feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Change the URL for an existing feed.
Args:
old_feed_url: Current feed URL.
new_feed_url: New feed URL to change to.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page for the resulting URL.
@ -734,6 +797,19 @@ async def post_change_feed_url(
except ReaderError as e:
raise HTTPException(status_code=400, detail=f"Failed to change feed URL: {e}") from e
# Update the feed with the new URL so we can discover what entries it returns.
# Then mark all unread entries as read so the scheduler doesn't resend them.
try:
reader.update_feed(clean_new_feed_url)
except Exception:
logger.exception("Failed to update feed after URL change: %s", clean_new_feed_url)
for entry in reader.get_entries(feed=clean_new_feed_url, read=False):
try:
reader.set_entry_read(entry, True)
except Exception:
logger.exception("Failed to mark entry as read after URL change: %s", entry.id)
commit_state_change(reader, f"Change feed URL from {clean_old_feed_url} to {clean_new_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_new_feed_url)}", status_code=303)
@ -741,6 +817,7 @@ async def post_change_feed_url(
@app.post("/reset_update_interval")
async def post_reset_update_interval(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
redirect_to: Annotated[str, Form()] = "",
) -> RedirectResponse:
"""Reset the update interval for a feed to use the global default.
@ -748,6 +825,7 @@ async def post_reset_update_interval(
Args:
feed_url: The feed to change.
redirect_to: Optional redirect URL (defaults to feed page).
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the specified page or feed page.
@ -774,11 +852,15 @@ async def post_reset_update_interval(
@app.post("/set_global_update_interval")
async def post_set_global_update_interval(interval_minutes: Annotated[int, Form()]) -> RedirectResponse:
async def post_set_global_update_interval(
interval_minutes: Annotated[int, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Set the global default update interval.
Args:
interval_minutes: The update interval in minutes.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the settings page.
@ -792,11 +874,15 @@ async def post_set_global_update_interval(interval_minutes: Annotated[int, Form(
@app.get("/add", response_class=HTMLResponse)
def get_add(request: Request):
def get_add(
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Page for adding a new feed.
Args:
request: The request object.
reader: The Reader instance.
Returns:
HTMLResponse: The add feed page.
@ -809,13 +895,19 @@ def get_add(request: Request):
@app.get("/feed", response_class=HTMLResponse)
async def get_feed(feed_url: str, request: Request, starting_after: str = ""): # noqa: C901, PLR0912, PLR0914, PLR0915
async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
feed_url: str,
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
starting_after: str = "",
):
"""Get a feed by URL.
Args:
feed_url: The feed to add.
request: The request object.
starting_after: The entry to start after. Used for pagination.
reader: The Reader instance.
Returns:
HTMLResponse: The feed page.
@ -845,28 +937,22 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
except EntryNotFoundError as e:
current_entries = list(reader.get_entries(feed=clean_feed_url))
msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}"
html: str = create_html_for_feed(current_entries, clean_feed_url)
html: str = create_html_for_feed(reader=reader, entries=current_entries, current_feed_url=clean_feed_url)
# Get feed and global intervals for error case too
feed_interval: int | None = None
try:
feed_update_config = reader.get_tag(feed, ".reader.update")
feed_update_config = reader.get_tag(feed, ".reader.update", None)
if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
interval_value = feed_update_config["interval"]
if isinstance(interval_value, int):
feed_interval = interval_value
except TagNotFoundError:
pass
global_interval: int = 60
try:
global_update_config = reader.get_tag((), ".reader.update")
global_update_config = reader.get_tag((), ".reader.update", None)
if isinstance(global_update_config, dict) and "interval" in global_update_config:
interval_value = global_update_config["interval"]
if isinstance(interval_value, int):
global_interval = interval_value
except TagNotFoundError:
pass
context = {
"request": request,
@ -901,36 +987,25 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
last_entry = entries[-1]
# Create the html for the entries.
html: str = create_html_for_feed(entries, clean_feed_url)
html: str = create_html_for_feed(reader=reader, entries=entries, current_feed_url=clean_feed_url)
try:
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
except TagNotFoundError:
add_missing_tags(reader)
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True))
# Get the update interval for this feed
feed_interval: int | None = None
try:
feed_update_config = reader.get_tag(feed, ".reader.update")
feed_update_config = reader.get_tag(feed, ".reader.update", None)
if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
interval_value = feed_update_config["interval"]
if isinstance(interval_value, int):
feed_interval = interval_value
except TagNotFoundError:
# No custom interval set for this feed, will use global default
pass
# Get the global default update interval
global_interval: int = 60 # Default to 60 minutes if not set
try:
global_update_config = reader.get_tag((), ".reader.update")
global_update_config = reader.get_tag((), ".reader.update", None)
if isinstance(global_update_config, dict) and "interval" in global_update_config:
interval_value = global_update_config["interval"]
if isinstance(interval_value, int):
global_interval = interval_value
except TagNotFoundError:
pass
context = {
"request": request,
@ -948,10 +1023,15 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
return templates.TemplateResponse(request=request, name="feed.html", context=context)
def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: C901, PLR0914
def create_html_for_feed( # noqa: C901, PLR0914
reader: Reader,
entries: Iterable[Entry],
current_feed_url: str = "",
) -> str:
"""Create HTML for the search results.
Args:
reader: The Reader instance to use.
entries: The entries to create HTML for.
current_feed_url: The feed URL currently being viewed in /feed.
@ -969,17 +1049,19 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -
first_image = get_first_image(summary, content)
text: str = replace_tags_in_text_message(entry) or "<div class='text-muted'>No content available.</div>"
text: str = replace_tags_in_text_message(entry, reader=reader) or (
"<div class='text-muted'>No content available.</div>"
)
published = ""
if entry.published:
published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S")
blacklisted: str = ""
if entry_is_blacklisted(entry):
if entry_is_blacklisted(entry, reader=reader):
blacklisted = "<span class='badge bg-danger'>Blacklisted</span>"
whitelisted: str = ""
if entry_is_whitelisted(entry):
if entry_is_whitelisted(entry, reader=reader):
whitelisted = "<span class='badge bg-success'>Whitelisted</span>"
source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url
@ -999,7 +1081,11 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -
)
entry_id: str = urllib.parse.quote(entry.id)
to_discord_html: str = f"<a class='text-muted' href='/post_entry?entry_id={entry_id}'>Send to Discord</a>"
encoded_source_feed_url: str = urllib.parse.quote(source_feed_url)
to_discord_html: str = (
f"<a class='text-muted' href='/post_entry?entry_id={entry_id}&feed_url={encoded_source_feed_url}'>"
"Send to Discord</a>"
)
# Check if this is a YouTube feed entry and the entry has a link
is_youtube_feed = "youtube.com/feeds/videos.xml" in entry.feed.url
@ -1070,6 +1156,7 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo:
hook_name (str): The webhook name.
hook_url (str): The webhook URL.
Returns:
WebhookInfo: The webhook username, avatar, guild id, etc.
"""
@ -1091,39 +1178,37 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo:
@app.get("/settings", response_class=HTMLResponse)
async def get_settings(request: Request):
async def get_settings(
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Settings page.
Args:
request: The request object.
reader: The Reader instance.
Returns:
HTMLResponse: The settings page.
"""
# Get the global default update interval
global_interval: int = 60 # Default to 60 minutes if not set
try:
global_update_config = reader.get_tag((), ".reader.update")
global_update_config = reader.get_tag((), ".reader.update", None)
if isinstance(global_update_config, dict) and "interval" in global_update_config:
interval_value = global_update_config["interval"]
if isinstance(interval_value, int):
global_interval = interval_value
except TagNotFoundError:
pass
# Get all feeds with their intervals
feeds: Iterable[Feed] = reader.get_feeds()
feed_intervals = []
for feed in feeds:
feed_interval: int | None = None
try:
feed_update_config = reader.get_tag(feed, ".reader.update")
feed_update_config = reader.get_tag(feed, ".reader.update", None)
if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
interval_value = feed_update_config["interval"]
if isinstance(interval_value, int):
feed_interval = interval_value
except TagNotFoundError:
pass
feed_intervals.append({
"feed": feed,
@ -1141,11 +1226,15 @@ async def get_settings(request: Request):
@app.get("/webhooks", response_class=HTMLResponse)
async def get_webhooks(request: Request):
async def get_webhooks(
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Page for adding a new webhook.
Args:
request: The request object.
reader: The Reader instance.
Returns:
HTMLResponse: The add webhook page.
@ -1166,54 +1255,65 @@ async def get_webhooks(request: Request):
@app.get("/", response_class=HTMLResponse)
def get_index(request: Request, message: str = ""):
def get_index(
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
message: str = "",
):
"""This is the root of the website.
Args:
request: The request object.
message: Optional message to display to the user.
reader: The Reader instance.
Returns:
HTMLResponse: The index page.
"""
return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request, message))
return templates.TemplateResponse(
request=request,
name="index.html",
context=make_context_index(request, message, reader),
)
def make_context_index(request: Request, message: str = ""):
def make_context_index(request: Request, message: str = "", reader: Reader | None = None):
"""Create the needed context for the index page.
Args:
request: The request object.
message: Optional message to display to the user.
reader: The Reader instance.
Returns:
dict: The context for the index page.
"""
hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", [])))
effective_reader: Reader = reader or get_reader_dependency()
hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(effective_reader.get_tag((), "webhooks", [])))
feed_list = []
broken_feeds = []
feeds_without_attached_webhook = []
feed_list: list[dict[str, JSONType | Feed | str]] = []
broken_feeds: list[Feed] = []
feeds_without_attached_webhook: list[Feed] = []
# Get all feeds and organize them
feeds: Iterable[Feed] = reader.get_feeds()
feeds: Iterable[Feed] = effective_reader.get_feeds()
for feed in feeds:
try:
webhook = reader.get_tag(feed.url, "webhook")
feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)})
except TagNotFoundError:
webhook: str = str(effective_reader.get_tag(feed.url, "webhook", ""))
if not webhook:
broken_feeds.append(feed)
continue
webhook_list = [hook["url"] for hook in hooks]
feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)})
webhook_list: list[str] = [hook["url"] for hook in hooks]
if webhook not in webhook_list:
feeds_without_attached_webhook.append(feed)
return {
"request": request,
"feeds": feed_list,
"feed_count": reader.get_feed_counts(),
"entry_count": reader.get_entry_counts(),
"feed_count": effective_reader.get_feed_counts(),
"entry_count": effective_reader.get_entry_counts(),
"webhooks": hooks,
"broken_feeds": broken_feeds,
"feeds_without_attached_webhook": feeds_without_attached_webhook,
@ -1222,12 +1322,15 @@ def make_context_index(request: Request, message: str = ""):
@app.post("/remove", response_class=HTMLResponse)
async def remove_feed(feed_url: Annotated[str, Form()]):
async def remove_feed(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Get a feed by URL.
Args:
feed_url: The feed to add.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the index page.
@ -1246,13 +1349,17 @@ async def remove_feed(feed_url: Annotated[str, Form()]):
@app.get("/update", response_class=HTMLResponse)
async def update_feed(request: Request, feed_url: str):
async def update_feed(
request: Request,
feed_url: str,
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Update a feed.
Args:
request: The request object.
feed_url: The feed URL to update.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@ -1270,11 +1377,15 @@ async def update_feed(request: Request, feed_url: str):
@app.post("/backup")
async def manual_backup(request: Request) -> RedirectResponse:
async def manual_backup(
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Manually trigger a git backup of the current state.
Args:
request: The request object.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the index page with a success or error message.
@ -1297,51 +1408,81 @@ async def manual_backup(request: Request) -> RedirectResponse:
@app.get("/search", response_class=HTMLResponse)
async def search(request: Request, query: str):
async def search(
request: Request,
query: str,
reader: Annotated[Reader, Depends(get_reader_dependency)],
):
"""Get entries matching a full-text search query.
Args:
query: The query to search for.
request: The request object.
reader: The Reader instance.
Returns:
HTMLResponse: The search page.
"""
reader.update_search()
context = create_search_context(query)
context = create_search_context(query, reader=reader)
return templates.TemplateResponse(request=request, name="search.html", context={"request": request, **context})
@app.get("/post_entry", response_class=HTMLResponse)
async def post_entry(entry_id: str):
async def post_entry(
entry_id: str,
reader: Annotated[Reader, Depends(get_reader_dependency)],
feed_url: str = "",
):
"""Send single entry to Discord.
Args:
entry_id: The entry to send.
feed_url: Optional feed URL used to disambiguate entries with identical IDs.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
"""
unquoted_entry_id: str = urllib.parse.unquote(entry_id)
entry: Entry | None = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None)
clean_feed_url: str = urllib.parse.unquote(feed_url.strip()) if feed_url else ""
# Prefer feed-scoped lookup when feed_url is provided. This avoids ambiguity when
# multiple feeds contain entries with the same ID.
entry: Entry | None = None
if clean_feed_url:
entry = next(
(entry for entry in reader.get_entries(feed=clean_feed_url) if entry.id == unquoted_entry_id),
None,
)
else:
entry = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None)
if entry is None:
return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.")
if result := send_entry_to_discord(entry=entry):
if result := send_entry_to_discord(entry=entry, reader=reader):
return result
# Redirect to the feed page.
clean_feed_url: str = entry.feed.url.strip()
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
redirect_feed_url: str = entry.feed.url.strip()
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(redirect_feed_url)}", status_code=303)
@app.post("/modify_webhook", response_class=HTMLResponse)
def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Form()]):
def modify_webhook(
old_hook: Annotated[str, Form()],
new_hook: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
redirect_to: Annotated[str, Form()] = "",
):
"""Modify a webhook.
Args:
old_hook: The webhook to modify.
new_hook: The new webhook.
redirect_to: Optional redirect URL after the update.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the webhook page.
@ -1356,15 +1497,20 @@ def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Fo
# Webhooks are stored as a list of dictionaries.
# Example: [{"name": "webhook_name", "url": "webhook_url"}]
webhooks = cast("list[dict[str, str]]", webhooks)
old_hook_clean: str = old_hook.strip()
new_hook_clean: str = new_hook.strip()
webhook_modified: bool = False
for hook in webhooks:
if hook["url"] in old_hook.strip():
hook["url"] = new_hook.strip()
if hook["url"] in old_hook_clean:
hook["url"] = new_hook_clean
# Check if it has been modified.
if hook["url"] != new_hook.strip():
if hook["url"] != new_hook_clean:
raise HTTPException(status_code=500, detail="Webhook could not be modified")
webhook_modified = True
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType]
@ -1372,16 +1518,21 @@ def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Fo
# matches the old one.
feeds: Iterable[Feed] = reader.get_feeds()
for feed in feeds:
try:
webhook = reader.get_tag(feed, "webhook")
except TagNotFoundError:
continue
webhook: str = str(reader.get_tag(feed, "webhook", ""))
if webhook == old_hook.strip():
reader.set_tag(feed.url, "webhook", new_hook.strip()) # pyright: ignore[reportArgumentType]
if webhook == old_hook_clean:
reader.set_tag(feed.url, "webhook", new_hook_clean) # pyright: ignore[reportArgumentType]
# Redirect to the webhook page.
return RedirectResponse(url="/webhooks", status_code=303)
if webhook_modified and old_hook_clean != new_hook_clean:
commit_state_change(reader, f"Modify webhook URL from {old_hook_clean} to {new_hook_clean}")
redirect_url: str = redirect_to.strip() or "/webhooks"
if redirect_to:
redirect_url = redirect_url.replace(urllib.parse.quote(old_hook_clean), urllib.parse.quote(new_hook_clean))
redirect_url = redirect_url.replace(old_hook_clean, new_hook_clean)
# Redirect to the requested page.
return RedirectResponse(url=redirect_url, status_code=303)
def extract_youtube_video_id(url: str) -> str | None:
@ -1407,11 +1558,216 @@ def extract_youtube_video_id(url: str) -> str | None:
return None
@app.get("/webhook_entries", response_class=HTMLResponse)
async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914
def resolve_final_feed_url(url: str) -> tuple[str, str | None]:
"""Resolve a feed URL by following redirects.
Args:
url: The feed URL to resolve.
Returns:
tuple[str, str | None]: A tuple with (resolved_url, error_message).
error_message is None when resolution succeeded.
"""
clean_url: str = url.strip()
if not clean_url:
return "", "URL is empty"
if not is_url_valid(clean_url):
return clean_url, "URL is invalid"
try:
response: Response = httpx.get(clean_url, follow_redirects=True, timeout=10.0)
except httpx.HTTPError as e:
return clean_url, str(e)
if not response.is_success:
return clean_url, f"HTTP {response.status_code}"
return str(response.url), None
def create_webhook_feed_url_preview(
webhook_feeds: list[Feed],
replace_from: str,
replace_to: str,
resolve_urls: bool, # noqa: FBT001
force_update: bool = False, # noqa: FBT001, FBT002
existing_feed_urls: set[str] | None = None,
) -> list[dict[str, str | bool | None]]:
"""Create preview rows for bulk feed URL replacement.
Args:
webhook_feeds: Feeds attached to a webhook.
replace_from: Text to replace in each URL.
replace_to: Replacement text.
resolve_urls: Whether to resolve resulting URLs via HTTP redirects.
force_update: Whether conflicts should be marked as force-overwritable.
existing_feed_urls: Optional set of all tracked feed URLs used for conflict detection.
Returns:
list[dict[str, str | bool | None]]: Rows used in the preview table.
"""
known_feed_urls: set[str] = existing_feed_urls or {feed.url for feed in webhook_feeds}
preview_rows: list[dict[str, str | bool | None]] = []
for feed in webhook_feeds:
old_url: str = feed.url
has_match: bool = bool(replace_from and replace_from in old_url)
candidate_url: str = old_url
if has_match:
candidate_url = old_url.replace(replace_from, replace_to)
resolved_url: str = candidate_url
resolution_error: str | None = None
if has_match and candidate_url != old_url and resolve_urls:
resolved_url, resolution_error = resolve_final_feed_url(candidate_url)
will_force_ignore_errors: bool = bool(
force_update and bool(resolution_error) and has_match and old_url != candidate_url,
)
target_exists: bool = bool(
has_match and not resolution_error and resolved_url != old_url and resolved_url in known_feed_urls,
)
will_force_overwrite: bool = bool(target_exists and force_update)
will_change: bool = bool(
has_match
and old_url != (candidate_url if will_force_ignore_errors else resolved_url)
and (not target_exists or will_force_overwrite)
and (not resolution_error or will_force_ignore_errors),
)
preview_rows.append({
"old_url": old_url,
"candidate_url": candidate_url,
"resolved_url": resolved_url,
"has_match": has_match,
"will_change": will_change,
"target_exists": target_exists,
"will_force_overwrite": will_force_overwrite,
"will_force_ignore_errors": will_force_ignore_errors,
"resolution_error": resolution_error,
})
return preview_rows
def build_webhook_mass_update_context(
webhook_feeds: list[Feed],
all_feeds: list[Feed],
replace_from: str,
replace_to: str,
resolve_urls: bool, # noqa: FBT001
force_update: bool = False, # noqa: FBT001, FBT002
) -> dict[str, str | bool | int | list[dict[str, str | bool | None]] | dict[str, int]]:
"""Build context data used by the webhook mass URL update preview UI.
Args:
webhook_feeds: Feeds attached to the selected webhook.
all_feeds: All tracked feeds.
replace_from: Text to replace in URLs.
replace_to: Replacement text.
resolve_urls: Whether to resolve resulting URLs.
force_update: Whether to allow overwriting existing target URLs.
Returns:
dict[str, ...]: Context values for rendering preview controls and table.
"""
clean_replace_from: str = replace_from.strip()
clean_replace_to: str = replace_to.strip()
preview_rows: list[dict[str, str | bool | None]] = []
if clean_replace_from:
preview_rows = create_webhook_feed_url_preview(
webhook_feeds=webhook_feeds,
replace_from=clean_replace_from,
replace_to=clean_replace_to,
resolve_urls=resolve_urls,
force_update=force_update,
existing_feed_urls={feed.url for feed in all_feeds},
)
preview_summary: dict[str, int] = {
"total": len(preview_rows),
"matched": sum(1 for row in preview_rows if row["has_match"]),
"will_update": sum(1 for row in preview_rows if row["will_change"]),
"conflicts": sum(1 for row in preview_rows if row["target_exists"] and not row["will_force_overwrite"]),
"force_overwrite": sum(1 for row in preview_rows if row["will_force_overwrite"]),
"force_ignore_errors": sum(1 for row in preview_rows if row["will_force_ignore_errors"]),
"resolve_errors": sum(1 for row in preview_rows if row["resolution_error"]),
}
preview_summary["no_match"] = preview_summary["total"] - preview_summary["matched"]
preview_summary["no_change"] = sum(
1 for row in preview_rows if row["has_match"] and not row["resolution_error"] and not row["will_change"]
)
return {
"replace_from": clean_replace_from,
"replace_to": clean_replace_to,
"resolve_urls": resolve_urls,
"force_update": force_update,
"preview_rows": preview_rows,
"preview_summary": preview_summary,
"preview_change_count": preview_summary["will_update"],
}
@app.get("/webhook_entries_mass_update_preview", response_class=HTMLResponse)
async def get_webhook_entries_mass_update_preview(
webhook_url: str,
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
replace_from: str = "",
replace_to: str = "",
resolve_urls: bool = True, # noqa: FBT001, FBT002
force_update: bool = False, # noqa: FBT001, FBT002
) -> HTMLResponse:
"""Render the mass-update preview fragment for a webhook using HTMX.
Args:
webhook_url: Webhook URL whose feeds are being updated.
request: The request object.
reader: The Reader instance.
replace_from: Text to find in URLs.
replace_to: Replacement text.
resolve_urls: Whether to resolve resulting URLs.
force_update: Whether to allow overwriting existing target URLs.
Returns:
HTMLResponse: Rendered partial template containing summary + preview table.
"""
clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip())
all_feeds: list[Feed] = list(reader.get_feeds())
webhook_feeds: list[Feed] = [
feed for feed in all_feeds if str(reader.get_tag(feed.url, "webhook", "")) == clean_webhook_url
]
context = {
"request": request,
"webhook_url": clean_webhook_url,
**build_webhook_mass_update_context(
webhook_feeds=webhook_feeds,
all_feeds=all_feeds,
replace_from=replace_from,
replace_to=replace_to,
resolve_urls=resolve_urls,
force_update=force_update,
),
}
return templates.TemplateResponse(request=request, name="_webhook_mass_update_preview.html", context=context)
@app.get("/webhook_entries", response_class=HTMLResponse)
async def get_webhook_entries( # noqa: C901, PLR0914
webhook_url: str,
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
starting_after: str = "",
replace_from: str = "",
replace_to: str = "",
resolve_urls: bool = True, # noqa: FBT001, FBT002
force_update: bool = False, # noqa: FBT001, FBT002
message: str = "",
) -> HTMLResponse:
"""Get all latest entries from all feeds for a specific webhook.
@ -1419,6 +1775,12 @@ async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914
webhook_url: The webhook URL to get entries for.
request: The request object.
starting_after: The entry to start after. Used for pagination.
replace_from: Optional URL substring to find for bulk URL replacement preview.
replace_to: Optional replacement substring used in bulk URL replacement preview.
resolve_urls: Whether to resolve replaced URLs by following redirects.
force_update: Whether to allow overwriting existing target URLs during apply.
message: Optional status message shown in the UI.
reader: The Reader instance.
Returns:
HTMLResponse: The webhook entries page.
@ -1440,24 +1802,26 @@ async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914
if not webhook_name:
raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}")
hook_info: WebhookInfo = get_data_from_hook_url(hook_name=webhook_name, hook_url=clean_webhook_url)
# Get all feeds associated with this webhook
all_feeds: list[Feed] = list(reader.get_feeds())
webhook_feeds: list[Feed] = []
for feed in all_feeds:
try:
feed_webhook: str = str(reader.get_tag(feed.url, "webhook", ""))
if feed_webhook == clean_webhook_url:
webhook_feeds.append(feed)
except TagNotFoundError:
continue
# Get all entries from all feeds for this webhook, sorted by published date
all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)]
# Sort entries by published date (newest first)
# Sort entries by published date (newest first), with undated entries last.
all_entries.sort(
key=lambda e: e.published or datetime.now(tz=UTC),
key=lambda e: (
e.published is not None,
e.published or datetime.min.replace(tzinfo=UTC),
),
reverse=True,
)
@ -1490,7 +1854,16 @@ async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914
last_entry = paginated_entries[-1]
# Create the html for the entries
html: str = create_html_for_feed(paginated_entries)
html: str = create_html_for_feed(reader=reader, entries=paginated_entries)
mass_update_context = build_webhook_mass_update_context(
webhook_feeds=webhook_feeds,
all_feeds=all_feeds,
replace_from=replace_from,
replace_to=replace_to,
resolve_urls=resolve_urls,
force_update=force_update,
)
# Check if there are more entries available
total_entries: int = len(all_entries)
@ -1498,18 +1871,155 @@ async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914
context = {
"request": request,
"hook_info": hook_info,
"webhook_name": webhook_name,
"webhook_url": clean_webhook_url,
"webhook_feeds": webhook_feeds,
"entries": paginated_entries,
"html": html,
"last_entry": last_entry,
"is_show_more_entries_button_visible": is_show_more_entries_button_visible,
"total_entries": total_entries,
"feeds_count": len(webhook_feeds),
"message": urllib.parse.unquote(message) if message else "",
**mass_update_context,
}
return templates.TemplateResponse(request=request, name="webhook_entries.html", context=context)
@app.post("/bulk_change_feed_urls", response_class=HTMLResponse)
async def post_bulk_change_feed_urls( # noqa: C901, PLR0914, PLR0912, PLR0915
webhook_url: Annotated[str, Form()],
replace_from: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
replace_to: Annotated[str, Form()] = "",
resolve_urls: Annotated[bool, Form()] = True, # noqa: FBT002
force_update: Annotated[bool, Form()] = False, # noqa: FBT002
) -> RedirectResponse:
"""Bulk-change feed URLs attached to a webhook.
Args:
webhook_url: The webhook URL whose feeds should be updated.
replace_from: Text to find in each URL.
replace_to: Text to replace with.
resolve_urls: Whether to resolve resulting URLs via redirects.
force_update: Whether existing target feed URLs should be overwritten.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to webhook detail with status message.
Raises:
HTTPException: If webhook is missing or replace_from is empty.
"""
clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip())
clean_replace_from: str = replace_from.strip()
clean_replace_to: str = replace_to.strip()
if not clean_replace_from:
raise HTTPException(status_code=400, detail="replace_from cannot be empty")
webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", [])))
if not any(hook["url"] == clean_webhook_url for hook in webhooks):
raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}")
all_feeds: list[Feed] = list(reader.get_feeds())
webhook_feeds: list[Feed] = []
for feed in all_feeds:
feed_webhook: str = str(reader.get_tag(feed.url, "webhook", ""))
if feed_webhook == clean_webhook_url:
webhook_feeds.append(feed)
preview_rows: list[dict[str, str | bool | None]] = create_webhook_feed_url_preview(
webhook_feeds=webhook_feeds,
replace_from=clean_replace_from,
replace_to=clean_replace_to,
resolve_urls=resolve_urls,
force_update=force_update,
existing_feed_urls={feed.url for feed in all_feeds},
)
changed_count: int = 0
skipped_count: int = 0
failed_count: int = 0
conflict_count: int = 0
force_overwrite_count: int = 0
for row in preview_rows:
if not row["has_match"]:
continue
if row["resolution_error"] and not force_update:
skipped_count += 1
continue
if row["target_exists"] and not force_update:
conflict_count += 1
skipped_count += 1
continue
old_url: str = str(row["old_url"])
new_url: str = str(row["candidate_url"] if row["will_force_ignore_errors"] else row["resolved_url"])
if old_url == new_url:
skipped_count += 1
continue
if row["target_exists"] and force_update:
try:
reader.delete_feed(new_url)
force_overwrite_count += 1
except FeedNotFoundError:
pass
except ReaderError:
failed_count += 1
continue
try:
reader.change_feed_url(old_url, new_url)
except FeedExistsError:
skipped_count += 1
continue
except FeedNotFoundError:
skipped_count += 1
continue
except ReaderError:
failed_count += 1
continue
try:
reader.update_feed(new_url)
except Exception:
logger.exception("Failed to update feed after URL change: %s", new_url)
for entry in reader.get_entries(feed=new_url, read=False):
try:
reader.set_entry_read(entry, True)
except Exception:
logger.exception("Failed to mark entry as read after URL change: %s", entry.id)
changed_count += 1
if changed_count > 0:
commit_state_change(
reader,
f"Bulk change {changed_count} feed URL(s) for webhook {clean_webhook_url}",
)
status_message: str = (
f"Updated {changed_count} feed URL(s). "
f"Force overwrote {force_overwrite_count}. "
f"Conflicts {conflict_count}. "
f"Skipped {skipped_count}. "
f"Failed {failed_count}."
)
redirect_url: str = (
f"/webhook_entries?webhook_url={urllib.parse.quote(clean_webhook_url)}"
f"&message={urllib.parse.quote(status_message)}"
)
return RedirectResponse(url=redirect_url, status_code=303)
if __name__ == "__main__":
sentry_sdk.init(
dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744",

View file

@ -1,109 +0,0 @@
from __future__ import annotations
from reader import Feed
from reader import Reader
from reader import TagNotFoundError
from discord_rss_bot.settings import default_custom_embed
from discord_rss_bot.settings import default_custom_message
def add_custom_message(reader: Reader, feed: Feed) -> None:
"""Add the custom message tag to the feed if it doesn't exist.
Args:
reader: What Reader to use.
feed: The feed to add the tag to.
"""
try:
reader.get_tag(feed, "custom_message")
except TagNotFoundError:
reader.set_tag(feed.url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType]
reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType]
def add_has_custom_message(reader: Reader, feed: Feed) -> None:
"""Add the has_custom_message tag to the feed if it doesn't exist.
Args:
reader: What Reader to use.
feed: The feed to add the tag to.
"""
try:
reader.get_tag(feed, "has_custom_message")
except TagNotFoundError:
if reader.get_tag(feed, "custom_message") == default_custom_message:
reader.set_tag(feed.url, "has_custom_message", False) # pyright: ignore[reportArgumentType]
else:
reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType]
def add_if_embed(reader: Reader, feed: Feed) -> None:
"""Add the if_embed tag to the feed if it doesn't exist.
Args:
reader: What Reader to use.
feed: The feed to add the tag to.
"""
try:
reader.get_tag(feed, "if_embed")
except TagNotFoundError:
reader.set_tag(feed.url, "if_embed", True) # pyright: ignore[reportArgumentType]
def add_custom_embed(reader: Reader, feed: Feed) -> None:
"""Add the custom embed tag to the feed if it doesn't exist.
Args:
reader: What Reader to use.
feed: The feed to add the tag to.
"""
try:
reader.get_tag(feed, "embed")
except TagNotFoundError:
reader.set_tag(feed.url, "embed", default_custom_embed) # pyright: ignore[reportArgumentType]
reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType]
def add_has_custom_embed(reader: Reader, feed: Feed) -> None:
"""Add the has_custom_embed tag to the feed if it doesn't exist.
Args:
reader: What Reader to use.
feed: The feed to add the tag to.
"""
try:
reader.get_tag(feed, "has_custom_embed")
except TagNotFoundError:
if reader.get_tag(feed, "embed") == default_custom_embed:
reader.set_tag(feed.url, "has_custom_embed", False) # pyright: ignore[reportArgumentType]
else:
reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType]
def add_should_send_embed(reader: Reader, feed: Feed) -> None:
"""Add the should_send_embed tag to the feed if it doesn't exist.
Args:
reader: What Reader to use.
feed: The feed to add the tag to.
"""
try:
reader.get_tag(feed, "should_send_embed")
except TagNotFoundError:
reader.set_tag(feed.url, "should_send_embed", True) # pyright: ignore[reportArgumentType]
def add_missing_tags(reader: Reader) -> None:
"""Add missing tags to feeds.
Args:
reader: What Reader to use.
"""
for feed in reader.get_feeds():
add_custom_message(reader, feed)
add_has_custom_message(reader, feed)
add_if_embed(reader, feed)
add_custom_embed(reader, feed)
add_has_custom_embed(reader, feed)
add_should_send_embed(reader, feed)

View file

@ -3,8 +3,6 @@ from __future__ import annotations
import urllib.parse
from typing import TYPE_CHECKING
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
from collections.abc import Iterable
@ -14,19 +12,16 @@ if TYPE_CHECKING:
from reader import Reader
def create_search_context(query: str, custom_reader: Reader | None = None) -> dict:
def create_search_context(query: str, reader: Reader) -> dict:
"""Build context for search.html template.
If custom_reader is None, use the default reader from settings.
Args:
query (str): The search query.
custom_reader (Reader | None): Optional custom Reader instance.
reader (Reader): Custom Reader instance.
Returns:
dict: Context dictionary for rendering the search results.
"""
reader: Reader = get_reader() if custom_reader is None else custom_reader
search_results: Iterable[EntrySearchResult] = reader.search_entries(query)
results: list[dict] = []

View file

@ -7,7 +7,6 @@ from pathlib import Path
from platformdirs import user_data_dir
from reader import Reader
from reader import TagNotFoundError
from reader import make_reader
if typing.TYPE_CHECKING:
@ -48,9 +47,7 @@ def get_reader(custom_location: Path | None = None) -> Reader:
# https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig
# Set the default update interval to 15 minutes if not already configured
# Users can change this via the Settings page or per-feed in the feed page
try:
reader.get_tag((), ".reader.update")
except TagNotFoundError:
if reader.get_tag((), ".reader.update", None) is None:
# Set default
reader.set_tag((), ".reader.update", {"interval": 15})

View file

@ -0,0 +1,73 @@
{% if preview_rows %}
<p class="small text-muted mb-1">
{{ preview_change_count }} feed URL{{ 's' if preview_change_count != 1 else '' }} ready to update.
</p>
<div class="small text-muted mb-2 d-flex flex-wrap gap-2">
<span class="badge bg-secondary">Total: {{ preview_summary.total }}</span>
<span class="badge bg-info text-dark">Matched: {{ preview_summary.matched }}</span>
<span class="badge bg-success">Will update: {{ preview_summary.will_update }}</span>
<span class="badge bg-warning text-dark">Conflicts: {{ preview_summary.conflicts }}</span>
<span class="badge bg-warning">Force overwrite: {{ preview_summary.force_overwrite }}</span>
<span class="badge bg-warning text-dark">Force ignore errors: {{ preview_summary.force_ignore_errors }}</span>
<span class="badge bg-danger">Resolve errors: {{ preview_summary.resolve_errors }}</span>
<span class="badge bg-secondary">No change: {{ preview_summary.no_change }}</span>
<span class="badge bg-secondary">No match: {{ preview_summary.no_match }}</span>
</div>
<form action="/bulk_change_feed_urls" method="post" class="mb-2">
<input type="hidden" name="webhook_url" value="{{ webhook_url }}" />
<input type="hidden" name="replace_from" value="{{ replace_from }}" />
<input type="hidden" name="replace_to" value="{{ replace_to }}" />
<input type="hidden"
name="resolve_urls"
value="{{ 'true' if resolve_urls else 'false' }}" />
<input type="hidden"
name="force_update"
value="{{ 'true' if force_update else 'false' }}" />
<button type="submit"
class="btn btn-warning w-100"
{% if preview_change_count == 0 %}disabled{% endif %}
onclick="return confirm('Apply these feed URL updates?');">Apply mass update</button>
</form>
<div class="table-responsive mt-2">
<table class="table table-sm table-dark table-striped align-middle mb-0">
<thead>
<tr>
<th scope="col">Old URL</th>
<th scope="col">New URL</th>
<th scope="col">Status</th>
</tr>
</thead>
<tbody>
{% for row in preview_rows %}
<tr>
<td>
<code>{{ row.old_url }}</code>
</td>
<td>
<code>{{ row.resolved_url if resolve_urls else row.candidate_url }}</code>
</td>
<td>
{% if not row.has_match %}
<span class="badge bg-secondary">No match</span>
{% elif row.will_force_ignore_errors %}
<span class="badge bg-warning text-dark">Will force update (ignore resolve error)</span>
{% elif row.resolution_error %}
<span class="badge bg-danger">{{ row.resolution_error }}</span>
{% elif row.will_force_overwrite %}
<span class="badge bg-warning">Will force overwrite</span>
{% elif row.target_exists %}
<span class="badge bg-warning text-dark">Conflict: target URL exists</span>
{% elif row.will_change %}
<span class="badge bg-success">Will update</span>
{% else %}
<span class="badge bg-secondary">No change</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% elif replace_from %}
<p class="small text-muted mb-0">No preview rows found for that replacement pattern.</p>
{% endif %}

View file

@ -1,6 +1,5 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
@ -18,7 +17,6 @@
{% block head %}
{% endblock head %}
</head>
<body class="text-white-50">
{% include "nav.html" %}
<div class="p-2 mb-2">
@ -27,10 +25,12 @@
{% if messages %}
<div class="alert alert-warning alert-dismissible fade show" role="alert">
<pre>{{ messages }}</pre>
<button type="button" class="btn-close" data-bs-dismiss="alert" aria-label="Close"></button>
<button type="button"
class="btn-close"
data-bs-dismiss="alert"
aria-label="Close"></button>
</div>
{% endif %}
{% block content %}
{% endblock content %}
<footer class="d-flex flex-wrap justify-content-between align-items-center py-3 my-4 border-top">
@ -52,7 +52,9 @@
</div>
</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/htmx.org@2.0.8/dist/htmx.min.js"
integrity="sha384-/TgkGk7p307TH7EXJDuUlgG3Ce1UVolAOFopFekQkkXihi5u/6OCvVKyz1W+idaz"
crossorigin="anonymous"></script>
<script src="/static/bootstrap.min.js" defer></script>
</body>
</html>

View file

@ -32,10 +32,10 @@
{% for hook_from_context in webhooks %}
<div class="p-2 mb-3 border border-dark">
<div class="d-flex justify-content-between align-items-center mb-3">
<h2 class="h5 mb-0">
<a class="text-muted" href="/webhooks">{{ hook_from_context.name }}</a>
</h2>
<a class="text-muted"
<h2 class="h5 mb-0">{{ hook_from_context.name }}</h2>
<a class="text-muted fs-6 btn btn-outline-light btn-sm ms-auto me-2"
href="/webhook_entries?webhook_url={{ hook_from_context.url|encode_url }}">Settings</a>
<a class="text-muted fs-6 btn btn-outline-light btn-sm"
href="/webhook_entries?webhook_url={{ hook_from_context.url|encode_url }}">View Latest Entries</a>
</div>
<!-- Group feeds by domain within each webhook -->

View file

@ -1,20 +1,149 @@
{% extends "base.html" %}
{% block title %}
| {{ webhook_name }} - Latest Entries
| {{ webhook_name }}
{% endblock title %}
{% block content %}
{% if message %}<div class="alert alert-info" role="alert">{{ message }}</div>{% endif %}
<div class="card mb-3 border border-dark p-3 text-light">
<!-- Webhook Title -->
<h2>{{ webhook_name }} - Latest Entries ({{ total_entries }} total from {{ feeds_count }} feeds)</h2>
<!-- Webhook Info -->
<div class="mt-3">
<p class="text-muted">
<div class="d-flex flex-column flex-md-row justify-content-between gap-3">
<div>
<h2 class="mb-2">{{ webhook_name }}</h2>
<p class="text-muted mb-1">
{{ total_entries }} total from {{ feeds_count }} feed{{ 's' if feeds_count != 1 else '' }}
</p>
<p class="text-muted mb-0">
<code>{{ webhook_url }}</code>
</p>
</div>
<div class="d-flex gap-2 align-items-start">
<a class="btn btn-outline-light btn-sm" href="/">Back to dashboard</a>
<a class="btn btn-outline-info btn-sm" href="/webhooks">All webhooks</a>
</div>
</div>
</div>
<div class="row g-3 mb-3">
<div class="col-lg-5">
<div class="card border border-dark p-3 text-light h-100">
<h3 class="h5">Settings</h3>
<ul class="list-unstyled text-muted mb-3">
<li>
<strong>Custom name:</strong> {{ hook_info.custom_name }}
</li>
<li>
<strong>Discord name:</strong> {{ hook_info.name or 'Unavailable' }}
</li>
<li>
<strong>Webhook:</strong>
<a class="text-muted" href="{{ hook_info.url }}">{{ hook_info.url | replace('https://discord.com/api/webhooks', '') }}</a>
</li>
</ul>
<form action="/modify_webhook" method="post" class="row g-3 mb-3">
<input type="hidden" name="old_hook" value="{{ webhook_url }}" />
<input type="hidden"
name="redirect_to"
value="/webhook_entries?webhook_url={{ webhook_url|encode_url }}" />
<div class="col-12">
<label for="new_hook" class="form-label">Modify Webhook</label>
<input type="text"
name="new_hook"
id="new_hook"
class="form-control border text-muted bg-dark"
placeholder="Enter new webhook URL" />
</div>
<div class="col-12">
<button type="submit" class="btn btn-primary w-100">Save Webhook URL</button>
</div>
</form>
<form action="/delete_webhook" method="post">
<input type="hidden" name="webhook_url" value="{{ webhook_url }}" />
<button type="submit"
class="btn btn-danger w-100"
onclick="return confirm('Are you sure you want to delete this webhook?');">
Delete Webhook
</button>
</form>
<hr class="border-secondary my-3" />
<h3 class="h6">Mass update feed URLs</h3>
<p class="text-muted small mb-2">Replace part of feed URLs for all feeds attached to this webhook.</p>
<form action="/webhook_entries"
method="get"
class="row g-2 mb-2"
hx-get="/webhook_entries_mass_update_preview"
hx-target="#mass-update-preview"
hx-swap="innerHTML">
<input type="hidden" name="webhook_url" value="{{ webhook_url|encode_url }}" />
<div class="col-12">
<label for="replace_from" class="form-label small">Replace this</label>
<input type="text"
name="replace_from"
id="replace_from"
class="form-control border text-muted bg-dark"
value="{{ replace_from }}"
placeholder="https://old-domain.example" />
</div>
<div class="col-12">
<label for="replace_to" class="form-label small">With this</label>
<input type="text"
name="replace_to"
id="replace_to"
class="form-control border text-muted bg-dark"
value="{{ replace_to }}"
placeholder="https://new-domain.example" />
</div>
<div class="col-12 form-check ms-1">
<input class="form-check-input"
type="checkbox"
value="true"
id="resolve_urls"
name="resolve_urls"
{% if resolve_urls %}checked{% endif %} />
<label class="form-check-label small" for="resolve_urls">Resolve final URL with redirects (uses httpx)</label>
</div>
<div class="col-12 form-check ms-1">
<input class="form-check-input"
type="checkbox"
value="true"
id="force_update"
name="force_update"
{% if force_update %}checked{% endif %} />
<label class="form-check-label small" for="force_update">Force update (overwrite conflicting target feed URLs)</label>
</div>
<div class="col-12">
<button type="submit" class="btn btn-outline-warning w-100">Preview changes</button>
</div>
</form>
<div id="mass-update-preview">{% include "_webhook_mass_update_preview.html" %}</div>
</div>
</div>
<div class="col-lg-7">
<div class="card border border-dark p-3 text-light h-100">
<h3 class="h5">Attached feeds</h3>
{% if webhook_feeds %}
<ul class="list-group list-unstyled mb-0">
{% for feed in webhook_feeds %}
<li class="mb-2">
<a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">
{% if feed.title %}
{{ feed.title }}
{% else %}
{{ feed.url }}
{% endif %}
</a>
{% if feed.title %}<span class="text-muted">- {{ feed.url }}</span>{% endif %}
{% if not feed.updates_enabled %}<span class="text-warning">Disabled</span>{% endif %}
{% if feed.last_exception %}<span class="text-danger">({{ feed.last_exception.value_str }})</span>{% endif %}
</li>
{% endfor %}
</ul>
{% else %}
<p class="text-muted mb-0">No feeds are attached to this webhook yet.</p>
{% endif %}
</div>
</div>
</div>
{# Rendered HTML content #}
{% if entries %}
<h3 class="h5 text-light">Latest entries</h3>
<pre>{{ html|safe }}</pre>
{% if is_show_more_entries_button_visible and last_entry %}
<a class="btn btn-dark mt-3"

View file

@ -6,10 +6,24 @@ import sys
import tempfile
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
if TYPE_CHECKING:
import pytest
def pytest_configure() -> None:
def pytest_addoption(parser: pytest.Parser) -> None:
"""Register custom command-line options for optional integration tests."""
parser.addoption(
"--run-real-git-backup-tests",
action="store_true",
default=False,
help="Run tests that push git backup state to a real repository.",
)
def pytest_sessionstart(session: pytest.Session) -> None:
"""Isolate persistent app state per xdist worker to avoid cross-worker test interference."""
worker_id: str = os.environ.get("PYTEST_XDIST_WORKER", "gw0")
worker_data_dir: Path = Path(tempfile.gettempdir()) / "discord-rss-bot-tests" / worker_id
@ -37,4 +51,10 @@ def pytest_configure() -> None:
current_reader.close()
get_reader: Any = getattr(settings_module, "get_reader", None)
if callable(get_reader):
main_module.reader = get_reader()
get_reader()
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
"""Skip real git-repo push tests unless explicitly requested."""
if config.getoption("--run-real-git-backup-tests"):
return

View file

@ -38,7 +38,7 @@ def test_has_black_tags() -> None:
# Test feed without any blacklist tags
assert_msg: str = "Feed should not have any blacklist tags"
assert feed_has_blacklist_tags(custom_reader=get_reader(), feed=feed) is False, assert_msg
assert feed_has_blacklist_tags(reader=get_reader(), feed=feed) is False, assert_msg
check_if_has_tag(reader, feed, "blacklist_title")
check_if_has_tag(reader, feed, "blacklist_summary")
@ -58,11 +58,11 @@ def test_has_black_tags() -> None:
def check_if_has_tag(reader: Reader, feed: Feed, blacklist_name: str) -> None:
reader.set_tag(feed, blacklist_name, "a") # pyright: ignore[reportArgumentType]
assert_msg: str = f"Feed should have blacklist tags: {blacklist_name}"
assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is True, assert_msg
assert feed_has_blacklist_tags(reader=reader, feed=feed) is True, assert_msg
asset_msg: str = f"Feed should not have any blacklist tags: {blacklist_name}"
reader.delete_tag(feed, blacklist_name)
assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is False, asset_msg
assert feed_has_blacklist_tags(reader=reader, feed=feed) is False, asset_msg
def test_should_be_skipped() -> None:

View file

@ -45,39 +45,39 @@ def test_entry_is_whitelisted() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
custom_reader: Reader = get_reader(custom_location=str(custom_loc))
reader: Reader = get_reader(custom_location=str(custom_loc))
# Add a feed to the database.
custom_reader.add_feed("https://lovinator.space/rss_test.xml")
custom_reader.update_feed("https://lovinator.space/rss_test.xml")
reader.add_feed("https://lovinator.space/rss_test.xml")
reader.update_feed("https://lovinator.space/rss_test.xml")
# whitelist_title
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries():
if entry_is_whitelisted(entry) is True:
reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in reader.get_entries():
if entry_is_whitelisted(entry, reader=reader) is True:
assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}"
break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title")
reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title")
# whitelist_summary
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries():
if entry_is_whitelisted(entry) is True:
reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in reader.get_entries():
if entry_is_whitelisted(entry, reader=reader) is True:
assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}"
break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary")
reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary")
# whitelist_content
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries():
if entry_is_whitelisted(entry) is True:
reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in reader.get_entries():
if entry_is_whitelisted(entry, reader=reader) is True:
assert_msg = f"Expected: <p>ffdnfdnfdnfdnfdndfn</p>, Got: {entry.content[0].value}"
assert entry.content[0].value == "<p>ffdnfdnfdnfdnfdndfn</p>", assert_msg
break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content")
reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content")
# Close the reader, so we can delete the directory.
custom_reader.close()
reader.close()
def test_entry_is_blacklisted() -> None:
@ -87,36 +87,36 @@ def test_entry_is_blacklisted() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
custom_reader: Reader = get_reader(custom_location=str(custom_loc))
reader: Reader = get_reader(custom_location=str(custom_loc))
# Add a feed to the database.
custom_reader.add_feed("https://lovinator.space/rss_test.xml")
custom_reader.update_feed("https://lovinator.space/rss_test.xml")
reader.add_feed("https://lovinator.space/rss_test.xml")
reader.update_feed("https://lovinator.space/rss_test.xml")
# blacklist_title
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries():
if entry_is_blacklisted(entry) is True:
reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in reader.get_entries():
if entry_is_blacklisted(entry, reader=reader) is True:
assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}"
break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title")
reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title")
# blacklist_summary
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries():
if entry_is_blacklisted(entry) is True:
reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in reader.get_entries():
if entry_is_blacklisted(entry, reader=reader) is True:
assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}"
break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary")
reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary")
# blacklist_content
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries():
if entry_is_blacklisted(entry) is True:
reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in reader.get_entries():
if entry_is_blacklisted(entry, reader=reader) is True:
assert_msg = f"Expected: <p>ffdnfdnfdnfdnfdndfn</p>, Got: {entry.content[0].value}"
assert entry.content[0].value == "<p>ffdnfdnfdnfdnfdndfn</p>", assert_msg
break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content")
reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content")
# Close the reader, so we can delete the directory.
custom_reader.close()
reader.close()

View file

@ -102,12 +102,10 @@ def test_format_entry_html_for_discord_does_not_preserve_invalid_timestamp_style
@patch("discord_rss_bot.custom_message.get_custom_message")
@patch("discord_rss_bot.custom_message.get_reader")
def test_replace_tags_in_text_message_preserves_timestamp_tags(
mock_get_reader: MagicMock,
mock_get_custom_message: MagicMock,
) -> None:
mock_get_reader.return_value = MagicMock()
mock_reader = MagicMock()
mock_get_custom_message.return_value = "{{entry_summary}}"
summary_parts: list[str] = [
f"<p>Format {index}: ({timestamp_tag.replace('<', '&lt;').replace('>', '&gt;')})</p>"
@ -116,19 +114,17 @@ def test_replace_tags_in_text_message_preserves_timestamp_tags(
entry_ns: SimpleNamespace = make_entry("".join(summary_parts))
entry: Entry = typing.cast("Entry", entry_ns)
rendered: str = replace_tags_in_text_message(entry)
rendered: str = replace_tags_in_text_message(entry, reader=mock_reader)
for timestamp_tag in TIMESTAMP_FORMATS:
assert timestamp_tag in rendered
@patch("discord_rss_bot.custom_message.get_embed")
@patch("discord_rss_bot.custom_message.get_reader")
def test_replace_tags_in_embed_preserves_timestamp_tags(
mock_get_reader: MagicMock,
mock_get_embed: MagicMock,
) -> None:
mock_get_reader.return_value = MagicMock()
mock_reader = MagicMock()
mock_get_embed.return_value = CustomEmbed(description="{{entry_summary}}")
summary_parts: list[str] = [
f"<p>Format {index}: ({timestamp_tag.replace('<', '&lt;').replace('>', '&gt;')})</p>"
@ -138,7 +134,7 @@ def test_replace_tags_in_embed_preserves_timestamp_tags(
entry: Entry = typing.cast("Entry", entry_ns)
embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry)
embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry, reader=mock_reader)
for timestamp_tag in TIMESTAMP_FORMATS:
assert timestamp_tag in embed.description

View file

@ -18,7 +18,6 @@ from discord_rss_bot.feeds import send_entry_to_discord
from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.feeds import should_send_embed_check
from discord_rss_bot.feeds import truncate_webhook_message
from discord_rss_bot.missing_tags import add_missing_tags
def test_send_to_discord() -> None:
@ -35,8 +34,6 @@ def test_send_to_discord() -> None:
# Add a feed to the reader.
reader.add_feed("https://www.reddit.com/r/Python/.rss")
add_missing_tags(reader)
# Update the feed to get the entries.
reader.update_feeds()
@ -58,7 +55,7 @@ def test_send_to_discord() -> None:
assert reader.get_tag(feed, "webhook") == webhook_url, f"The webhook URL should be '{webhook_url}'."
# Send the feed to Discord.
send_to_discord(custom_reader=reader, feed=feed, do_once=True)
send_to_discord(reader=reader, feed=feed, do_once=True)
# Close the reader, so we can delete the directory.
reader.close()
@ -191,7 +188,7 @@ def test_send_entry_to_discord_youtube_feed(
mock_discord_webhook.return_value = mock_webhook
# Call the function
send_entry_to_discord(mock_entry)
send_entry_to_discord(mock_entry, mock_reader)
# Assertions
mock_create_embed.assert_not_called()
@ -203,7 +200,7 @@ def test_send_entry_to_discord_youtube_feed(
assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc"
# Verify execute_webhook was called
mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry)
mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry, reader=mock_reader)
def test_extract_domain_youtube_feed() -> None:

View file

@ -304,102 +304,6 @@ def test_commit_state_change_no_push_when_remote_unset(monkeypatch: pytest.Monke
assert not push_calls, "git push should NOT be called when GIT_BACKUP_REMOTE is not set"
@SKIP_IF_NO_GIT
def test_commit_state_change_e2e_push_to_bare_repo(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""End-to-end test: commit_state_change pushes to a real bare git repository."""
git_executable: str | None = shutil.which("git")
assert git_executable is not None, "git executable not found"
# Create a bare remote repository
bare_repo_path: Path = tmp_path / "remote.git"
subprocess.run([git_executable, "init", "--bare", str(bare_repo_path)], check=True, capture_output=True) # noqa: S603
# Configure backup with remote pointing to bare repo
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.setenv("GIT_BACKUP_REMOTE", str(bare_repo_path))
# Create mock reader with some state
mock_reader = MagicMock()
feed1 = MagicMock()
feed1.url = "https://example.com/feed.rss"
mock_reader.get_feeds.return_value = [feed1]
def get_tag_side_effect(
feed_or_key: tuple | str,
tag: str | None = None,
default: str | None = None,
) -> list[Any] | str | None:
if feed_or_key == ():
return []
if tag == "webhook":
return "https://discord.com/api/webhooks/123/abc"
return default
mock_reader.get_tag.side_effect = get_tag_side_effect
# Perform backup with commit and push
commit_state_change(mock_reader, "Initial backup")
# Verify commit exists in local backup repo
result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "log", "--oneline"],
capture_output=True,
text=True,
check=True,
)
assert "Initial backup" in result.stdout
# Verify origin remote is configured correctly
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "remote", "get-url", "origin"],
capture_output=True,
text=True,
check=True,
)
assert result.stdout.strip() == str(bare_repo_path)
# Verify commit was pushed to the bare remote
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"],
capture_output=True,
text=True,
check=True,
)
assert "Initial backup" in result.stdout
# Verify state.json content in the remote
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(bare_repo_path), "show", "master:state.json"],
capture_output=True,
text=True,
check=True,
)
state_data: dict[str, Any] = json.loads(result.stdout)
assert state_data["feeds"][0]["url"] == "https://example.com/feed.rss"
assert state_data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc"
# Perform a second backup to verify subsequent pushes work
feed2 = MagicMock()
feed2.url = "https://another.com/feed.xml"
mock_reader.get_feeds.return_value = [feed1, feed2]
commit_state_change(mock_reader, "Add second feed")
# Verify both commits are in the remote
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"],
capture_output=True,
text=True,
check=True,
)
assert "Initial backup" in result.stdout
assert "Add second feed" in result.stdout
# Integration tests for embed-related endpoint backups
client: TestClient = TestClient(app)
test_webhook_name: str = "Test Backup Webhook"
test_webhook_url: str = "https://discord.com/api/webhooks/999999999/testbackupwebhook"

View file

@ -4,13 +4,19 @@ import re
import urllib.parse
from dataclasses import dataclass
from dataclasses import field
from datetime import UTC
from datetime import datetime
from typing import TYPE_CHECKING
from typing import cast
from unittest.mock import MagicMock
from unittest.mock import patch
from fastapi.testclient import TestClient
import discord_rss_bot.main as main_module
from discord_rss_bot.main import app
from discord_rss_bot.main import create_html_for_feed
from discord_rss_bot.main import get_reader_dependency
if TYPE_CHECKING:
from pathlib import Path
@ -154,6 +160,9 @@ def test_get() -> None:
response: Response = client.get(url="/webhooks")
assert response.status_code == 200, f"/webhooks failed: {response.text}"
response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
assert response.status_code == 200, f"/webhook_entries failed: {response.text}"
response: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/whitelist failed: {response.text}"
@ -299,6 +308,147 @@ def test_change_feed_url() -> None:
client.post(url="/remove", data={"feed_url": new_feed_url})
def test_change_feed_url_marks_entries_as_read() -> None:
"""After changing a feed URL all entries on the new feed should be marked read to prevent resending."""
new_feed_url = "https://lovinator.space/rss_test_small.xml"
# Ensure feeds do not already exist.
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": new_feed_url})
# Ensure webhook exists.
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})
# Add the original feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Patch reader on the main module so we can observe calls.
mock_entry_a = MagicMock()
mock_entry_a.id = "entry-a"
mock_entry_b = MagicMock()
mock_entry_b.id = "entry-b"
real_reader = main_module.get_reader_dependency()
# Use a no-redirect client so the POST response is inspected directly; the
# redirect target (/feed?feed_url=…) would 404 because change_feed_url is mocked.
no_redirect_client = TestClient(app, follow_redirects=False)
with (
patch.object(real_reader, "get_entries", return_value=[mock_entry_a, mock_entry_b]) as mock_get_entries,
patch.object(real_reader, "set_entry_read") as mock_set_read,
patch.object(real_reader, "update_feed") as mock_update_feed,
patch.object(real_reader, "change_feed_url"),
):
response = no_redirect_client.post(
url="/change_feed_url",
data={"old_feed_url": feed_url, "new_feed_url": new_feed_url},
)
assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
# update_feed should have been called with the new URL.
mock_update_feed.assert_called_once_with(new_feed_url)
# get_entries should have been called to fetch unread entries on the new URL.
mock_get_entries.assert_called_once_with(feed=new_feed_url, read=False)
# Every returned entry should have been marked as read.
assert mock_set_read.call_count == 2, f"Expected 2 set_entry_read calls, got {mock_set_read.call_count}"
mock_set_read.assert_any_call(mock_entry_a, True)
mock_set_read.assert_any_call(mock_entry_b, True)
# Cleanup.
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": new_feed_url})
def test_change_feed_url_empty_old_url_returns_400() -> None:
"""Submitting an empty old_feed_url should return HTTP 400."""
response: Response = client.post(
url="/change_feed_url",
data={"old_feed_url": " ", "new_feed_url": "https://example.com/feed.xml"},
)
assert response.status_code == 400, f"Expected 400 for empty old URL, got {response.status_code}"
def test_change_feed_url_empty_new_url_returns_400() -> None:
"""Submitting a blank new_feed_url should return HTTP 400."""
response: Response = client.post(
url="/change_feed_url",
data={"old_feed_url": feed_url, "new_feed_url": " "},
)
assert response.status_code == 400, f"Expected 400 for blank new URL, got {response.status_code}"
def test_change_feed_url_nonexistent_old_url_returns_404() -> None:
"""Trying to rename a feed that does not exist should return HTTP 404."""
non_existent = "https://does-not-exist.example.com/rss.xml"
# Make sure it really is absent.
client.post(url="/remove", data={"feed_url": non_existent})
response: Response = client.post(
url="/change_feed_url",
data={"old_feed_url": non_existent, "new_feed_url": "https://example.com/new.xml"},
)
assert response.status_code == 404, f"Expected 404 for non-existent feed, got {response.status_code}"
def test_change_feed_url_new_url_already_exists_returns_409() -> None:
"""Changing to a URL that is already tracked should return HTTP 409."""
second_feed_url = "https://lovinator.space/rss_test_small.xml"
# Ensure both feeds are absent.
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": second_feed_url})
# Ensure webhook exists.
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})
# Add both feeds.
client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
client.post(url="/add", data={"feed_url": second_feed_url, "webhook_dropdown": webhook_name})
# Try to rename one to the other.
response: Response = client.post(
url="/change_feed_url",
data={"old_feed_url": feed_url, "new_feed_url": second_feed_url},
)
assert response.status_code == 409, f"Expected 409 when new URL already exists, got {response.status_code}"
# Cleanup.
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": second_feed_url})
def test_change_feed_url_same_url_redirects_without_error() -> None:
"""Changing a feed's URL to itself should redirect cleanly without any error."""
# Ensure webhook exists.
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})
# Add the feed.
client.post(url="/remove", data={"feed_url": feed_url})
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Submit the same URL as both old and new.
response = client.post(
url="/change_feed_url",
data={"old_feed_url": feed_url, "new_feed_url": feed_url},
)
assert response.status_code == 200, f"Expected 200 redirect for same URL, got {response.status_code}"
# Feed should still be accessible.
response = client.get(url="/feed", params={"feed_url": feed_url})
assert response.status_code == 200, f"Feed should still exist after no-op URL change: {response.text}"
# Cleanup.
client.post(url="/remove", data={"feed_url": feed_url})
def test_delete_webhook() -> None:
"""Test the /delete_webhook page."""
# Remove the feed if it already exists before we run the test.
@ -335,6 +485,110 @@ def test_update_feed_not_found() -> None:
assert "Feed not found" in response.text
def test_post_entry_send_to_discord() -> None:
"""Test that /post_entry sends an entry to Discord and redirects to the feed page.
Regression test for the bug where the injected reader was not passed to
send_entry_to_discord, meaning the dependency-injected reader was silently ignored.
"""
# Ensure webhook and feed exist.
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
client.post(url="/remove", data={"feed_url": feed_url})
response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Retrieve an entry from the feed to get a valid entry ID.
reader: main_module.Reader = main_module.get_reader_dependency()
entries: list[Entry] = list(reader.get_entries(feed=feed_url, limit=1))
assert entries, "Feed should have at least one entry to send"
entry_to_send: main_module.Entry = entries[0]
encoded_id: str = urllib.parse.quote(entry_to_send.id)
no_redirect_client = TestClient(app, follow_redirects=False)
# Patch execute_webhook so no real HTTP requests are made to Discord.
with patch("discord_rss_bot.feeds.execute_webhook") as mock_execute:
response = no_redirect_client.get(
url="/post_entry",
params={"entry_id": encoded_id, "feed_url": urllib.parse.quote(feed_url)},
)
assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}: {response.text}"
location: str = response.headers.get("location", "")
assert "feed?feed_url=" in location, f"Should redirect to feed page, got: {location}"
assert mock_execute.called, "execute_webhook should have been called to deliver the entry to Discord"
# Cleanup.
client.post(url="/remove", data={"feed_url": feed_url})
def test_post_entry_unknown_id_returns_404() -> None:
"""Test that /post_entry returns 404 when the entry ID does not exist."""
response: Response = client.get(
url="/post_entry",
params={"entry_id": "https://nonexistent.example.com/entry-that-does-not-exist"},
)
assert response.status_code == 404, f"Expected 404 for unknown entry, got {response.status_code}"
def test_post_entry_uses_feed_url_to_disambiguate_duplicate_ids() -> None:
"""When IDs collide across feeds, /post_entry should pick the entry from provided feed_url."""
@dataclass(slots=True)
class DummyFeed:
url: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
feed_url: str
feed_a = "https://example.com/feed-a.xml"
feed_b = "https://example.com/feed-b.xml"
shared_id = "https://example.com/shared-entry-id"
entry_a: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_a), feed_url=feed_a))
entry_b: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_b), feed_url=feed_b))
class StubReader:
def get_entries(self, feed: str | None = None) -> list[Entry]:
if feed == feed_a:
return [entry_a]
if feed == feed_b:
return [entry_b]
return [entry_a, entry_b]
selected_feed_urls: list[str] = []
def fake_send_entry_to_discord(entry: Entry, reader: object) -> None:
selected_feed_urls.append(entry.feed.url)
app.dependency_overrides[get_reader_dependency] = StubReader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
with patch("discord_rss_bot.main.send_entry_to_discord", side_effect=fake_send_entry_to_discord):
response: Response = no_redirect_client.get(
url="/post_entry",
params={"entry_id": urllib.parse.quote(shared_id), "feed_url": urllib.parse.quote(feed_b)},
)
assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}"
assert selected_feed_urls == [feed_b], f"Expected feed-b entry, got: {selected_feed_urls}"
location = response.headers.get("location", "")
assert urllib.parse.quote(feed_b) in location, f"Expected redirect to feed-b page, got: {location}"
finally:
app.dependency_overrides = {}
def test_navbar_backup_link_hidden_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
"""Test that the backup link is not shown in the navbar when GIT_BACKUP_PATH is not set."""
# Ensure GIT_BACKUP_PATH is not set
@ -573,11 +827,24 @@ def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyP
original_feed_url="https://example.com/feed-b.xml",
)
monkeypatch.setattr("discord_rss_bot.main.replace_tags_in_text_message", lambda _entry: "Rendered content")
monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry: False)
monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry: False)
monkeypatch.setattr(
"discord_rss_bot.main.replace_tags_in_text_message",
lambda _entry, **_kwargs: "Rendered content",
)
monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry, **_kwargs: False)
monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry, **_kwargs: False)
html = create_html_for_feed(cast("list[Entry]", [same_feed_entry, other_feed_entry]), selected_feed_url)
same_feed_entry_typed: Entry = cast("Entry", same_feed_entry)
other_feed_entry_typed: Entry = cast("Entry", other_feed_entry)
html: str = create_html_for_feed(
reader=MagicMock(),
current_feed_url=selected_feed_url,
entries=[
same_feed_entry_typed,
other_feed_entry_typed,
],
)
assert "From another feed: https://example.com/feed-b.xml" in html
assert "From another feed: https://example.com/feed-a.xml" not in html
@ -620,6 +887,32 @@ def test_webhook_entries_no_feeds() -> None:
assert "No feeds found" in response.text or "Add feeds" in response.text, "Expected message about no feeds"
def test_webhook_entries_no_feeds_still_shows_webhook_settings() -> None:
"""The webhook detail view should show settings/actions even with no attached feeds."""
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
response = client.get(
url="/webhook_entries",
params={"webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
assert "Settings" in response.text, "Expected settings card on webhook detail view"
assert "Modify Webhook" in response.text, "Expected modify form on webhook detail view"
assert "Delete Webhook" in response.text, "Expected delete action on webhook detail view"
assert "Back to dashboard" in response.text, "Expected dashboard navigation link"
assert "All webhooks" in response.text, "Expected all webhooks navigation link"
assert f'name="old_hook" value="{webhook_url}"' in response.text, "Expected old_hook hidden input"
assert f'value="/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}"' in response.text, (
"Expected modify form to redirect back to the current webhook detail view"
)
def test_webhook_entries_with_feeds_no_entries() -> None:
"""Test webhook_entries endpoint when webhook has feeds but no entries yet."""
# Clean up and create fresh webhook
@ -681,6 +974,38 @@ def test_webhook_entries_with_entries() -> None:
assert webhook_name in response.text, "Webhook name not found in response"
# Should show entries (the feed has entries)
assert "total from" in response.text, "Expected to see entry count"
assert "Modify Webhook" in response.text, "Expected webhook settings to be visible"
assert "Attached feeds" in response.text, "Expected attached feeds section to be visible"
def test_webhook_entries_shows_attached_feed_link() -> None:
"""The webhook detail view should list attached feeds linking to their feed pages."""
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
client.post(url="/remove", data={"feed_url": feed_url})
response = client.post(
url="/add",
data={"feed_url": feed_url, "webhook_dropdown": webhook_name},
)
assert response.status_code == 200, f"Failed to add feed: {response.text}"
response = client.get(
url="/webhook_entries",
params={"webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
assert f"/feed?feed_url={urllib.parse.quote(feed_url)}" in response.text, (
"Expected attached feed to link to its feed detail page"
)
assert "Latest entries" in response.text, "Expected latest entries heading on webhook detail view"
client.post(url="/remove", data={"feed_url": feed_url})
def test_webhook_entries_multiple_feeds() -> None:
@ -716,6 +1041,75 @@ def test_webhook_entries_multiple_feeds() -> None:
client.post(url="/remove", data={"feed_url": feed_url})
def test_webhook_entries_sort_newest_and_non_null_published_first() -> None:
"""Webhook entries should be sorted newest-first with published=None entries placed last."""
@dataclass(slots=True)
class DummyFeed:
url: str
title: str | None = None
updates_enabled: bool = True
last_exception: None = None
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
published: datetime | None
dummy_feed = DummyFeed(url="https://example.com/feed.xml", title="Example Feed")
# Intentionally unsorted input with two dated entries and two undated entries.
unsorted_entries: list[Entry] = [
cast("Entry", DummyEntry(id="old", feed=dummy_feed, published=datetime(2024, 1, 1, tzinfo=UTC))),
cast("Entry", DummyEntry(id="none-1", feed=dummy_feed, published=None)),
cast("Entry", DummyEntry(id="new", feed=dummy_feed, published=datetime(2024, 2, 1, tzinfo=UTC))),
cast("Entry", DummyEntry(id="none-2", feed=dummy_feed, published=None)),
]
class StubReader:
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return [dummy_feed]
def get_entries(self, **_kwargs: object) -> list[Entry]:
return unsorted_entries
observed_order: list[str] = []
def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str:
del reader, current_feed_url
observed_order.extend(entry.id for entry in entries)
return ""
app.dependency_overrides[get_reader_dependency] = StubReader
try:
with (
patch(
"discord_rss_bot.main.get_data_from_hook_url",
return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url),
),
patch("discord_rss_bot.main.create_html_for_feed", side_effect=capture_entries),
):
response: Response = client.get(
url="/webhook_entries",
params={"webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
assert observed_order == ["new", "old", "none-1", "none-2"], (
"Expected newest published entries first and published=None entries last"
)
finally:
app.dependency_overrides = {}
def test_webhook_entries_pagination() -> None:
"""Test webhook_entries endpoint pagination functionality."""
# Clean up and create webhook
@ -783,3 +1177,445 @@ def test_webhook_entries_url_encoding() -> None:
# Clean up
client.post(url="/remove", data={"feed_url": feed_url})
def test_dashboard_webhook_name_links_to_webhook_detail() -> None:
"""Webhook names on the dashboard should open the webhook detail view."""
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
client.post(url="/remove", data={"feed_url": feed_url})
response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
expected_link = f"/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}"
assert expected_link in response.text, "Expected dashboard webhook link to point to the webhook detail view"
client.post(url="/remove", data={"feed_url": feed_url})
def test_modify_webhook_redirects_back_to_webhook_detail() -> None:
"""Webhook updates from the detail view should redirect back to that view with the new URL."""
original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token"
client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
def test_modify_webhook_triggers_git_backup_commit() -> None:
"""Modifying a webhook URL should record a state change for git backup."""
original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token"
client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
no_redirect_client = TestClient(app, follow_redirects=False)
with patch("discord_rss_bot.main.commit_state_change") as mock_commit_state_change:
response = no_redirect_client.post(
url="/modify_webhook",
data={
"old_hook": original_webhook_url,
"new_hook": new_webhook_url,
"redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}",
},
)
assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
assert mock_commit_state_change.call_count == 1, "Expected webhook modification to trigger git backup commit"
client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
no_redirect_client = TestClient(app, follow_redirects=False)
response = no_redirect_client.post(
url="/modify_webhook",
data={
"old_hook": original_webhook_url,
"new_hook": new_webhook_url,
"redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}",
},
)
assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
assert response.headers["location"] == (f"/webhook_entries?webhook_url={urllib.parse.quote(new_webhook_url)}"), (
f"Unexpected redirect location: {response.headers['location']}"
)
client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None:
"""Preview should list old->new feed URLs for webhook bulk replacement."""
@dataclass(slots=True)
class DummyFeed:
url: str
title: str | None = None
updates_enabled: bool = True
last_exception: None = None
class StubReader:
def __init__(self) -> None:
self._feeds: list[DummyFeed] = [
DummyFeed(url="https://old.example.com/rss/a.xml", title="A"),
DummyFeed(url="https://old.example.com/rss/b.xml", title="B"),
DummyFeed(url="https://unchanged.example.com/rss/c.xml", title="C"),
]
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
if resource.startswith("https://old.example.com"):
return webhook_url
if resource.startswith("https://unchanged.example.com"):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
def get_entries(self, **_kwargs: object) -> list[Entry]:
return []
app.dependency_overrides[get_reader_dependency] = StubReader
try:
with (
patch(
"discord_rss_bot.main.get_data_from_hook_url",
return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url),
),
patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
),
):
response: Response = client.get(
url="/webhook_entries",
params={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
},
)
assert response.status_code == 200, f"Failed to get preview: {response.text}"
assert "Mass update feed URLs" in response.text
assert "old.example.com/rss/a.xml" in response.text
assert "new.example.com/rss/a.xml" in response.text
assert "Will update" in response.text
assert "Matched: 2" in response.text
assert "Will update: 2" in response.text
finally:
app.dependency_overrides = {}
def test_bulk_change_feed_urls_updates_matching_feeds() -> None:
"""Mass updater should change all matching feed URLs for a webhook."""
@dataclass(slots=True)
class DummyFeed:
url: str
class StubReader:
def __init__(self) -> None:
self._feeds = [
DummyFeed(url="https://old.example.com/rss/a.xml"),
DummyFeed(url="https://old.example.com/rss/b.xml"),
DummyFeed(url="https://unchanged.example.com/rss/c.xml"),
]
self.change_calls: list[tuple[str, str]] = []
self.updated_feeds: list[str] = []
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
def change_feed_url(self, old_url: str, new_url: str) -> None:
self.change_calls.append((old_url, new_url))
def update_feed(self, feed_url: str) -> None:
self.updated_feeds.append(feed_url)
def get_entries(self, **_kwargs: object) -> list[Entry]:
return []
def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
return
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
):
response: Response = no_redirect_client.post(
url="/bulk_change_feed_urls",
data={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
},
)
assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
assert "Updated%202%20feed%20URL%28s%29" in response.headers.get("location", "")
assert sorted(stub_reader.change_calls) == sorted([
("https://old.example.com/rss/a.xml", "https://new.example.com/rss/a.xml"),
("https://old.example.com/rss/b.xml", "https://new.example.com/rss/b.xml"),
])
assert sorted(stub_reader.updated_feeds) == sorted([
"https://new.example.com/rss/a.xml",
"https://new.example.com/rss/b.xml",
])
finally:
app.dependency_overrides = {}
def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None:
"""HTMX preview endpoint should render only the mass-update preview fragment."""
@dataclass(slots=True)
class DummyFeed:
url: str
title: str | None = None
updates_enabled: bool = True
last_exception: None = None
class StubReader:
def __init__(self) -> None:
self._feeds: list[DummyFeed] = [
DummyFeed(url="https://old.example.com/rss/a.xml", title="A"),
DummyFeed(url="https://old.example.com/rss/b.xml", title="B"),
]
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
app.dependency_overrides[get_reader_dependency] = StubReader
try:
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
):
response: Response = client.get(
url="/webhook_entries_mass_update_preview",
params={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
},
)
assert response.status_code == 200, f"Failed to get HTMX preview fragment: {response.text}"
assert "Will update: 2" in response.text
assert "<table" in response.text
assert "Mass update feed URLs" not in response.text, "Fragment should not include full page wrapper text"
finally:
app.dependency_overrides = {}
def test_bulk_change_feed_urls_force_update_overwrites_conflict() -> None: # noqa: C901
"""Force update should overwrite conflicting target URLs instead of skipping them."""
@dataclass(slots=True)
class DummyFeed:
url: str
class StubReader:
def __init__(self) -> None:
self._feeds = [
DummyFeed(url="https://old.example.com/rss/a.xml"),
DummyFeed(url="https://new.example.com/rss/a.xml"),
]
self.delete_calls: list[str] = []
self.change_calls: list[tuple[str, str]] = []
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
def delete_feed(self, feed_url: str) -> None:
self.delete_calls.append(feed_url)
def change_feed_url(self, old_url: str, new_url: str) -> None:
self.change_calls.append((old_url, new_url))
def update_feed(self, _feed_url: str) -> None:
return
def get_entries(self, **_kwargs: object) -> list[Entry]:
return []
def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
return
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
):
response: Response = no_redirect_client.post(
url="/bulk_change_feed_urls",
data={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
"force_update": "true",
},
)
assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
assert stub_reader.delete_calls == ["https://new.example.com/rss/a.xml"]
assert stub_reader.change_calls == [
(
"https://old.example.com/rss/a.xml",
"https://new.example.com/rss/a.xml",
),
]
assert "Force%20overwrote%201" in response.headers.get("location", "")
finally:
app.dependency_overrides = {}
def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None:
"""Force update should proceed even when URL resolution returns an error (e.g. HTTP 404)."""
@dataclass(slots=True)
class DummyFeed:
url: str
class StubReader:
def __init__(self) -> None:
self._feeds = [
DummyFeed(url="https://old.example.com/rss/a.xml"),
]
self.change_calls: list[tuple[str, str]] = []
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
def change_feed_url(self, old_url: str, new_url: str) -> None:
self.change_calls.append((old_url, new_url))
def update_feed(self, _feed_url: str) -> None:
return
def get_entries(self, **_kwargs: object) -> list[Entry]:
return []
def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
return
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
return_value=("https://new.example.com/rss/a.xml", "HTTP 404"),
):
response: Response = no_redirect_client.post(
url="/bulk_change_feed_urls",
data={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
"force_update": "true",
},
)
assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
assert stub_reader.change_calls == [
(
"https://old.example.com/rss/a.xml",
"https://new.example.com/rss/a.xml",
),
]
location = response.headers.get("location", "")
assert "Updated%201%20feed%20URL%28s%29" in location
assert "Failed%200" in location
finally:
app.dependency_overrides = {}
def test_reader_dependency_override_is_used() -> None:
"""Reader should be injectable and overridable via FastAPI dependency overrides."""
class StubReader:
def get_tag(self, _resource: str, _key: str, default: str | None = None) -> str | None:
"""Stub get_tag that always returns the default value.
Args:
_resource: Ignored.
_key: Ignored.
default: The value to return.
Returns:
The default value, simulating a missing tag.
"""
return default
app.dependency_overrides[get_reader_dependency] = StubReader
try:
response: Response = client.get(url="/add")
assert response.status_code == 200, f"Expected /add to render with overridden reader: {response.text}"
finally:
app.dependency_overrides = {}

View file

@ -46,7 +46,7 @@ def test_create_search_context() -> None:
reader.update_search()
# Create the search context.
context: dict = create_search_context("test", custom_reader=reader)
context: dict = create_search_context("test", reader=reader)
assert context is not None, f"The context should not be None. Got: {context}"
# Close the reader, so we can delete the directory.

View file

@ -22,12 +22,12 @@ def test_reader() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
custom_reader: Reader = get_reader(custom_location=str(custom_loc))
assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(custom_reader)}'."
assert isinstance(custom_reader, Reader), assert_msg
reader: Reader = get_reader(custom_location=str(custom_loc))
assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(reader)}'."
assert isinstance(reader, Reader), assert_msg
# Close the reader, so we can delete the directory.
custom_reader.close()
reader.close()
def test_data_dir() -> None:
@ -49,16 +49,16 @@ def test_get_webhook_for_entry() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
custom_reader: Reader = get_reader(custom_location=str(custom_loc))
reader: Reader = get_reader(custom_location=str(custom_loc))
# Add a feed to the database.
custom_reader.add_feed("https://www.reddit.com/r/movies.rss")
custom_reader.update_feed("https://www.reddit.com/r/movies.rss")
reader.add_feed("https://www.reddit.com/r/movies.rss")
reader.update_feed("https://www.reddit.com/r/movies.rss")
# Add a webhook to the database.
custom_reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType]
our_tag = custom_reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType]
reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType]
our_tag = reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType]
assert our_tag == "https://example.com", f"The tag should be 'https://example.com'. But it was '{our_tag}'."
# Close the reader, so we can delete the directory.
custom_reader.close()
reader.close()

View file

@ -37,7 +37,7 @@ def test_has_white_tags() -> None:
reader.update_feeds()
# Test feed without any whitelist tags
assert has_white_tags(custom_reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags"
assert has_white_tags(reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags"
check_if_has_tag(reader, feed, "whitelist_title")
check_if_has_tag(reader, feed, "whitelist_summary")
@ -56,9 +56,9 @@ def test_has_white_tags() -> None:
def check_if_has_tag(reader: Reader, feed: Feed, whitelist_name: str) -> None:
reader.set_tag(feed, whitelist_name, "a") # pyright: ignore[reportArgumentType]
assert has_white_tags(custom_reader=reader, feed=feed) is True, "Feed should have whitelist tags"
assert has_white_tags(reader=reader, feed=feed) is True, "Feed should have whitelist tags"
reader.delete_tag(feed, whitelist_name)
assert has_white_tags(custom_reader=reader, feed=feed) is False, "Feed should not have any whitelist tags"
assert has_white_tags(reader=reader, feed=feed) is False, "Feed should not have any whitelist tags"
def test_should_be_sent() -> None: