diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index fc561e1..875c0a1 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -1,3 +1,6 @@ +import json +from dataclasses import dataclass + from bs4 import BeautifulSoup from reader import Entry, Feed, Reader, TagNotFoundError @@ -5,6 +8,20 @@ from discord_rss_bot.markdown import convert_html_to_md from discord_rss_bot.settings import get_reader +@dataclass() +class CustomEmbed: + title: str + description: str + color: str + author_name: str + author_url: str + author_icon_url: str + image_url: str + thumbnail_url: str + footer_text: str + footer_icon_url: str + + def get_images_from_entry(entry: Entry): """Get images from a entry. @@ -16,7 +33,7 @@ def get_images_from_entry(entry: Entry): """ def return_image(found_images): - soup: BeautifulSoup = BeautifulSoup(found_images, "html.parser") + soup: BeautifulSoup = BeautifulSoup(found_images, features="lxml") images = soup.find_all("img") for image in images: image_src = image["src"] or "" @@ -43,9 +60,8 @@ def try_to_replace(custom_message: str, template: str, replace_with: str) -> str Args: custom_message: The custom_message to replace tags in. - feed: The feed to get the tags from. - entry: The entry to get the tags from. - tag: The tag to replace. + template: The tag to replace. + replace_with: What to replace the tag with. Returns: Returns the custom_message with the tag replaced. @@ -56,7 +72,7 @@ def try_to_replace(custom_message: str, template: str, replace_with: str) -> str return custom_message -def replace_tags(feed: Feed, entry: Entry) -> str: +def replace_tags_in_text_message(feed: Feed, entry: Entry) -> str: """Replace tags in custom_message. Args: @@ -122,6 +138,82 @@ def replace_tags(feed: Feed, entry: Entry) -> str: return custom_message.replace("\\n", "\n") +def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed: + """Replace tags in embed. + + Args: + feed: The feed to get the tags from. + entry: The entry to get the tags from. + + Returns: + Returns the embed with the tags replaced. + """ + + custom_reader: Reader = get_reader() + embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader) + + summary = "" + content = "" + if entry.summary: + summary: str = entry.summary + summary = convert_html_to_md(summary) + + if entry.content: + for content_item in entry.content: + content: str = content_item.value + content = convert_html_to_md(content) + + if images := get_images_from_entry(entry=entry): + first_image: str = images[0][0] + else: + first_image = "" + + list_of_replacements = [ + {"{{feed_author}}": feed.author}, + {"{{feed_added}}": feed.added}, + {"{{feed_last_exception}}": feed.last_exception}, + {"{{feed_last_updated}}": feed.last_updated}, + {"{{feed_link}}": feed.link}, + {"{{feed_subtitle}}": feed.subtitle}, + {"{{feed_title}}": feed.title}, + {"{{feed_updated}}": feed.updated}, + {"{{feed_updates_enabled}}": str(feed.updates_enabled)}, + {"{{feed_url}}": feed.url}, + {"{{feed_user_title}}": feed.user_title}, + {"{{feed_version}}": feed.version}, + {"{{entry_added}}": entry.added}, + {"{{entry_author}}": entry.author}, + {"{{entry_content}}": content}, + {"{{entry_content_raw}}": entry.content[0].value if entry.content else ""}, + {"{{entry_id}}": entry.id}, + {"{{entry_important}}": str(entry.important)}, + {"{{entry_link}}": entry.link}, + {"{{entry_published}}": entry.published}, + {"{{entry_read}}": str(entry.read)}, + {"{{entry_read_modified}}": entry.read_modified}, + {"{{entry_summary}}": summary}, + {"{{entry_summary_raw}}": entry.summary or ""}, + {"{{entry_title}}": entry.title}, + {"{{entry_updated}}": entry.updated}, + {"{{image_1}}": first_image}, + ] + + for replacement in list_of_replacements: + for template, replace_with in replacement.items(): + embed.title = try_to_replace(embed.title, template, replace_with) + embed.description = try_to_replace(embed.description, template, replace_with) + embed.color = try_to_replace(embed.color, template, replace_with) + embed.author_name = try_to_replace(embed.author_name, template, replace_with) + embed.author_url = try_to_replace(embed.author_url, template, replace_with) + embed.author_icon_url = try_to_replace(embed.author_icon_url, template, replace_with) + embed.image_url = try_to_replace(embed.image_url, template, replace_with) + embed.thumbnail_url = try_to_replace(embed.thumbnail_url, template, replace_with) + embed.footer_text = try_to_replace(embed.footer_text, template, replace_with) + embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with) + + return embed + + def get_custom_message(custom_reader: Reader, feed: Feed) -> str: """Get custom_message tag from feed. @@ -140,3 +232,102 @@ def get_custom_message(custom_reader: Reader, feed: Feed) -> str: custom_message = "" return custom_message + + +def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None: + """Set embed tag in feed. + + Args: + custom_reader: What Reader to use. + feed: The feed to set the tag in. + embed: The embed to set. + """ + embed_dict: dict[str, str] = { + "title": embed.title, + "description": embed.description, + "color": embed.color.replace("#", "").replace("0x", ""), + "author_name": embed.author_name, + "author_url": embed.author_url, + "author_icon_url": embed.author_icon_url, + "image_url": embed.image_url, + "thumbnail_url": embed.thumbnail_url, + "footer_text": embed.footer_text, + "footer_icon_url": embed.footer_icon_url, + } + + custom_reader.set_tag(feed, "embed", json.dumps(embed_dict)) # type: ignore + + +def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed: + """Get embed tag from feed. + + Args: + custom_reader: What Reader to use. + feed: The feed to get the tag from. + + Returns: + Returns the contents from the embed tag. + """ + embed_json: dict[str, str] = {} + try: + embed: str = str(custom_reader.get_tag(feed, "embed")) + except TagNotFoundError: + embed = "" + except ValueError: + embed = "" + + if embed: + try: + embed_json = json.loads(embed) + except json.decoder.JSONDecodeError: + embed_json = "" # type: ignore + + if embed_json: + return get_embed_data(embed_json) + + return CustomEmbed( + title="", + description="", + color="", + author_name="", + author_url="", + author_icon_url="", + image_url="", + thumbnail_url="", + footer_text="", + footer_icon_url="", + ) + + +def get_embed_data(embed_data) -> CustomEmbed: + """Get embed data from embed_data. + + Args: + embed_data: The embed_data to get the data from. + + Returns: + Returns the embed data. + """ + title: str = embed_data.get("title", "") + description: str = embed_data.get("description", "") + color: str = embed_data.get("color", "") + author_name: str = embed_data.get("author_name", "") + author_url: str = embed_data.get("author_url", "") + author_icon_url: str = embed_data.get("author_icon_url", "") + image_url: str = embed_data.get("image_url", "") + thumbnail_url: str = embed_data.get("thumbnail_url", "") + footer_text: str = embed_data.get("footer_text", "") + footer_icon_url: str = embed_data.get("footer_icon_url", "") + + return CustomEmbed( + title=title, + description=description, + color=color, + author_name=author_name, + author_url=author_url, + author_icon_url=author_icon_url, + image_url=image_url, + thumbnail_url=thumbnail_url, + footer_text=footer_text, + footer_icon_url=footer_icon_url, + ) diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 506cf67..f433cbb 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -1,6 +1,6 @@ from typing import Iterable -from discord_webhook import DiscordWebhook +from discord_webhook import DiscordEmbed, DiscordWebhook from reader import Entry, Feed, Reader from requests import Response @@ -45,18 +45,61 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None): # Try to get the custom message for the feed. If the user has none, we will use the default message. if custom_message.get_custom_message(reader, entry.feed) != "": - webhook_message = custom_message.replace_tags(entry=entry, feed=entry.feed) # type: ignore + webhook_message = custom_message.replace_tags_in_text_message(entry=entry, feed=entry.feed) # type: ignore else: webhook_message: str = default_custom_message # Create the webhook. - webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) + if bool(reader.get_tag(entry.feed, "should_send_embed")): + webhook = create_embed_webhook(webhook_url, entry) + else: + webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) response: Response = webhook.execute() if not response.ok: return f"Error sending entry to Discord: {response.text}" +def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: + webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True) + feed: Feed = entry.feed + + # Get the embed data from the database. + custom_embed: custom_message.CustomEmbed = custom_message.replace_tags_in_embed(feed=feed, entry=entry) + + discord_embed: DiscordEmbed = DiscordEmbed() + + if custom_embed.title: + discord_embed.set_title(custom_embed.title) + if custom_embed.description: + discord_embed.set_description(custom_embed.description) + if custom_embed.color: + discord_embed.set_color(custom_embed.color) + if custom_embed.author_name and not custom_embed.author_url and not custom_embed.author_icon_url: + discord_embed.set_author(name=custom_embed.author_name) + if custom_embed.author_name and custom_embed.author_url and not custom_embed.author_icon_url: + discord_embed.set_author(name=custom_embed.author_name, url=custom_embed.author_url) + if custom_embed.author_name and not custom_embed.author_url and custom_embed.author_icon_url: + discord_embed.set_author(name=custom_embed.author_name, icon_url=custom_embed.author_icon_url) + if custom_embed.author_name and custom_embed.author_url and custom_embed.author_icon_url: + discord_embed.set_author(name=custom_embed.author_name, url=custom_embed.author_url, icon_url=custom_embed.author_icon_url) # noqa: E501 + if custom_embed.thumbnail_url: + discord_embed.set_thumbnail(url=custom_embed.thumbnail_url) + if custom_embed.image_url: + discord_embed.set_image(url=custom_embed.image_url) + if custom_embed.footer_text: + discord_embed.set_footer(text=custom_embed.footer_text) + if custom_embed.footer_icon_url and custom_embed.footer_text: + discord_embed.set_footer(text=custom_embed.footer_text, icon_url=custom_embed.footer_icon_url) + if custom_embed.footer_icon_url and not custom_embed.footer_text: + # TODO: Can this be done without a text? + discord_embed.set_footer(icon_url=custom_embed.footer_icon_url) + + webhook.add_embed(discord_embed) + + return webhook + + def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, do_once: bool = False) -> None: """ Send entries to Discord. @@ -93,15 +136,18 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non if not webhook_url: continue - # If the user has set the custom message to an empty string, we will use the default message, otherwise we will - # use the custom message. - if custom_message.get_custom_message(reader, entry.feed) != "": - webhook_message = custom_message.replace_tags(entry=entry, feed=entry.feed) # type: ignore + if bool(reader.get_tag(entry.feed, "should_send_embed")): + webhook = create_embed_webhook(webhook_url, entry) else: - webhook_message: str = default_custom_message + # If the user has set the custom message to an empty string, we will use the default message, otherwise we + # will use the custom message. + if custom_message.get_custom_message(reader, entry.feed) != "": + webhook_message = custom_message.replace_tags_in_text_message(entry=entry, feed=entry.feed) + else: + webhook_message: str = default_custom_message - # Create the webhook. - webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) + # Create the webhook. + webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. if feed is not None and has_white_tags(reader, feed): diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 8c17978..e951361 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -13,11 +13,19 @@ from starlette.responses import RedirectResponse from discord_rss_bot import settings from discord_rss_bot.custom_filters import encode_url, entry_is_blacklisted, entry_is_whitelisted -from discord_rss_bot.custom_message import get_custom_message, get_images_from_entry, replace_tags +from discord_rss_bot.custom_message import ( + CustomEmbed, + get_custom_message, + get_embed, + get_images_from_entry, + replace_tags_in_text_message, + save_embed, +) from discord_rss_bot.feeds import get_entry_from_id, send_entry_to_discord, send_to_discord from discord_rss_bot.filter.blacklist import get_blacklist_content, get_blacklist_summary, get_blacklist_title from discord_rss_bot.filter.whitelist import get_whitelist_content, get_whitelist_summary, get_whitelist_title from discord_rss_bot.markdown import convert_html_to_md +from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.search import create_html_for_search_results from discord_rss_bot.settings import default_custom_message, get_reader, list_webhooks @@ -367,6 +375,125 @@ async def get_custom(feed_url, request: Request): return templates.TemplateResponse("custom.html", context) +@app.get("/embed", response_class=HTMLResponse) +async def get_embed_page(feed_url, request: Request): + """Get the custom message. This is used when sending the message to Discord. + + Args: + feed_url: What feed we should get the custom message for. + request: The HTTP request. + + Returns: + custom.html + """ + + # Make feed_url a valid URL. + url: str = urllib.parse.unquote(feed_url) + + feed: Feed = reader.get_feed(url) + + # Get previous data, this is used when creating the form. + embed: CustomEmbed = get_embed(reader, feed) + + context = { + "request": request, + "feed": feed, + "title": embed.title, + "description": embed.description, + "color": embed.color, + "image_url": embed.image_url, + "thumbnail_url": embed.thumbnail_url, + "author_name": embed.author_name, + "author_url": embed.author_url, + "author_icon_url": embed.author_icon_url, + "footer_text": embed.footer_text, + "footer_icon_url": embed.footer_icon_url, + } + + # Get the first entry, this is used to show the user what the custom message will look like. + entries: Iterable[Entry] = reader.get_entries(feed=feed, limit=1) + + if custom_embed := get_embed(reader, feed_url): + context["custom_embed"] = custom_embed + + for entry in entries: + # Append to context. + context["entry"] = entry + return templates.TemplateResponse("embed.html", context) + + +@app.post("/embed", response_class=HTMLResponse) +async def set_embed_page( + feed_url=Form(), + title=Form(""), + description=Form(""), + color=Form(""), + image_url=Form(""), + thumbnail_url=Form(""), + author_name=Form(""), + author_url=Form(""), + author_icon_url=Form(""), + footer_text=Form(""), + footer_icon_url=Form(""), +): + """Set the embed settings. + + Args: + feed_url: What feed we should get the custom message for. + request: The HTTP request. + + Returns: + custom.html + """ + + # Make feed_url a valid URL. + url: str = urllib.parse.unquote(feed_url) + + feed: Feed = reader.get_feed(url) + + custom_embed: CustomEmbed = get_embed(reader, feed) + + # Get the data from the form. + custom_embed.title = title + custom_embed.description = description + custom_embed.color = color + custom_embed.image_url = image_url + custom_embed.thumbnail_url = thumbnail_url + custom_embed.author_name = author_name + custom_embed.author_url = author_url + custom_embed.author_icon_url = author_icon_url + custom_embed.footer_text = footer_text + custom_embed.footer_icon_url = footer_icon_url + + # Save the data. + save_embed(reader, feed_url, custom_embed) + + # Clean URL is used to redirect to the feed page. + clean_url: str = urllib.parse.quote(feed_url) + + return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) + + +@app.post("/use_embed") +async def set_should_use_embed(feed_url=Form()): + url: str = urllib.parse.unquote(feed_url) + print(f"Setting should_send_embed to True for {url}") + + feed: Feed = reader.get_feed(url) + reader.set_tag(feed, "should_send_embed", True) # type: ignore + return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) + + +@app.post("/use_text") +async def set_should_use_text(feed_url=Form()): + url: str = urllib.parse.unquote(feed_url) + print(f"Setting should_send_embed to False for {url}") + + feed: Feed = reader.get_feed(url) + reader.set_tag(feed, "should_send_embed", False) # type: ignore + return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) + + @app.get("/add", response_class=HTMLResponse) def get_add(request: Request): """ @@ -408,7 +535,16 @@ async def get_feed(feed_url, request: Request): # Create the html for the entries. html: str = create_html_for_feed(entries) - context = {"request": request, "feed": feed, "entries": entries, "feed_counts": feed_counts, "html": html} + should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) + + context = { + "request": request, + "feed": feed, + "entries": entries, + "feed_counts": feed_counts, + "html": html, + "should_send_embed": should_send_embed, + } return templates.TemplateResponse("feed.html", context) @@ -433,7 +569,7 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str: first_image_text: str = images[0][1] # Get the text from the entry. - text = replace_tags(entry.feed, entry) + text = replace_tags_in_text_message(entry.feed, entry) if not text: text = "