This commit is contained in:
Joakim Hellsén 2026-03-15 19:37:55 +01:00
commit 71695c2987
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
15 changed files with 186 additions and 176 deletions

View file

@ -8,15 +8,11 @@ from discord_rss_bot.filter.blacklist import entry_should_be_skipped
from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags
from discord_rss_bot.filter.whitelist import has_white_tags from discord_rss_bot.filter.whitelist import has_white_tags
from discord_rss_bot.filter.whitelist import should_be_sent from discord_rss_bot.filter.whitelist import should_be_sent
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING: if TYPE_CHECKING:
from reader import Entry from reader import Entry
from reader import Reader from reader import Reader
# Our reader
reader: Reader = get_reader()
@lru_cache @lru_cache
def encode_url(url_to_quote: str) -> str: def encode_url(url_to_quote: str) -> str:
@ -34,11 +30,12 @@ def encode_url(url_to_quote: str) -> str:
return urllib.parse.quote(string=url_to_quote) if url_to_quote else "" return urllib.parse.quote(string=url_to_quote) if url_to_quote else ""
def entry_is_whitelisted(entry_to_check: Entry) -> bool: def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool:
"""Check if the entry is whitelisted. """Check if the entry is whitelisted.
Args: Args:
entry_to_check: The feed to check. entry_to_check: The feed to check.
reader: Custom Reader instance.
Returns: Returns:
bool: True if the feed is whitelisted, False otherwise. bool: True if the feed is whitelisted, False otherwise.
@ -47,11 +44,12 @@ def entry_is_whitelisted(entry_to_check: Entry) -> bool:
return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check)) return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check))
def entry_is_blacklisted(entry_to_check: Entry) -> bool: def entry_is_blacklisted(entry_to_check: Entry, reader: Reader) -> bool:
"""Check if the entry is blacklisted. """Check if the entry is blacklisted.
Args: Args:
entry_to_check: The feed to check. entry_to_check: The feed to check.
reader: Custom Reader instance.
Returns: Returns:
bool: True if the feed is blacklisted, False otherwise. bool: True if the feed is blacklisted, False otherwise.

View file

@ -12,7 +12,6 @@ from bs4 import Tag
from markdownify import markdownify from markdownify import markdownify
from discord_rss_bot.is_url_valid import is_url_valid from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING: if TYPE_CHECKING:
from reader import Entry from reader import Entry
@ -118,18 +117,18 @@ def format_entry_html_for_discord(text: str) -> str:
return _restore_discord_timestamp_tags(formatted_text, replacements) return _restore_discord_timestamp_tags(formatted_text, replacements)
def replace_tags_in_text_message(entry: Entry) -> str: def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str:
"""Replace tags in custom_message. """Replace tags in custom_message.
Args: Args:
entry: The entry to get the tags from. entry: The entry to get the tags from.
reader: Custom Reader instance.
Returns: Returns:
Returns the custom_message with the tags replaced. Returns the custom_message with the tags replaced.
""" """
feed: Feed = entry.feed feed: Feed = entry.feed
custom_reader: Reader = get_reader() custom_message: str = get_custom_message(feed=feed, reader=reader)
custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader)
content = "" content = ""
if entry.content: if entry.content:
@ -231,18 +230,18 @@ def get_first_image(summary: str | None, content: str | None) -> str:
return "" return ""
def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed: def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed:
"""Replace tags in embed. """Replace tags in embed.
Args: Args:
feed: The feed to get the tags from. feed: The feed to get the tags from.
entry: The entry to get the tags from. entry: The entry to get the tags from.
reader: Custom Reader instance.
Returns: Returns:
Returns the embed with the tags replaced. Returns the embed with the tags replaced.
""" """
custom_reader: Reader = get_reader() embed: CustomEmbed = get_embed(feed=feed, reader=reader)
embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader)
content = "" content = ""
if entry.content: if entry.content:
@ -333,29 +332,29 @@ def _replace_embed_tags(embed: CustomEmbed, template: str, replace_with: str) ->
embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with) embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with)
def get_custom_message(custom_reader: Reader, feed: Feed) -> str: def get_custom_message(reader: Reader, feed: Feed) -> str:
"""Get custom_message tag from feed. """Get custom_message tag from feed.
Args: Args:
custom_reader: What Reader to use. reader: What Reader to use.
feed: The feed to get the tag from. feed: The feed to get the tag from.
Returns: Returns:
Returns the contents from the custom_message tag. Returns the contents from the custom_message tag.
""" """
try: try:
custom_message: str = str(custom_reader.get_tag(feed, "custom_message", "")) custom_message: str = str(reader.get_tag(feed, "custom_message", ""))
except ValueError: except ValueError:
custom_message = "" custom_message = ""
return custom_message return custom_message
def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None: def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None:
"""Set embed tag in feed. """Set embed tag in feed.
Args: Args:
custom_reader: What Reader to use. reader: What Reader to use.
feed: The feed to set the tag in. feed: The feed to set the tag in.
embed: The embed to set. embed: The embed to set.
""" """
@ -371,20 +370,20 @@ def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None:
"footer_text": embed.footer_text, "footer_text": embed.footer_text,
"footer_icon_url": embed.footer_icon_url, "footer_icon_url": embed.footer_icon_url,
} }
custom_reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType]
def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed: def get_embed(reader: Reader, feed: Feed) -> CustomEmbed:
"""Get embed tag from feed. """Get embed tag from feed.
Args: Args:
custom_reader: What Reader to use. reader: What Reader to use.
feed: The feed to get the tag from. feed: The feed to get the tag from.
Returns: Returns:
Returns the contents from the embed tag. Returns the contents from the embed tag.
""" """
embed = custom_reader.get_tag(feed, "embed", "") embed = reader.get_tag(feed, "embed", "")
if embed: if embed:
if not isinstance(embed, str): if not isinstance(embed, str):

View file

@ -96,26 +96,26 @@ def extract_domain(url: str) -> str: # noqa: PLR0911
return "Other" return "Other"
def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: C901 def send_entry_to_discord(entry: Entry, reader: Reader | None = None) -> str | None: # noqa: C901
"""Send a single entry to Discord. """Send a single entry to Discord.
Args: Args:
entry: The entry to send to Discord. entry: The entry to send to Discord.
custom_reader: The reader to use. If None, the default reader will be used. reader: The reader to use. If None, the default reader will be used.
Returns: Returns:
str | None: The error message if there was an error, otherwise None. str | None: The error message if there was an error, otherwise None.
""" """
# Get the default reader if we didn't get a custom one. # Get the default reader if we didn't get a custom one.
reader: Reader = get_reader() if custom_reader is None else custom_reader effective_reader: Reader = get_reader() if reader is None else reader
# Get the webhook URL for the entry. # Get the webhook URL for the entry.
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) webhook_url: str = str(effective_reader.get_tag(entry.feed_url, "webhook", ""))
if not webhook_url: if not webhook_url:
return "No webhook URL found." return "No webhook URL found."
# If https://discord.com/quests/<quest_id> is in the URL, send a separate message with the URL. # If https://discord.com/quests/<quest_id> is in the URL, send a separate message with the URL.
send_discord_quest_notification(entry, webhook_url) send_discord_quest_notification(entry, webhook_url, reader=effective_reader)
# Check if this is a c3kay feed # Check if this is a c3kay feed
if is_c3kay_feed(entry.feed.url): if is_c3kay_feed(entry.feed.url):
@ -126,7 +126,7 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data: if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data) webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry) execute_webhook(webhook, entry, reader=effective_reader)
return None return None
logger.warning( logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing", "Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
@ -139,15 +139,15 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
# Try to get the custom message for the feed. If the user has none, we will use the default message. # Try to get the custom message for the feed. If the user has none, we will use the default message.
# This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()" # This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()"
if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901
webhook_message: str = replace_tags_in_text_message(entry=entry) webhook_message: str = replace_tags_in_text_message(entry=entry, reader=effective_reader)
if not webhook_message: if not webhook_message:
webhook_message = "No message found." webhook_message = "No message found."
# Create the webhook. # Create the webhook.
try: try:
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) should_send_embed = bool(effective_reader.get_tag(entry.feed, "should_send_embed", True))
except StorageError: except StorageError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True should_send_embed = True
@ -157,15 +157,15 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
should_send_embed = False should_send_embed = False
if should_send_embed: if should_send_embed:
webhook = create_embed_webhook(webhook_url, entry) webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader)
else: else:
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
execute_webhook(webhook, entry) execute_webhook(webhook, entry, reader=effective_reader)
return None return None
def send_discord_quest_notification(entry: Entry, webhook_url: str) -> None: def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Reader) -> None:
"""Send a separate message to Discord if the entry is a quest notification.""" """Send a separate message to Discord if the entry is a quest notification."""
quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+") quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+")
@ -177,7 +177,7 @@ def send_discord_quest_notification(entry: Entry, webhook_url: str) -> None:
content=quest_url, content=quest_url,
rate_limit_retry=True, rate_limit_retry=True,
) )
execute_webhook(webhook, entry) execute_webhook(webhook, entry, reader=reader)
# Iterate through the content of the entry # Iterate through the content of the entry
for content in entry.content: for content in entry.content:
@ -235,12 +235,17 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
discord_embed.set_title(embed_title) if embed_title else None discord_embed.set_title(embed_title) if embed_title else None
def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: # noqa: C901 def create_embed_webhook( # noqa: C901
webhook_url: str,
entry: Entry,
reader: Reader,
) -> DiscordWebhook:
"""Create a webhook with an embed. """Create a webhook with an embed.
Args: Args:
webhook_url (str): The webhook URL. webhook_url (str): The webhook URL.
entry (Entry): The entry to send to Discord. entry (Entry): The entry to send to Discord.
reader (Reader): The Reader instance to use for getting embed data.
Returns: Returns:
DiscordWebhook: The webhook with the embed. DiscordWebhook: The webhook with the embed.
@ -249,7 +254,7 @@ def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: # n
feed: Feed = entry.feed feed: Feed = entry.feed
# Get the embed data from the database. # Get the embed data from the database.
custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry) custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader)
discord_embed: DiscordEmbed = DiscordEmbed() discord_embed: DiscordEmbed = DiscordEmbed()
@ -337,53 +342,53 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None:
logger.exception("Error setting entry to read: %s", entry.id) logger.exception("Error setting entry to read: %s", entry.id)
def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912
"""Send entries to Discord. """Send entries to Discord.
If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time.
Args: Args:
custom_reader: If we should use a custom reader instead of the default one. reader: If we should use a custom reader instead of the default one.
feed: The feed to send to Discord. feed: The feed to send to Discord.
do_once: If we should only send one entry. This is used in the test. do_once: If we should only send one entry. This is used in the test.
""" """
logger.info("Starting to send entries to Discord.") logger.info("Starting to send entries to Discord.")
# Get the default reader if we didn't get a custom one. # Get the default reader if we didn't get a custom one.
reader: Reader = get_reader() if custom_reader is None else custom_reader effective_reader: Reader = get_reader() if reader is None else reader
# Check for new entries for every feed. # Check for new entries for every feed.
reader.update_feeds( effective_reader.update_feeds(
scheduled=True, scheduled=True,
workers=os.cpu_count() or 1, workers=os.cpu_count() or 1,
) )
# Loop through the unread entries. # Loop through the unread entries.
entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False) entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False)
for entry in entries: for entry in entries:
set_entry_as_read(reader, entry) set_entry_as_read(effective_reader, entry)
if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1): if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1):
logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url) logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url)
continue continue
webhook_url: str = get_webhook_url(reader, entry) webhook_url: str = get_webhook_url(effective_reader, entry)
if not webhook_url: if not webhook_url:
logger.info("No webhook URL found for feed: %s", entry.feed.url) logger.info("No webhook URL found for feed: %s", entry.feed.url)
continue continue
should_send_embed: bool = should_send_embed_check(reader, entry) should_send_embed: bool = should_send_embed_check(effective_reader, entry)
# Youtube feeds only need to send the link # Youtube feeds only need to send the link
if is_youtube_feed(entry.feed.url): if is_youtube_feed(entry.feed.url):
should_send_embed = False should_send_embed = False
if should_send_embed: if should_send_embed:
webhook = create_embed_webhook(webhook_url, entry) webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader)
else: else:
# If the user has set the custom message to an empty string, we will use the default message, otherwise we # If the user has set the custom message to an empty string, we will use the default message, otherwise we
# will use the custom message. # will use the custom message.
if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901
webhook_message = replace_tags_in_text_message(entry) webhook_message = replace_tags_in_text_message(entry, reader=effective_reader)
else: else:
webhook_message: str = str(default_custom_message) webhook_message: str = str(default_custom_message)
@ -393,12 +398,12 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
# Check if the entry is blacklisted, and if it is, we will skip it. # Check if the entry is blacklisted, and if it is, we will skip it.
if entry_should_be_skipped(reader, entry): if entry_should_be_skipped(effective_reader, entry):
logger.info("Entry was blacklisted: %s", entry.id) logger.info("Entry was blacklisted: %s", entry.id)
continue continue
# Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted.
if has_white_tags(reader, entry.feed) and not should_be_sent(reader, entry): if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry):
logger.info("Entry was not whitelisted: %s", entry.id) logger.info("Entry was not whitelisted: %s", entry.id)
continue continue
@ -411,7 +416,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data: if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data) webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry) execute_webhook(webhook, entry, reader=effective_reader)
return return
logger.warning( logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing", "Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
@ -421,7 +426,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
# Send the entry to Discord as it is not blacklisted or feed has a whitelist. # Send the entry to Discord as it is not blacklisted or feed has a whitelist.
execute_webhook(webhook, entry) execute_webhook(webhook, entry, reader=effective_reader)
# If we only want to send one entry, we will break the loop. This is used when testing this function. # If we only want to send one entry, we will break the loop. This is used when testing this function.
if do_once: if do_once:
@ -429,16 +434,15 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
break break
def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None: def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None:
"""Execute the webhook. """Execute the webhook.
Args: Args:
webhook (DiscordWebhook): The webhook to execute. webhook (DiscordWebhook): The webhook to execute.
entry (Entry): The entry to send to Discord. entry (Entry): The entry to send to Discord.
reader (Reader): The Reader instance to use for checking feed status.
""" """
reader: Reader = get_reader()
# If the feed has been paused or deleted, we will not send the entry to Discord. # If the feed has been paused or deleted, we will not send the entry to Discord.
entry_feed: Feed = entry.feed entry_feed: Feed = entry.feed
if entry_feed.updates_enabled is False: if entry_feed.updates_enabled is False:

View file

@ -11,7 +11,7 @@ if TYPE_CHECKING:
from reader import Reader from reader import Reader
def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool:
"""Return True if the feed has blacklist tags. """Return True if the feed has blacklist tags.
The following tags are checked: The following tags are checked:
@ -25,21 +25,21 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
- regex_blacklist_title - regex_blacklist_title
Args: Args:
custom_reader: The reader. reader: The reader.
feed: The feed to check. feed: The feed to check.
Returns: Returns:
bool: If the feed has any of the tags. bool: If the feed has any of the tags.
""" """
blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip()
blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip()
blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip()
blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip()
regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip()
regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip()
regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip()
return bool( return bool(
blacklist_title blacklist_title
@ -53,11 +53,11 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
) )
def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
"""Return True if the entry is in the blacklist. """Return True if the entry is in the blacklist.
Args: Args:
custom_reader: The reader. reader: The reader.
entry: The entry to check. entry: The entry to check.
Returns: Returns:
@ -65,15 +65,15 @@ def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noq
""" """
feed = entry.feed feed = entry.feed
blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip()
blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip()
blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip()
blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip()
regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip()
regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip()
regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip()
# TODO(TheLovinator): Also add support for entry_text and more. # TODO(TheLovinator): Also add support for entry_text and more.
# Check regular blacklist # Check regular blacklist

View file

@ -11,7 +11,7 @@ if TYPE_CHECKING:
from reader import Reader from reader import Reader
def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: def has_white_tags(reader: Reader, feed: Feed) -> bool:
"""Return True if the feed has whitelist tags. """Return True if the feed has whitelist tags.
The following tags are checked: The following tags are checked:
@ -25,21 +25,21 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
- whitelist_title - whitelist_title
Args: Args:
custom_reader: The reader. reader: The reader.
feed: The feed to check. feed: The feed to check.
Returns: Returns:
bool: If the feed has any of the tags. bool: If the feed has any of the tags.
""" """
whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip()
whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip()
whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip()
whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip()
regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip()
regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip()
regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip()
return bool( return bool(
whitelist_title whitelist_title
@ -53,11 +53,11 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
) )
def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
"""Return True if the entry is in the whitelist. """Return True if the entry is in the whitelist.
Args: Args:
custom_reader: The reader. reader: The reader.
entry: The entry to check. entry: The entry to check.
Returns: Returns:
@ -65,16 +65,16 @@ def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR091
""" """
feed: Feed = entry.feed feed: Feed = entry.feed
# Regular whitelist tags # Regular whitelist tags
whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip()
whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip()
whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip()
whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip()
# Regex whitelist tags # Regex whitelist tags
regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip()
regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip()
regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip()
# Check regular whitelist # Check regular whitelist
if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title):

View file

@ -179,8 +179,6 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template
# Add the filters to the Jinja2 environment so they can be used in html templates. # Add the filters to the Jinja2 environment so they can be used in html templates.
templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else "" templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else ""
templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted
templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted
templates.env.filters["discord_markdown"] = markdownify templates.env.filters["discord_markdown"] = markdownify
templates.env.filters["relative_time"] = relative_time templates.env.filters["relative_time"] = relative_time
templates.env.globals["get_backup_path"] = get_backup_path templates.env.globals["get_backup_path"] = get_backup_path
@ -938,7 +936,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
except EntryNotFoundError as e: except EntryNotFoundError as e:
current_entries = list(reader.get_entries(feed=clean_feed_url)) current_entries = list(reader.get_entries(feed=clean_feed_url))
msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}" msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}"
html: str = create_html_for_feed(current_entries, clean_feed_url) html: str = create_html_for_feed(reader=reader, entries=current_entries, current_feed_url=clean_feed_url)
# Get feed and global intervals for error case too # Get feed and global intervals for error case too
feed_interval: int | None = None feed_interval: int | None = None
@ -988,7 +986,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
last_entry = entries[-1] last_entry = entries[-1]
# Create the html for the entries. # Create the html for the entries.
html: str = create_html_for_feed(entries, clean_feed_url) html: str = create_html_for_feed(reader=reader, entries=entries, current_feed_url=clean_feed_url)
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True)) should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True))
@ -1024,10 +1022,15 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
return templates.TemplateResponse(request=request, name="feed.html", context=context) return templates.TemplateResponse(request=request, name="feed.html", context=context)
def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: C901, PLR0914 def create_html_for_feed( # noqa: C901, PLR0914
reader: Reader,
entries: Iterable[Entry],
current_feed_url: str = "",
) -> str:
"""Create HTML for the search results. """Create HTML for the search results.
Args: Args:
reader: The Reader instance to use.
entries: The entries to create HTML for. entries: The entries to create HTML for.
current_feed_url: The feed URL currently being viewed in /feed. current_feed_url: The feed URL currently being viewed in /feed.
@ -1045,17 +1048,19 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -
first_image = get_first_image(summary, content) first_image = get_first_image(summary, content)
text: str = replace_tags_in_text_message(entry) or "<div class='text-muted'>No content available.</div>" text: str = replace_tags_in_text_message(entry, reader=reader) or (
"<div class='text-muted'>No content available.</div>"
)
published = "" published = ""
if entry.published: if entry.published:
published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S") published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S")
blacklisted: str = "" blacklisted: str = ""
if entry_is_blacklisted(entry): if entry_is_blacklisted(entry, reader=reader):
blacklisted = "<span class='badge bg-danger'>Blacklisted</span>" blacklisted = "<span class='badge bg-danger'>Blacklisted</span>"
whitelisted: str = "" whitelisted: str = ""
if entry_is_whitelisted(entry): if entry_is_whitelisted(entry, reader=reader):
whitelisted = "<span class='badge bg-success'>Whitelisted</span>" whitelisted = "<span class='badge bg-success'>Whitelisted</span>"
source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url
@ -1414,7 +1419,7 @@ async def search(
HTMLResponse: The search page. HTMLResponse: The search page.
""" """
reader.update_search() reader.update_search()
context = create_search_context(query) context = create_search_context(query, reader=reader)
return templates.TemplateResponse(request=request, name="search.html", context={"request": request, **context}) return templates.TemplateResponse(request=request, name="search.html", context={"request": request, **context})
@ -1437,7 +1442,7 @@ async def post_entry(
if entry is None: if entry is None:
return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.") return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.")
if result := send_entry_to_discord(entry=entry): if result := send_entry_to_discord(entry=entry, reader=reader):
return result return result
# Redirect to the feed page. # Redirect to the feed page.
@ -1601,7 +1606,7 @@ async def get_webhook_entries( # noqa: C901, PLR0914
last_entry = paginated_entries[-1] last_entry = paginated_entries[-1]
# Create the html for the entries # Create the html for the entries
html: str = create_html_for_feed(paginated_entries) html: str = create_html_for_feed(reader=reader, entries=paginated_entries)
# Check if there are more entries available # Check if there are more entries available
total_entries: int = len(all_entries) total_entries: int = len(all_entries)

View file

@ -3,8 +3,6 @@ from __future__ import annotations
import urllib.parse import urllib.parse
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Iterable from collections.abc import Iterable
@ -14,19 +12,16 @@ if TYPE_CHECKING:
from reader import Reader from reader import Reader
def create_search_context(query: str, custom_reader: Reader | None = None) -> dict: def create_search_context(query: str, reader: Reader) -> dict:
"""Build context for search.html template. """Build context for search.html template.
If custom_reader is None, use the default reader from settings.
Args: Args:
query (str): The search query. query (str): The search query.
custom_reader (Reader | None): Optional custom Reader instance. reader (Reader): Custom Reader instance.
Returns: Returns:
dict: Context dictionary for rendering the search results. dict: Context dictionary for rendering the search results.
""" """
reader: Reader = get_reader() if custom_reader is None else custom_reader
search_results: Iterable[EntrySearchResult] = reader.search_entries(query) search_results: Iterable[EntrySearchResult] = reader.search_entries(query)
results: list[dict] = [] results: list[dict] = []

View file

@ -38,7 +38,7 @@ def test_has_black_tags() -> None:
# Test feed without any blacklist tags # Test feed without any blacklist tags
assert_msg: str = "Feed should not have any blacklist tags" assert_msg: str = "Feed should not have any blacklist tags"
assert feed_has_blacklist_tags(custom_reader=get_reader(), feed=feed) is False, assert_msg assert feed_has_blacklist_tags(reader=get_reader(), feed=feed) is False, assert_msg
check_if_has_tag(reader, feed, "blacklist_title") check_if_has_tag(reader, feed, "blacklist_title")
check_if_has_tag(reader, feed, "blacklist_summary") check_if_has_tag(reader, feed, "blacklist_summary")
@ -58,11 +58,11 @@ def test_has_black_tags() -> None:
def check_if_has_tag(reader: Reader, feed: Feed, blacklist_name: str) -> None: def check_if_has_tag(reader: Reader, feed: Feed, blacklist_name: str) -> None:
reader.set_tag(feed, blacklist_name, "a") # pyright: ignore[reportArgumentType] reader.set_tag(feed, blacklist_name, "a") # pyright: ignore[reportArgumentType]
assert_msg: str = f"Feed should have blacklist tags: {blacklist_name}" assert_msg: str = f"Feed should have blacklist tags: {blacklist_name}"
assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is True, assert_msg assert feed_has_blacklist_tags(reader=reader, feed=feed) is True, assert_msg
asset_msg: str = f"Feed should not have any blacklist tags: {blacklist_name}" asset_msg: str = f"Feed should not have any blacklist tags: {blacklist_name}"
reader.delete_tag(feed, blacklist_name) reader.delete_tag(feed, blacklist_name)
assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is False, asset_msg assert feed_has_blacklist_tags(reader=reader, feed=feed) is False, asset_msg
def test_should_be_skipped() -> None: def test_should_be_skipped() -> None:

View file

@ -45,39 +45,39 @@ def test_entry_is_whitelisted() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True) Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
custom_reader: Reader = get_reader(custom_location=str(custom_loc)) reader: Reader = get_reader(custom_location=str(custom_loc))
# Add a feed to the database. # Add a feed to the database.
custom_reader.add_feed("https://lovinator.space/rss_test.xml") reader.add_feed("https://lovinator.space/rss_test.xml")
custom_reader.update_feed("https://lovinator.space/rss_test.xml") reader.update_feed("https://lovinator.space/rss_test.xml")
# whitelist_title # whitelist_title
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries(): for entry in reader.get_entries():
if entry_is_whitelisted(entry) is True: if entry_is_whitelisted(entry, reader=reader) is True:
assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}" assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}"
break break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title") reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title")
# whitelist_summary # whitelist_summary
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries(): for entry in reader.get_entries():
if entry_is_whitelisted(entry) is True: if entry_is_whitelisted(entry, reader=reader) is True:
assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}" assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}"
break break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary") reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary")
# whitelist_content # whitelist_content
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries(): for entry in reader.get_entries():
if entry_is_whitelisted(entry) is True: if entry_is_whitelisted(entry, reader=reader) is True:
assert_msg = f"Expected: <p>ffdnfdnfdnfdnfdndfn</p>, Got: {entry.content[0].value}" assert_msg = f"Expected: <p>ffdnfdnfdnfdnfdndfn</p>, Got: {entry.content[0].value}"
assert entry.content[0].value == "<p>ffdnfdnfdnfdnfdndfn</p>", assert_msg assert entry.content[0].value == "<p>ffdnfdnfdnfdnfdndfn</p>", assert_msg
break break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content") reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content")
# Close the reader, so we can delete the directory. # Close the reader, so we can delete the directory.
custom_reader.close() reader.close()
def test_entry_is_blacklisted() -> None: def test_entry_is_blacklisted() -> None:
@ -87,36 +87,36 @@ def test_entry_is_blacklisted() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True) Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
custom_reader: Reader = get_reader(custom_location=str(custom_loc)) reader: Reader = get_reader(custom_location=str(custom_loc))
# Add a feed to the database. # Add a feed to the database.
custom_reader.add_feed("https://lovinator.space/rss_test.xml") reader.add_feed("https://lovinator.space/rss_test.xml")
custom_reader.update_feed("https://lovinator.space/rss_test.xml") reader.update_feed("https://lovinator.space/rss_test.xml")
# blacklist_title # blacklist_title
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries(): for entry in reader.get_entries():
if entry_is_blacklisted(entry) is True: if entry_is_blacklisted(entry, reader=reader) is True:
assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}" assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}"
break break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title") reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title")
# blacklist_summary # blacklist_summary
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries(): for entry in reader.get_entries():
if entry_is_blacklisted(entry) is True: if entry_is_blacklisted(entry, reader=reader) is True:
assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}" assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}"
break break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary") reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary")
# blacklist_content # blacklist_content
custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
for entry in custom_reader.get_entries(): for entry in reader.get_entries():
if entry_is_blacklisted(entry) is True: if entry_is_blacklisted(entry, reader=reader) is True:
assert_msg = f"Expected: <p>ffdnfdnfdnfdnfdndfn</p>, Got: {entry.content[0].value}" assert_msg = f"Expected: <p>ffdnfdnfdnfdnfdndfn</p>, Got: {entry.content[0].value}"
assert entry.content[0].value == "<p>ffdnfdnfdnfdnfdndfn</p>", assert_msg assert entry.content[0].value == "<p>ffdnfdnfdnfdnfdndfn</p>", assert_msg
break break
custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content") reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content")
# Close the reader, so we can delete the directory. # Close the reader, so we can delete the directory.
custom_reader.close() reader.close()

View file

@ -102,12 +102,10 @@ def test_format_entry_html_for_discord_does_not_preserve_invalid_timestamp_style
@patch("discord_rss_bot.custom_message.get_custom_message") @patch("discord_rss_bot.custom_message.get_custom_message")
@patch("discord_rss_bot.custom_message.get_reader")
def test_replace_tags_in_text_message_preserves_timestamp_tags( def test_replace_tags_in_text_message_preserves_timestamp_tags(
mock_get_reader: MagicMock,
mock_get_custom_message: MagicMock, mock_get_custom_message: MagicMock,
) -> None: ) -> None:
mock_get_reader.return_value = MagicMock() mock_reader = MagicMock()
mock_get_custom_message.return_value = "{{entry_summary}}" mock_get_custom_message.return_value = "{{entry_summary}}"
summary_parts: list[str] = [ summary_parts: list[str] = [
f"<p>Format {index}: ({timestamp_tag.replace('<', '&lt;').replace('>', '&gt;')})</p>" f"<p>Format {index}: ({timestamp_tag.replace('<', '&lt;').replace('>', '&gt;')})</p>"
@ -116,19 +114,17 @@ def test_replace_tags_in_text_message_preserves_timestamp_tags(
entry_ns: SimpleNamespace = make_entry("".join(summary_parts)) entry_ns: SimpleNamespace = make_entry("".join(summary_parts))
entry: Entry = typing.cast("Entry", entry_ns) entry: Entry = typing.cast("Entry", entry_ns)
rendered: str = replace_tags_in_text_message(entry) rendered: str = replace_tags_in_text_message(entry, reader=mock_reader)
for timestamp_tag in TIMESTAMP_FORMATS: for timestamp_tag in TIMESTAMP_FORMATS:
assert timestamp_tag in rendered assert timestamp_tag in rendered
@patch("discord_rss_bot.custom_message.get_embed") @patch("discord_rss_bot.custom_message.get_embed")
@patch("discord_rss_bot.custom_message.get_reader")
def test_replace_tags_in_embed_preserves_timestamp_tags( def test_replace_tags_in_embed_preserves_timestamp_tags(
mock_get_reader: MagicMock,
mock_get_embed: MagicMock, mock_get_embed: MagicMock,
) -> None: ) -> None:
mock_get_reader.return_value = MagicMock() mock_reader = MagicMock()
mock_get_embed.return_value = CustomEmbed(description="{{entry_summary}}") mock_get_embed.return_value = CustomEmbed(description="{{entry_summary}}")
summary_parts: list[str] = [ summary_parts: list[str] = [
f"<p>Format {index}: ({timestamp_tag.replace('<', '&lt;').replace('>', '&gt;')})</p>" f"<p>Format {index}: ({timestamp_tag.replace('<', '&lt;').replace('>', '&gt;')})</p>"
@ -138,7 +134,7 @@ def test_replace_tags_in_embed_preserves_timestamp_tags(
entry: Entry = typing.cast("Entry", entry_ns) entry: Entry = typing.cast("Entry", entry_ns)
embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry) embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry, reader=mock_reader)
for timestamp_tag in TIMESTAMP_FORMATS: for timestamp_tag in TIMESTAMP_FORMATS:
assert timestamp_tag in embed.description assert timestamp_tag in embed.description

View file

@ -55,7 +55,7 @@ def test_send_to_discord() -> None:
assert reader.get_tag(feed, "webhook") == webhook_url, f"The webhook URL should be '{webhook_url}'." assert reader.get_tag(feed, "webhook") == webhook_url, f"The webhook URL should be '{webhook_url}'."
# Send the feed to Discord. # Send the feed to Discord.
send_to_discord(custom_reader=reader, feed=feed, do_once=True) send_to_discord(reader=reader, feed=feed, do_once=True)
# Close the reader, so we can delete the directory. # Close the reader, so we can delete the directory.
reader.close() reader.close()
@ -200,7 +200,7 @@ def test_send_entry_to_discord_youtube_feed(
assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc" assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc"
# Verify execute_webhook was called # Verify execute_webhook was called
mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry) mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry, reader=mock_reader)
def test_extract_domain_youtube_feed() -> None: def test_extract_domain_youtube_feed() -> None:

View file

@ -718,11 +718,24 @@ def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyP
original_feed_url="https://example.com/feed-b.xml", original_feed_url="https://example.com/feed-b.xml",
) )
monkeypatch.setattr("discord_rss_bot.main.replace_tags_in_text_message", lambda _entry: "Rendered content") monkeypatch.setattr(
monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry: False) "discord_rss_bot.main.replace_tags_in_text_message",
monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry: False) lambda _entry, **_kwargs: "Rendered content",
)
monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry, **_kwargs: False)
monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry, **_kwargs: False)
html = create_html_for_feed(cast("list[Entry]", [same_feed_entry, other_feed_entry]), selected_feed_url) same_feed_entry_typed: Entry = cast("Entry", same_feed_entry)
other_feed_entry_typed: Entry = cast("Entry", other_feed_entry)
html: str = create_html_for_feed(
reader=MagicMock(),
current_feed_url=selected_feed_url,
entries=[
same_feed_entry_typed,
other_feed_entry_typed,
],
)
assert "From another feed: https://example.com/feed-b.xml" in html assert "From another feed: https://example.com/feed-b.xml" in html
assert "From another feed: https://example.com/feed-a.xml" not in html assert "From another feed: https://example.com/feed-a.xml" not in html

View file

@ -46,7 +46,7 @@ def test_create_search_context() -> None:
reader.update_search() reader.update_search()
# Create the search context. # Create the search context.
context: dict = create_search_context("test", custom_reader=reader) context: dict = create_search_context("test", reader=reader)
assert context is not None, f"The context should not be None. Got: {context}" assert context is not None, f"The context should not be None. Got: {context}"
# Close the reader, so we can delete the directory. # Close the reader, so we can delete the directory.

View file

@ -22,12 +22,12 @@ def test_reader() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True) Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
custom_reader: Reader = get_reader(custom_location=str(custom_loc)) reader: Reader = get_reader(custom_location=str(custom_loc))
assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(custom_reader)}'." assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(reader)}'."
assert isinstance(custom_reader, Reader), assert_msg assert isinstance(reader, Reader), assert_msg
# Close the reader, so we can delete the directory. # Close the reader, so we can delete the directory.
custom_reader.close() reader.close()
def test_data_dir() -> None: def test_data_dir() -> None:
@ -49,16 +49,16 @@ def test_get_webhook_for_entry() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True) Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
custom_reader: Reader = get_reader(custom_location=str(custom_loc)) reader: Reader = get_reader(custom_location=str(custom_loc))
# Add a feed to the database. # Add a feed to the database.
custom_reader.add_feed("https://www.reddit.com/r/movies.rss") reader.add_feed("https://www.reddit.com/r/movies.rss")
custom_reader.update_feed("https://www.reddit.com/r/movies.rss") reader.update_feed("https://www.reddit.com/r/movies.rss")
# Add a webhook to the database. # Add a webhook to the database.
custom_reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType] reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType]
our_tag = custom_reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType] our_tag = reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType]
assert our_tag == "https://example.com", f"The tag should be 'https://example.com'. But it was '{our_tag}'." assert our_tag == "https://example.com", f"The tag should be 'https://example.com'. But it was '{our_tag}'."
# Close the reader, so we can delete the directory. # Close the reader, so we can delete the directory.
custom_reader.close() reader.close()

View file

@ -37,7 +37,7 @@ def test_has_white_tags() -> None:
reader.update_feeds() reader.update_feeds()
# Test feed without any whitelist tags # Test feed without any whitelist tags
assert has_white_tags(custom_reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags" assert has_white_tags(reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags"
check_if_has_tag(reader, feed, "whitelist_title") check_if_has_tag(reader, feed, "whitelist_title")
check_if_has_tag(reader, feed, "whitelist_summary") check_if_has_tag(reader, feed, "whitelist_summary")
@ -56,9 +56,9 @@ def test_has_white_tags() -> None:
def check_if_has_tag(reader: Reader, feed: Feed, whitelist_name: str) -> None: def check_if_has_tag(reader: Reader, feed: Feed, whitelist_name: str) -> None:
reader.set_tag(feed, whitelist_name, "a") # pyright: ignore[reportArgumentType] reader.set_tag(feed, whitelist_name, "a") # pyright: ignore[reportArgumentType]
assert has_white_tags(custom_reader=reader, feed=feed) is True, "Feed should have whitelist tags" assert has_white_tags(reader=reader, feed=feed) is True, "Feed should have whitelist tags"
reader.delete_tag(feed, whitelist_name) reader.delete_tag(feed, whitelist_name)
assert has_white_tags(custom_reader=reader, feed=feed) is False, "Feed should not have any whitelist tags" assert has_white_tags(reader=reader, feed=feed) is False, "Feed should not have any whitelist tags"
def test_should_be_sent() -> None: def test_should_be_sent() -> None: