diff --git a/discord_rss_bot/blacklist.py b/discord_rss_bot/blacklist.py
new file mode 100644
index 0000000..b48e09d
--- /dev/null
+++ b/discord_rss_bot/blacklist.py
@@ -0,0 +1,125 @@
+import re
+
+from reader import Entry, Feed, Reader, TagNotFoundError
+
+
+def is_word_in_text(word: str, text: str | None) -> bool:
+    """
+    Check if the word is in the text as a whole word, ignoring case.
+
+    The word is escaped, so characters that are special in regular expressions are matched literally.
+    An empty word, or a missing or empty text, never matches.
+
+    Args:
+        word: The word to search for.
+        text: The text to search in.
+
+    Returns:
+        bool: If the word is in the text.
+    """
+    if not word or not text:
+        return False
+
+    pattern = re.compile(rf"(^|[^\w]){re.escape(word)}([^\w]|$)", re.IGNORECASE)
+    return bool(pattern.search(text))
+
+
+def has_black_tags(custom_reader: Reader, feed: Feed) -> bool:
+    """
+    Return True if the feed has any of the following tags:
+    - blacklist_title
+    - blacklist_summary
+    - blacklist_content
+
+    Args:
+        custom_reader: The reader.
+        feed: The feed to check.
+
+    Returns:
+        bool: If the feed has any of the tags.
+    """
+    blacklist_title = get_blacklist_title(custom_reader, feed)
+    blacklist_summary = get_blacklist_summary(custom_reader, feed)
+    blacklist_content = get_blacklist_content(custom_reader, feed)
+
+    # Always return a real bool, falling off the end of the function would return None.
+    return bool(blacklist_title or blacklist_summary or blacklist_content)
+
+
+def if_in_blacklist(custom_reader: Reader, entry: Entry) -> bool:
+    """
+    Return True if the entry is in the blacklist.
+
+    Only the title and the summary are checked.
+
+    Args:
+        custom_reader: The reader.
+        entry: The entry to check.
+
+    Returns:
+        bool: If the entry is in the blacklist.
+    """
+    feed: Feed = entry.feed
+    blacklist_title = get_blacklist_title(custom_reader, feed)
+    blacklist_summary = get_blacklist_summary(custom_reader, feed)
+    # TODO: Check the blacklist_content tag against the content of the entry
+    # TODO: Check author
+
+    if blacklist_title and is_word_in_text(blacklist_title, entry.title):
+        return True
+
+    if blacklist_summary and is_word_in_text(blacklist_summary, entry.summary):
+        return True
+
+    return False
+
+
+def get_blacklist_content(custom_reader: Reader, feed: Feed) -> str:
+    """
+    Get the blacklist_content tag from the feed.
+
+    Args:
+        custom_reader: The reader.
+        feed: The feed to get the tag from.
+
+    Returns:
+        str: The blacklist_content tag, or an empty string if the tag could not be read.
+    """
+    try:
+        return custom_reader.get_tag(feed, "blacklist_content")
+    except (TagNotFoundError, ValueError):
+        return ""
+
+
+def get_blacklist_summary(custom_reader: Reader, feed: Feed) -> str:
+    """
+    Get the blacklist_summary tag from the feed.
+
+    Args:
+        custom_reader: The reader.
+        feed: The feed to get the tag from.
+
+    Returns:
+        str: The blacklist_summary tag, or an empty string if the tag could not be read.
+    """
+    try:
+        return custom_reader.get_tag(feed, "blacklist_summary")
+    except (TagNotFoundError, ValueError):
+        return ""
+
+
+def get_blacklist_title(custom_reader: Reader, feed: Feed) -> str:
+    """
+    Get the blacklist_title tag from the feed.
+
+    Args:
+        custom_reader: The reader.
+        feed: The feed to get the tag from.
+
+    Returns:
+        str: The blacklist_title tag, or an empty string if the tag could not be read.
+    """
+    try:
+        return custom_reader.get_tag(feed, "blacklist_title")
+    except (TagNotFoundError, ValueError):
+        return ""
diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py
index 6d5cba9..b939d53 100644
--- a/discord_rss_bot/feeds.py
+++ b/discord_rss_bot/feeds.py
@@ -27,10 +27,12 @@ from discord_webhook import DiscordWebhook
 from reader import Entry, Reader, TagNotFoundError
 from requests import Response
 
+from discord_rss_bot.blacklist import if_in_blacklist
 from discord_rss_bot.settings import get_reader
+from discord_rss_bot.whitelist import has_white_tags, if_in_whitelist
 
 
-def send_to_discord(custom_reader: Reader | None = None, feed=None, do_once=False) -> None:
+def send_to_discord(custom_reader: Reader | None = None, do_once=False) -> None:
     """
     Send entries to Discord.
 
@@ -38,7 +40,6 @@ def send_to_discord(custom_reader: Reader | None = None, feed=None, do_once=Fals
 
     Args:
         custom_reader: If we should use a custom reader instead of the default one.
-        feed: The entry to send.
         do_once: If we should only send one entry. This is used in the test.
 
     Returns:
@@ -47,13 +48,11 @@ def send_to_discord(custom_reader: Reader | None = None, feed=None, do_once=Fals
     # Get the default reader if we didn't get a custom one.
     reader: Reader = get_reader() if custom_reader is None else custom_reader
 
-    # If we should get all entries, or just the entries from a specific feed.
-    if feed is None:
-        reader.update_feeds()
-        entries: Iterable[Entry] = reader.get_entries(read=False)
-    else:
-        reader.update_feed(feed)
-        entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False)
+    # Update the feeds.
+    reader.update_feeds()
+
+    # Get all the entries, we will loop through them and check if they should be sent.
+    entries: Iterable[Entry] = reader.get_entries(read=False)
 
     for entry in entries:
         # Set the webhook to read, so we don't send it again.
@@ -69,86 +68,39 @@ def send_to_discord(custom_reader: Reader | None = None, feed=None, do_once=Fals
         webhook_message: str = f":robot: :mega: {entry.title}\n{entry.link}"
         webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
 
-        try:
-            whitelist_title = reader.get_tag(feed, "whitelist_title")
-        except TagNotFoundError:
-            whitelist_title = ""
-        except ValueError:
-            whitelist_title = ""
+        blacklisted = if_in_blacklist(reader, entry)
+        whitelisted = if_in_whitelist(reader, entry)
 
-        try:
-            whitelist_summary = reader.get_tag(feed, "whitelist_summary")
-        except TagNotFoundError:
-            whitelist_summary = ""
-        except ValueError:
-            whitelist_summary = ""
+        if_whitelist_tags = has_white_tags(reader, entry.feed)
 
-        try:
-            whitelist_content = reader.get_tag(feed, "whitelist_content")
-        except TagNotFoundError:
-            whitelist_content = ""
-        except ValueError:
-            whitelist_content = ""
-
-        try:
-            blacklist_title = reader.get_tag(feed, "blacklist_title")
-        except TagNotFoundError:
-            blacklist_title = ""
-        except ValueError:
-            blacklist_title = ""
-
-        try:
-            blacklist_summary = reader.get_tag(feed, "blacklist_summary")
-        except TagNotFoundError:
-            blacklist_summary = ""
-        except ValueError:
-            blacklist_summary = ""
-
-        try:
-            blacklist_content = reader.get_tag(feed, "blacklist_content")
-        except TagNotFoundError:
-            blacklist_content = ""
-        except ValueError:
-            blacklist_content = ""
-
-        # Check if the entry should be sent. If on the blacklist, mark as read and continue.
-        if whitelist_title:
-            if whitelist_title.lower() in entry.title.lower():
-                print(f"Whitelisted because of title: {entry.title}")
+        # Check if the entry has a whitelist
+        if if_whitelist_tags:
+            # Only send the entry if it is whitelisted, otherwise, mark it as read and continue.
+            if whitelisted:
                 response: Response = webhook.execute()
-
-                if not response.ok:
-                    print(f"Error: {response.status_code} {response.reason}")
-                    reader.set_entry_read(entry, False)  # type: ignore
-        if whitelist_summary:
-            if whitelist_summary.lower() in entry.summary.lower():
-                print(f"Whitelisted because of summary: {entry.title}")
-                response: Response = webhook.execute()
-
-                if not response.ok:
-                    print(f"Error: {response.status_code} {response.reason}")
-                    reader.set_entry_read(entry, False)  # type: ignore
-        # if whitelist_content.lower() in entry.content.lower():
-
-        if blacklist_title:
-            if blacklist_title.lower() in entry.title.lower():
-                print(f"Blacklisted because of title: {entry.title}")
                 reader.set_entry_read(entry, True)  # type: ignore
-        if blacklist_summary:
-            if blacklist_summary.lower() in entry.summary.lower():
-                print(f"Blacklisted because of summary: {entry.title}")
+                if not response.ok:
+                    print(f"Error sending to Discord: {response.text}")
+                    reader.set_entry_read(entry, False)  # type: ignore
+            else:
                 reader.set_entry_read(entry, True)  # type: ignore
-        # if blacklist_content.lower() in entry.content.lower():
+            continue
 
-        else:
-            response: Response = webhook.execute()
+        # Check if the entry is blacklisted, if it is, mark it as read and continue.
+        if blacklisted:
+            print(f"Blacklisted entry: {entry.title}, not sending to Discord.")
+            reader.set_entry_read(entry, True)  # type: ignore
+            continue
 
-            if not response.ok:
-                print(f"Error: {response.status_code} {response.reason}")
-                reader.set_entry_read(entry, False)  # type: ignore
+        # It was not blacklisted, and not forced through whitelist, so we will send it to Discord.
+        response: Response = webhook.execute()
+        if not response.ok:
+            print(f"Error sending to Discord: {response.text}")
+            reader.set_entry_read(entry, False)  # type: ignore
 
         # If we only want to send one entry, we will break the loop. This is used when testing this function.
         if do_once:
             break
 
+    # Update the search index.
     reader.update_search()
diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py
index b5f800a..ee5d918 100644
--- a/discord_rss_bot/main.py
+++ b/discord_rss_bot/main.py
@@ -28,7 +28,6 @@ Functions:
 """
 import urllib.parse
 from datetime import datetime
-from enum import Enum
 from typing import Any, Iterable
 
 import uvicorn
@@ -371,14 +370,14 @@ def make_context_index(request) -> dict:
     hooks = []
 
     feed_list = []
+    broken_feed = []
     feeds: Iterable[Feed] = reader.get_feeds()
     for feed in feeds:
         try:
             hook = reader.get_tag(feed.url, "webhook")
             feed_list.append({"feed": feed, "webhook": hook})
         except TagNotFoundError:
-            # TODO: Show this error on the page.
-            # Don't crash if a feed doesn't have a webhook for some reason.
+            broken_feed.append({"feed": feed, "webhook": None})
             continue
 
     # Sort feed_list by when the feed was added.
@@ -392,6 +391,7 @@ def make_context_index(request) -> dict:
         "feed_count": feed_count,
         "entry_count": entry_count,
         "webhooks": hooks,
+        "broken_feed": broken_feed,
     }
     return context
 
diff --git a/discord_rss_bot/whitelist.py b/discord_rss_bot/whitelist.py
new file mode 100644
index 0000000..0b314cf
--- /dev/null
+++ b/discord_rss_bot/whitelist.py
@@ -0,0 +1,125 @@
+import re
+
+from reader import Entry, Feed, Reader, TagNotFoundError
+
+
+def is_word_in_text(word: str, text: str | None) -> bool:
+    """
+    Check if the word is in the text as a whole word, ignoring case.
+
+    The word is escaped, so characters that are special in regular expressions are matched literally.
+    An empty word, or a missing or empty text, never matches.
+
+    Args:
+        word: The word to search for.
+        text: The text to search in.
+
+    Returns:
+        bool: If the word is in the text.
+    """
+    if not word or not text:
+        return False
+
+    pattern = re.compile(rf"(^|[^\w]){re.escape(word)}([^\w]|$)", re.IGNORECASE)
+    return bool(pattern.search(text))
+
+
+def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
+    """
+    Return True if the feed has any of the following tags:
+    - whitelist_title
+    - whitelist_summary
+    - whitelist_content
+
+    Args:
+        custom_reader: The reader.
+        feed: The feed to check.
+
+    Returns:
+        bool: If the feed has any of the tags.
+    """
+    whitelist_title = get_whitelist_title(custom_reader, feed)
+    whitelist_summary = get_whitelist_summary(custom_reader, feed)
+    whitelist_content = get_whitelist_content(custom_reader, feed)
+
+    # Always return a real bool, falling off the end of the function would return None.
+    return bool(whitelist_title or whitelist_summary or whitelist_content)
+
+
+def if_in_whitelist(custom_reader: Reader, entry: Entry) -> bool:
+    """
+    Return True if the entry is in the whitelist.
+
+    Only the title and the summary are checked.
+
+    Args:
+        custom_reader: The reader.
+        entry: The entry to check.
+
+    Returns:
+        bool: If the entry is in the whitelist.
+    """
+    feed: Feed = entry.feed
+    whitelist_title = get_whitelist_title(custom_reader, feed)
+    whitelist_summary = get_whitelist_summary(custom_reader, feed)
+    # TODO: Check the whitelist_content tag against the content of the entry
+    # TODO: Check author
+
+    if whitelist_title and is_word_in_text(whitelist_title, entry.title):
+        return True
+
+    if whitelist_summary and is_word_in_text(whitelist_summary, entry.summary):
+        return True
+
+    return False
+
+
+def get_whitelist_content(custom_reader: Reader, feed: Feed) -> str:
+    """
+    Get the whitelist_content tag from the feed.
+
+    Args:
+        custom_reader: The reader.
+        feed: The feed to get the tag from.
+
+    Returns:
+        str: The whitelist_content tag, or an empty string if the tag could not be read.
+    """
+    try:
+        return custom_reader.get_tag(feed, "whitelist_content")
+    except (TagNotFoundError, ValueError):
+        return ""
+
+
+def get_whitelist_summary(custom_reader: Reader, feed: Feed) -> str:
+    """
+    Get the whitelist_summary tag from the feed.
+
+    Args:
+        custom_reader: The reader.
+        feed: The feed to get the tag from.
+
+    Returns:
+        str: The whitelist_summary tag, or an empty string if the tag could not be read.
+    """
+    try:
+        return custom_reader.get_tag(feed, "whitelist_summary")
+    except (TagNotFoundError, ValueError):
+        return ""
+
+
+def get_whitelist_title(custom_reader: Reader, feed: Feed) -> str:
+    """
+    Get the whitelist_title tag from the feed.
+
+    Args:
+        custom_reader: The reader.
+        feed: The feed to get the tag from.
+
+    Returns:
+        str: The whitelist_title tag, or an empty string if the tag could not be read.
+    """
+    try:
+        return custom_reader.get_tag(feed, "whitelist_title")
+    except (TagNotFoundError, ValueError):
+        return ""