Files
discord-rss-bot/discord_rss_bot/feeds.py
Joakim Hellsén 901d6cb1a6
All checks were successful
Test and build Docker image / docker (push) Successful in 1m33s
Honor 429 Too Many Requests and 503 Service Unavailable responses
2025-05-05 01:19:52 +02:00

519 lines
19 KiB
Python

from __future__ import annotations
import datetime
import logging
import os
import pprint
import re
from typing import TYPE_CHECKING, Any
from urllib.parse import ParseResult, urlparse
import tldextract
from discord_webhook import DiscordEmbed, DiscordWebhook
from fastapi import HTTPException
from reader import Entry, EntryNotFoundError, Feed, FeedExistsError, Reader, ReaderError, StorageError, TagNotFoundError
from discord_rss_bot.custom_message import (
CustomEmbed,
get_custom_message,
replace_tags_in_embed,
replace_tags_in_text_message,
)
from discord_rss_bot.filter.blacklist import entry_should_be_skipped
from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
from discord_rss_bot.hoyolab_api import (
create_hoyolab_webhook,
extract_post_id_from_hoyolab_url,
fetch_hoyolab_post,
is_c3kay_feed,
)
from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.settings import default_custom_message, get_reader
if TYPE_CHECKING:
from collections.abc import Iterable
from requests import Response
logger: logging.Logger = logging.getLogger(__name__)
def extract_domain(url: str) -> str: # noqa: PLR0911
"""Extract the domain name from a URL.
Args:
url: The URL to extract the domain from.
Returns:
str: The domain name, formatted for display.
"""
# Check for empty URL first
if not url:
return "Other"
try:
# Special handling for YouTube feeds
if "youtube.com/feeds/videos.xml" in url:
return "YouTube"
# Special handling for Reddit feeds
if "reddit.com" in url or (".rss" in url and "r/" in url):
return "Reddit"
# Parse the URL and extract the domain
parsed_url: ParseResult = urlparse(url)
domain: str = parsed_url.netloc
# If we couldn't extract a domain, return "Other"
if not domain:
return "Other"
# Remove www. prefix if present
domain = re.sub(r"^www\.", "", domain)
# Special handling for common domains
domain_mapping: dict[str, str] = {"github.com": "GitHub"}
if domain in domain_mapping:
return domain_mapping[domain]
# Use tldextract to get the domain (SLD)
ext = tldextract.extract(url)
if ext.domain:
return ext.domain.capitalize()
return domain.capitalize()
except (ValueError, AttributeError, TypeError) as e:
logger.warning("Error extracting domain from %s: %s", url, e)
return "Other"
def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: PLR0912
"""Send a single entry to Discord.
Args:
entry: The entry to send to Discord.
custom_reader: The reader to use. If None, the default reader will be used.
Returns:
str | None: The error message if there was an error, otherwise None.
"""
# Get the default reader if we didn't get a custom one.
reader: Reader = get_reader() if custom_reader is None else custom_reader
# Get the webhook URL for the entry.
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", ""))
if not webhook_url:
return "No webhook URL found."
# Check if this is a c3kay feed
if is_c3kay_feed(entry.feed.url):
entry_link: str | None = entry.link
if entry_link:
post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
if post_id:
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry)
return None
logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
entry.feed.url,
)
else:
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
webhook_message: str = ""
# Try to get the custom message for the feed. If the user has none, we will use the default message.
# This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()"
if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901
webhook_message: str = replace_tags_in_text_message(entry=entry)
if not webhook_message:
webhook_message = "No message found."
# Create the webhook.
try:
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
except TagNotFoundError:
logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url)
should_send_embed = True
except StorageError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True
# YouTube feeds should never use embeds
if is_youtube_feed(entry.feed.url):
should_send_embed = False
if should_send_embed:
webhook = create_embed_webhook(webhook_url, entry)
else:
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
execute_webhook(webhook, entry)
return None
def set_description(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
"""Set the description of the embed.
Args:
custom_embed (custom_message.CustomEmbed): The custom embed to get the description from.
discord_embed (DiscordEmbed): The Discord embed to set the description on.
"""
# Its actually 2048, but we will use 2000 to be safe.
max_description_length: int = 2000
embed_description: str = custom_embed.description
embed_description = (
f"{embed_description[:max_description_length]}..."
if len(embed_description) > max_description_length
else embed_description
)
discord_embed.set_description(embed_description) if embed_description else None
def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
"""Set the title of the embed.
Args:
custom_embed: The custom embed to get the title from.
discord_embed: The Discord embed to set the title on.
"""
# Its actually 256, but we will use 200 to be safe.
max_title_length: int = 200
embed_title: str = custom_embed.title
embed_title = f"{embed_title[:max_title_length]}..." if len(embed_title) > max_title_length else embed_title
discord_embed.set_title(embed_title) if embed_title else None
def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook:
"""Create a webhook with an embed.
Args:
webhook_url (str): The webhook URL.
entry (Entry): The entry to send to Discord.
Returns:
DiscordWebhook: The webhook with the embed.
"""
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True)
feed: Feed = entry.feed
# Get the embed data from the database.
custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry)
discord_embed: DiscordEmbed = DiscordEmbed()
set_description(custom_embed=custom_embed, discord_embed=discord_embed)
set_title(custom_embed=custom_embed, discord_embed=discord_embed)
custom_embed_author_url: str | None = custom_embed.author_url
if not is_url_valid(custom_embed_author_url):
custom_embed_author_url = None
custom_embed_color: str | None = custom_embed.color or None
if custom_embed_color and custom_embed_color.startswith("#"):
custom_embed_color = custom_embed_color[1:]
discord_embed.set_color(int(custom_embed_color, 16))
if custom_embed.author_name and not custom_embed_author_url and not custom_embed.author_icon_url:
discord_embed.set_author(name=custom_embed.author_name)
if custom_embed.author_name and custom_embed_author_url and not custom_embed.author_icon_url:
discord_embed.set_author(name=custom_embed.author_name, url=custom_embed_author_url)
if custom_embed.author_name and not custom_embed_author_url and custom_embed.author_icon_url:
discord_embed.set_author(name=custom_embed.author_name, icon_url=custom_embed.author_icon_url)
if custom_embed.author_name and custom_embed_author_url and custom_embed.author_icon_url:
discord_embed.set_author(
name=custom_embed.author_name,
url=custom_embed_author_url,
icon_url=custom_embed.author_icon_url,
)
if custom_embed.thumbnail_url:
discord_embed.set_thumbnail(url=custom_embed.thumbnail_url)
if custom_embed.image_url:
discord_embed.set_image(url=custom_embed.image_url)
if custom_embed.footer_text:
discord_embed.set_footer(text=custom_embed.footer_text)
if custom_embed.footer_icon_url and custom_embed.footer_text:
discord_embed.set_footer(text=custom_embed.footer_text, icon_url=custom_embed.footer_icon_url)
if custom_embed.footer_icon_url and not custom_embed.footer_text:
discord_embed.set_footer(text="-", icon_url=custom_embed.footer_icon_url)
webhook.add_embed(discord_embed)
return webhook
def get_webhook_url(reader: Reader, entry: Entry) -> str:
"""Get the webhook URL for the entry.
Args:
reader: The reader to use.
entry: The entry to get the webhook URL for.
Returns:
str: The webhook URL.
"""
try:
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook"))
except TagNotFoundError:
logger.exception("No webhook URL found for feed: %s", entry.feed.url)
return ""
except StorageError:
logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url)
return ""
return webhook_url
def set_entry_as_read(reader: Reader, entry: Entry) -> None:
"""Set the webhook to read, so we don't send it again.
Args:
reader: The reader to use.
entry: The entry to set as read.
"""
try:
reader.set_entry_read(entry, True)
except EntryNotFoundError:
logger.exception("Error setting entry to read: %s", entry.id)
except StorageError:
logger.exception("Error setting entry to read: %s", entry.id)
def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: PLR0912
"""Send entries to Discord.
If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time.
Args:
custom_reader: If we should use a custom reader instead of the default one.
feed: The feed to send to Discord.
do_once: If we should only send one entry. This is used in the test.
"""
# Get the default reader if we didn't get a custom one.
reader: Reader = get_reader() if custom_reader is None else custom_reader
# Check for new entries for every feed.
reader.update_feeds(
scheduled=True,
workers=os.cpu_count() or 1,
)
# Loop through the unread entries.
entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False)
for entry in entries:
set_entry_as_read(reader, entry)
if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1):
logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url)
continue
webhook_url: str = get_webhook_url(reader, entry)
if not webhook_url:
logger.info("No webhook URL found for feed: %s", entry.feed.url)
continue
should_send_embed: bool = should_send_embed_check(reader, entry)
# Youtube feeds only need to send the link
if is_youtube_feed(entry.feed.url):
should_send_embed = False
if should_send_embed:
webhook = create_embed_webhook(webhook_url, entry)
else:
# If the user has set the custom message to an empty string, we will use the default message, otherwise we
# will use the custom message.
if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901
webhook_message = replace_tags_in_text_message(entry)
else:
webhook_message: str = str(default_custom_message)
webhook_message = truncate_webhook_message(webhook_message)
# Create the webhook.
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
# Check if the entry is blacklisted, and if it is, we will skip it.
if entry_should_be_skipped(reader, entry):
logger.info("Entry was blacklisted: %s", entry.id)
continue
# Check if the feed has a whitelist, and if it does, check if the entry is whitelisted.
if has_white_tags(reader, entry.feed) and not should_be_sent(reader, entry):
logger.info("Entry was not whitelisted: %s", entry.id)
continue
# Use a custom webhook for Hoyolab feeds.
if is_c3kay_feed(entry.feed.url):
entry_link: str | None = entry.link
if entry_link:
post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
if post_id:
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry)
return
logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
entry.feed.url,
)
else:
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
# Send the entry to Discord as it is not blacklisted or feed has a whitelist.
execute_webhook(webhook, entry)
# If we only want to send one entry, we will break the loop. This is used when testing this function.
if do_once:
logger.info("Sent one entry to Discord. Breaking the loop.")
break
def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None:
"""Execute the webhook.
Args:
webhook (DiscordWebhook): The webhook to execute.
entry (Entry): The entry to send to Discord.
"""
response: Response = webhook.execute()
if response.status_code not in {200, 204}:
msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(webhook.json)}"
if entry:
msg += f"\n{entry}"
logger.error(msg)
else:
logger.info("Sent entry to Discord: %s", entry.id)
def is_youtube_feed(feed_url: str) -> bool:
"""Check if the feed is a YouTube feed.
Args:
feed_url: The feed URL to check.
Returns:
bool: True if the feed is a YouTube feed, False otherwise.
"""
return "youtube.com/feeds/videos.xml" in feed_url
def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
"""Check if we should send an embed to Discord.
Args:
reader (Reader): The reader to use.
entry (Entry): The entry to check.
Returns:
bool: True if we should send an embed, False otherwise.
"""
# YouTube feeds should never use embeds - only links
if is_youtube_feed(entry.feed.url):
return False
try:
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
except TagNotFoundError:
logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url)
should_send_embed = True
except ReaderError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True
return should_send_embed
def truncate_webhook_message(webhook_message: str) -> str:
"""Truncate the webhook message if it is too long.
Args:
webhook_message (str): The webhook message to truncate.
Returns:
str: The truncated webhook message.
"""
max_content_length: int = 4000
if len(webhook_message) > max_content_length:
half_length = (max_content_length - 3) // 2 # Subtracting 3 for the "..." in the middle
webhook_message = f"{webhook_message[:half_length]}...{webhook_message[-half_length:]}"
return webhook_message
def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
"""Add a new feed, update it and mark every entry as read.
Args:
reader: The reader to use.
feed_url: The feed to add.
webhook_dropdown: The webhook we should send entries to.
Raises:
HTTPException: If webhook_dropdown does not equal a webhook or default_custom_message not found.
"""
clean_feed_url: str = feed_url.strip()
webhook_url: str = ""
if hooks := reader.get_tag((), "webhooks", []):
# Get the webhook URL from the dropdown.
for hook in hooks:
if not isinstance(hook, dict):
logger.error("Webhook is not a dict: %s", hook)
continue
if hook["name"] == webhook_dropdown: # pyright: ignore[reportArgumentType]
webhook_url = hook["url"]
break
if not webhook_url:
raise HTTPException(status_code=404, detail="Webhook not found")
try:
reader.add_feed(clean_feed_url)
except FeedExistsError:
# Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new.
try:
reader.get_tag(clean_feed_url, "webhook")
except TagNotFoundError:
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
except ReaderError as e:
raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e
try:
reader.update_feed(clean_feed_url)
except ReaderError as e:
raise HTTPException(status_code=404, detail=f"Error updating feed: {e}") from e
# Mark every entry as read, so we don't send all the old entries to Discord.
entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url, read=False)
for entry in entries:
reader.set_entry_read(entry, True)
if not default_custom_message:
# TODO(TheLovinator): Show this error on the page.
raise HTTPException(status_code=404, detail="Default custom message couldn't be found.")
# This is the webhook that will be used to send the feed to Discord.
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
# This is the default message that will be sent to Discord.
reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType]
# Update the full-text search index so our new feed is searchable.
reader.update_search()
add_missing_tags(reader)