from __future__ import annotations import datetime import logging import os import pprint import re from typing import TYPE_CHECKING from typing import Any from urllib.parse import ParseResult from urllib.parse import urlparse import tldextract from discord_webhook import DiscordEmbed from discord_webhook import DiscordWebhook from fastapi import HTTPException from markdownify import markdownify from reader import Entry from reader import EntryNotFoundError from reader import Feed from reader import FeedExistsError from reader import FeedNotFoundError from reader import Reader from reader import ReaderError from reader import StorageError from discord_rss_bot.custom_message import CustomEmbed from discord_rss_bot.custom_message import get_custom_message from discord_rss_bot.custom_message import replace_tags_in_embed from discord_rss_bot.custom_message import replace_tags_in_text_message from discord_rss_bot.filter.blacklist import entry_should_be_skipped from discord_rss_bot.filter.whitelist import has_white_tags from discord_rss_bot.filter.whitelist import should_be_sent from discord_rss_bot.hoyolab_api import create_hoyolab_webhook from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url from discord_rss_bot.hoyolab_api import fetch_hoyolab_post from discord_rss_bot.hoyolab_api import is_c3kay_feed from discord_rss_bot.is_url_valid import is_url_valid from discord_rss_bot.settings import default_custom_message from discord_rss_bot.settings import get_reader if TYPE_CHECKING: from collections.abc import Iterable from requests import Response logger: logging.Logger = logging.getLogger(__name__) def extract_domain(url: str) -> str: # noqa: PLR0911 """Extract the domain name from a URL. Args: url: The URL to extract the domain from. Returns: str: The domain name, formatted for display. """ # Check for empty URL first if not url: return "Other" try: # Special handling for YouTube feeds if "youtube.com/feeds/videos.xml" in url: return "YouTube" # Special handling for Reddit feeds if "reddit.com" in url and ".rss" in url: return "Reddit" # Parse the URL and extract the domain parsed_url: ParseResult = urlparse(url) domain: str = parsed_url.netloc # If we couldn't extract a domain, return "Other" if not domain: return "Other" # Remove www. prefix if present domain = re.sub(r"^www\.", "", domain) # Special handling for common domains domain_mapping: dict[str, str] = {"github.com": "GitHub"} if domain in domain_mapping: return domain_mapping[domain] # Use tldextract to get the domain (SLD) ext = tldextract.extract(url) if ext.domain: return ext.domain.capitalize() return domain.capitalize() except (ValueError, AttributeError, TypeError) as e: logger.warning("Error extracting domain from %s: %s", url, e) return "Other" def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: C901 """Send a single entry to Discord. Args: entry: The entry to send to Discord. reader: The reader to use. Returns: str | None: The error message if there was an error, otherwise None. """ # Get the webhook URL for the entry. webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) if not webhook_url: return "No webhook URL found." # If https://discord.com/quests/ is in the URL, send a separate message with the URL. send_discord_quest_notification(entry, webhook_url, reader=reader) # Check if this is a c3kay feed if is_c3kay_feed(entry.feed.url): entry_link: str | None = entry.link if entry_link: post_id: str | None = extract_post_id_from_hoyolab_url(entry_link) if post_id: post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) execute_webhook(webhook, entry, reader=reader) return None logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", entry.feed.url, ) else: logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) webhook_message: str = "" # Try to get the custom message for the feed. If the user has none, we will use the default message. # This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()" if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 webhook_message: str = replace_tags_in_text_message(entry=entry, reader=reader) if not webhook_message: webhook_message = "No message found." # Create the webhook. try: should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) except StorageError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True # YouTube feeds should never use embeds if is_youtube_feed(entry.feed.url): should_send_embed = False if should_send_embed: webhook = create_embed_webhook(webhook_url, entry, reader=reader) else: webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) execute_webhook(webhook, entry, reader=reader) return None def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Reader) -> None: """Send a separate message to Discord if the entry is a quest notification.""" quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+") def send_notification(quest_url: str) -> None: """Helper function to send quest notification to Discord.""" logger.info("Sending quest notification to Discord: %s", quest_url) webhook = DiscordWebhook( url=webhook_url, content=quest_url, rate_limit_retry=True, ) execute_webhook(webhook, entry, reader=reader) # Iterate through the content of the entry for content in entry.content: if content.type == "text" and content.value: match = quest_regex.search(content.value) if match: send_notification(match.group(0)) return elif content.type == "text/html" and content.value: # Convert HTML to text and check for quest links text_value = markdownify( html=content.value, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False, heading_style="ATX", ) match: re.Match[str] | None = quest_regex.search(text_value) if match: send_notification(match.group(0)) return logger.info("No quest notification found in entry: %s", entry.id) def set_description(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None: """Set the description of the embed. Args: custom_embed (custom_message.CustomEmbed): The custom embed to get the description from. discord_embed (DiscordEmbed): The Discord embed to set the description on. """ # Its actually 2048, but we will use 2000 to be safe. max_description_length: int = 2000 embed_description: str = custom_embed.description embed_description = ( f"{embed_description[:max_description_length]}..." if len(embed_description) > max_description_length else embed_description ) discord_embed.set_description(embed_description) if embed_description else None def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None: """Set the title of the embed. Args: custom_embed: The custom embed to get the title from. discord_embed: The Discord embed to set the title on. """ # Its actually 256, but we will use 200 to be safe. max_title_length: int = 200 embed_title: str = custom_embed.title embed_title = f"{embed_title[:max_title_length]}..." if len(embed_title) > max_title_length else embed_title discord_embed.set_title(embed_title) if embed_title else None def create_embed_webhook( # noqa: C901 webhook_url: str, entry: Entry, reader: Reader, ) -> DiscordWebhook: """Create a webhook with an embed. Args: webhook_url (str): The webhook URL. entry (Entry): The entry to send to Discord. reader (Reader): The Reader instance to use for getting embed data. Returns: DiscordWebhook: The webhook with the embed. """ webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True) feed: Feed = entry.feed # Get the embed data from the database. custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader) discord_embed: DiscordEmbed = DiscordEmbed() set_description(custom_embed=custom_embed, discord_embed=discord_embed) set_title(custom_embed=custom_embed, discord_embed=discord_embed) custom_embed_author_url: str | None = custom_embed.author_url if not is_url_valid(custom_embed_author_url): custom_embed_author_url = None custom_embed_color: str | None = custom_embed.color or None if custom_embed_color and custom_embed_color.startswith("#"): custom_embed_color = custom_embed_color[1:] discord_embed.set_color(int(custom_embed_color, 16)) if custom_embed.author_name and not custom_embed_author_url and not custom_embed.author_icon_url: discord_embed.set_author(name=custom_embed.author_name) if custom_embed.author_name and custom_embed_author_url and not custom_embed.author_icon_url: discord_embed.set_author(name=custom_embed.author_name, url=custom_embed_author_url) if custom_embed.author_name and not custom_embed_author_url and custom_embed.author_icon_url: discord_embed.set_author(name=custom_embed.author_name, icon_url=custom_embed.author_icon_url) if custom_embed.author_name and custom_embed_author_url and custom_embed.author_icon_url: discord_embed.set_author( name=custom_embed.author_name, url=custom_embed_author_url, icon_url=custom_embed.author_icon_url, ) if custom_embed.thumbnail_url: discord_embed.set_thumbnail(url=custom_embed.thumbnail_url) if custom_embed.image_url: discord_embed.set_image(url=custom_embed.image_url) if custom_embed.footer_text: discord_embed.set_footer(text=custom_embed.footer_text) if custom_embed.footer_icon_url and custom_embed.footer_text: discord_embed.set_footer(text=custom_embed.footer_text, icon_url=custom_embed.footer_icon_url) if custom_embed.footer_icon_url and not custom_embed.footer_text: discord_embed.set_footer(text="-", icon_url=custom_embed.footer_icon_url) webhook.add_embed(discord_embed) return webhook def get_webhook_url(reader: Reader, entry: Entry) -> str: """Get the webhook URL for the entry. Args: reader: The reader to use. entry: The entry to get the webhook URL for. Returns: str: The webhook URL. """ try: webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) except StorageError: logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url) return "" if not webhook_url: logger.error("No webhook URL found for feed: %s", entry.feed.url) return "" return webhook_url def set_entry_as_read(reader: Reader, entry: Entry) -> None: """Set the webhook to read, so we don't send it again. Args: reader: The reader to use. entry: The entry to set as read. """ try: reader.set_entry_read(entry, True) except EntryNotFoundError: logger.exception("Error setting entry to read: %s", entry.id) except StorageError: logger.exception("Error setting entry to read: %s", entry.id) def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 """Send entries to Discord. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. Args: reader: If we should use a custom reader instead of the default one. feed: The feed to send to Discord. do_once: If we should only send one entry. This is used in the test. """ logger.info("Starting to send entries to Discord.") # Get the default reader if we didn't get a custom one. effective_reader: Reader = get_reader() if reader is None else reader # Check for new entries for every feed. effective_reader.update_feeds( scheduled=True, workers=os.cpu_count() or 1, ) # Loop through the unread entries. entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False) for entry in entries: set_entry_as_read(effective_reader, entry) if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1): logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url) continue webhook_url: str = get_webhook_url(effective_reader, entry) if not webhook_url: logger.info("No webhook URL found for feed: %s", entry.feed.url) continue should_send_embed: bool = should_send_embed_check(effective_reader, entry) # Youtube feeds only need to send the link if is_youtube_feed(entry.feed.url): should_send_embed = False if should_send_embed: webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader) else: # If the user has set the custom message to an empty string, we will use the default message, otherwise we # will use the custom message. if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901 webhook_message = replace_tags_in_text_message(entry, reader=effective_reader) else: webhook_message: str = str(default_custom_message) webhook_message = truncate_webhook_message(webhook_message) # Create the webhook. webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) # Check if the entry is blacklisted, and if it is, we will skip it. if entry_should_be_skipped(effective_reader, entry): logger.info("Entry was blacklisted: %s", entry.id) continue # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry): logger.info("Entry was not whitelisted: %s", entry.id) continue # Use a custom webhook for Hoyolab feeds. if is_c3kay_feed(entry.feed.url): entry_link: str | None = entry.link if entry_link: post_id: str | None = extract_post_id_from_hoyolab_url(entry_link) if post_id: post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) execute_webhook(webhook, entry, reader=effective_reader) return logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", entry.feed.url, ) else: logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) # Send the entry to Discord as it is not blacklisted or feed has a whitelist. execute_webhook(webhook, entry, reader=effective_reader) # If we only want to send one entry, we will break the loop. This is used when testing this function. if do_once: logger.info("Sent one entry to Discord. Breaking the loop.") break def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None: """Execute the webhook. Args: webhook (DiscordWebhook): The webhook to execute. entry (Entry): The entry to send to Discord. reader (Reader): The Reader instance to use for checking feed status. """ # If the feed has been paused or deleted, we will not send the entry to Discord. entry_feed: Feed = entry.feed if entry_feed.updates_enabled is False: logger.warning("Feed is paused, not sending entry to Discord: %s", entry_feed.url) return try: reader.get_feed(entry_feed.url) except FeedNotFoundError: logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url) return response: Response = webhook.execute() if response.status_code not in {200, 204}: msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(webhook.json)}" if entry: msg += f"\n{entry}" logger.error(msg) else: logger.info("Sent entry to Discord: %s", entry.id) def is_youtube_feed(feed_url: str) -> bool: """Check if the feed is a YouTube feed. Args: feed_url: The feed URL to check. Returns: bool: True if the feed is a YouTube feed, False otherwise. """ return "youtube.com/feeds/videos.xml" in feed_url def should_send_embed_check(reader: Reader, entry: Entry) -> bool: """Check if we should send an embed to Discord. Args: reader (Reader): The reader to use. entry (Entry): The entry to check. Returns: bool: True if we should send an embed, False otherwise. """ # YouTube feeds should never use embeds - only links if is_youtube_feed(entry.feed.url): return False try: should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) except ReaderError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True return should_send_embed def truncate_webhook_message(webhook_message: str) -> str: """Truncate the webhook message if it is too long. Args: webhook_message (str): The webhook message to truncate. Returns: str: The truncated webhook message. """ max_content_length: int = 4000 if len(webhook_message) > max_content_length: half_length = (max_content_length - 3) // 2 # Subtracting 3 for the "..." in the middle webhook_message = f"{webhook_message[:half_length]}...{webhook_message[-half_length:]}" return webhook_message def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # noqa: C901 """Add a new feed, update it and mark every entry as read. Args: reader: The reader to use. feed_url: The feed to add. webhook_dropdown: The webhook we should send entries to. Raises: HTTPException: If webhook_dropdown does not equal a webhook or default_custom_message not found. """ clean_feed_url: str = feed_url.strip() webhook_url: str = "" if hooks := reader.get_tag((), "webhooks", []): # Get the webhook URL from the dropdown. for hook in hooks: if not isinstance(hook, dict): logger.error("Webhook is not a dict: %s", hook) continue if hook["name"] == webhook_dropdown: # pyright: ignore[reportArgumentType] webhook_url = hook["url"] break if not webhook_url: raise HTTPException(status_code=404, detail="Webhook not found") try: reader.add_feed(clean_feed_url) except FeedExistsError: # Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new. if not reader.get_tag(clean_feed_url, "webhook", ""): reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType] except ReaderError as e: raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e try: reader.update_feed(clean_feed_url) except ReaderError as e: raise HTTPException(status_code=404, detail=f"Error updating feed: {e}") from e # Mark every entry as read, so we don't send all the old entries to Discord. entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url, read=False) for entry in entries: reader.set_entry_read(entry, True) if not default_custom_message: # TODO(TheLovinator): Show this error on the page. raise HTTPException(status_code=404, detail="Default custom message couldn't be found.") # This is the webhook that will be used to send the feed to Discord. reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType] # This is the default message that will be sent to Discord. reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType] # Update the full-text search index so our new feed is searchable. reader.update_search()