diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index 7dde38a..bafd04d 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -72,16 +72,16 @@ def try_to_replace(custom_message: str, template: str, replace_with: str) -> str return custom_message -def replace_tags_in_text_message(feed: Feed, entry: Entry) -> str: +def replace_tags_in_text_message(entry: Entry) -> str: """Replace tags in custom_message. Args: - feed: The feed to get the tags from. entry: The entry to get the tags from. Returns: Returns the custom_message with the tags replaced. """ + feed: Feed = entry.feed custom_reader: Reader = get_reader() custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader) diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 71f01a2..0767a17 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -11,24 +11,6 @@ from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent from discord_rss_bot.settings import default_custom_message, get_reader -def get_entry_from_id(entry_id: str, custom_reader: Reader | None = None) -> Entry | None: - """ - Get an entry from an ID. - - Args: - entry_id: The ID of the entry. - custom_reader: If we should use a custom reader instead of the default one. - - Returns: - Entry: The entry with the ID. None if it doesn't exist. - """ - # Get the default reader if we didn't get a custom one. - reader: Reader = get_reader() if custom_reader is None else custom_reader - - # Get the entry from the ID, or return None if it doesn't exist. - return next((entry for entry in reader.get_entries() if entry.id == entry_id), None) - - def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None): """ Send a single entry to Discord. @@ -46,7 +28,7 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None): # Try to get the custom message for the feed. If the user has none, we will use the default message. if custom_message.get_custom_message(reader, entry.feed) != "": - webhook_message = custom_message.replace_tags_in_text_message(entry=entry, feed=entry.feed) # type: ignore + webhook_message = custom_message.replace_tags_in_text_message(entry=entry) else: webhook_message: str = default_custom_message @@ -144,7 +126,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non # If the user has set the custom message to an empty string, we will use the default message, otherwise we # will use the custom message. if custom_message.get_custom_message(reader, entry.feed) != "": - webhook_message = custom_message.replace_tags_in_text_message(entry=entry, feed=entry.feed) + webhook_message = custom_message.replace_tags_in_text_message(entry) else: webhook_message: str = default_custom_message diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 3b2df97..cdda8e7 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -3,7 +3,7 @@ import urllib.parse from dataclasses import dataclass from datetime import datetime from functools import lru_cache -from typing import Dict, Iterable +from typing import Iterable import httpx import uvicorn @@ -13,17 +13,7 @@ from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from httpx import Response -from reader import ( - Entry, - EntryCounts, - EntrySearchCounts, - EntrySearchResult, - Feed, - FeedCounts, - FeedNotFoundError, - Reader, - TagNotFoundError, -) +from reader import Entry, Feed, FeedNotFoundError, Reader, TagNotFoundError from starlette.responses import RedirectResponse from discord_rss_bot import settings @@ -36,7 +26,7 @@ from discord_rss_bot.custom_message import ( replace_tags_in_text_message, save_embed, ) -from discord_rss_bot.feeds import create_feed, get_entry_from_id, send_entry_to_discord, send_to_discord +from discord_rss_bot.feeds import create_feed, send_entry_to_discord, send_to_discord from discord_rss_bot.filter.blacklist import get_blacklist_content, get_blacklist_summary, get_blacklist_title from discord_rss_bot.filter.whitelist import get_whitelist_content, get_whitelist_summary, get_whitelist_title from discord_rss_bot.markdown import convert_html_to_md @@ -59,7 +49,7 @@ templates.env.filters["discord_markdown"] = convert_html_to_md @app.post("/add_webhook") -async def post_add_webhook(webhook_name=Form(), webhook_url=Form()): +async def post_add_webhook(webhook_name: str = Form(), webhook_url: str = Form()): """ Add a feed to the database. @@ -72,7 +62,7 @@ async def post_add_webhook(webhook_name=Form(), webhook_url=Form()): @app.post("/delete_webhook") -async def post_delete_webhook(webhook_url=Form()): +async def post_delete_webhook(webhook_url: str = Form()): """ Delete a webhook from the database. @@ -84,49 +74,48 @@ async def post_delete_webhook(webhook_url=Form()): @app.post("/add") -async def post_create_feed(feed_url=Form(), webhook_dropdown=Form()): +async def post_create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()): """ Add a feed to the database. Args: feed_url: The feed to add. webhook_dropdown: The webhook to use. - - Returns: - dict: The feed that was added. """ create_feed(reader, feed_url, webhook_dropdown) return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) @app.post("/pause") -async def post_pause_feed(feed_url=Form()): +async def post_pause_feed(feed_url: str = Form()): """Pause a feed. Args: feed_url: The feed to pause. """ - reader.disable_feed_updates(feed_url) - return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) + clean_feed_url: str = feed_url.strip() + reader.disable_feed_updates(clean_feed_url) + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/unpause") -async def post_unpause_feed(feed_url=Form()): +async def post_unpause_feed(feed_url: str = Form()): """Unpause a feed. Args: feed_url: The Feed to unpause. """ - reader.enable_feed_updates(feed_url) - return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) + clean_feed_url: str = feed_url.strip() + reader.enable_feed_updates(clean_feed_url) + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/whitelist") async def post_set_whitelist( - whitelist_title=Form(None), - whitelist_summary=Form(None), - whitelist_content=Form(None), - feed_url=Form(), + whitelist_title: str = Form(None), + whitelist_summary: str = Form(None), + whitelist_content: str = Form(None), + feed_url: str = Form(), ): """Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent. @@ -136,25 +125,26 @@ async def post_set_whitelist( whitelist_content: Whitelisted words for when checking the title. feed_url: The feed we should set the whitelist for. """ + clean_feed_url: str = feed_url.strip() if whitelist_title: - reader.set_tag(feed_url, "whitelist_title", whitelist_title) + reader.set_tag(clean_feed_url, "whitelist_title", whitelist_title) # type: ignore if whitelist_summary: - reader.set_tag(feed_url, "whitelist_summary", whitelist_summary) + reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # type: ignore if whitelist_content: - reader.set_tag(feed_url, "whitelist_content", whitelist_content) + reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # type: ignore - return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/whitelist", response_class=HTMLResponse) -async def get_whitelist(feed_url, request: Request): +async def get_whitelist(feed_url: str, request: Request): """Get the whitelist. Args: feed_url: What feed we should get the whitelist for. - request: The HTTP request. """ - feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url)) + clean_feed_url: str = feed_url.strip() + feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url)) # Get previous data, this is used when creating the form. whitelist_title: str = get_whitelist_title(reader, feed) @@ -173,10 +163,10 @@ async def get_whitelist(feed_url, request: Request): @app.post("/blacklist") async def post_set_blacklist( - blacklist_title=Form(None), - blacklist_summary=Form(None), - blacklist_content=Form(None), - feed_url=Form(), + blacklist_title: str = Form(None), + blacklist_summary: str = Form(None), + blacklist_content: str = Form(None), + feed_url: str = Form(), ): """Set the blacklist, if this is set we will check if words are in the title, summary or content and then don't send that entry. @@ -187,18 +177,19 @@ async def post_set_blacklist( blacklist_content: Blacklisted words for when checking the content. feed_url: What feed we should set the blacklist for. """ + clean_feed_url = feed_url.strip() if blacklist_title: - reader.set_tag(feed_url, "blacklist_title", blacklist_title) + reader.set_tag(clean_feed_url, "blacklist_title", blacklist_title) # type: ignore if blacklist_summary: - reader.set_tag(feed_url, "blacklist_summary", blacklist_summary) + reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # type: ignore if blacklist_content: - reader.set_tag(feed_url, "blacklist_content", blacklist_content) + reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # type: ignore return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) @app.get("/blacklist", response_class=HTMLResponse) -async def get_blacklist(feed_url, request: Request): +async def get_blacklist(feed_url: str, request: Request): feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url)) # Get previous data, this is used when creating the form. @@ -217,7 +208,7 @@ async def get_blacklist(feed_url, request: Request): @app.post("/custom") -async def post_set_custom(custom_message=Form(""), feed_url=Form()): +async def post_set_custom(custom_message: str = Form(""), feed_url: str = Form()): """ Set the custom message, this is used when sending the message. @@ -225,63 +216,44 @@ async def post_set_custom(custom_message=Form(""), feed_url=Form()): custom_message: The custom message. feed_url: The feed we should set the custom message for. """ - if custom_message := custom_message.strip(): - reader.set_tag(feed_url, "custom_message", custom_message) # type: ignore + if custom_message: + reader.set_tag(feed_url, "custom_message", custom_message.strip()) # type: ignore else: reader.set_tag(feed_url, "custom_message", settings.default_custom_message) # type: ignore - clean_url: str = urllib.parse.quote(feed_url) - - return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) @app.get("/custom", response_class=HTMLResponse) -async def get_custom(feed_url, request: Request): +async def get_custom(feed_url: str, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. - request: The HTTP request. - - Returns: - custom.html """ + feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url.strip())) - # Make feed_url a valid URL. - url: str = urllib.parse.unquote(feed_url) - - feed: Feed = reader.get_feed(url) - - # Get previous data, this is used when creating the form. - custom_message: str = get_custom_message(reader, feed) - - context = {"request": request, "feed": feed, "custom_message": custom_message} + context = { + "request": request, + "feed": feed, + "custom_message": get_custom_message(reader, feed), + } # Get the first entry, this is used to show the user what the custom message will look like. - entries: Iterable[Entry] = reader.get_entries(feed=feed, limit=1) - - for entry in entries: - # Append to context. + for entry in reader.get_entries(feed=feed, limit=1): context["entry"] = entry + return templates.TemplateResponse("custom.html", context) @app.get("/embed", response_class=HTMLResponse) -async def get_embed_page(feed_url, request: Request): +async def get_embed_page(feed_url: str, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. - request: The HTTP request. - - Returns: - custom.html """ - - # Make feed_url a valid URL. - url: str = urllib.parse.unquote(feed_url) - - feed: Feed = reader.get_feed(url) + feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url.strip())) # Get previous data, this is used when creating the form. embed: CustomEmbed = get_embed(reader, feed) @@ -300,14 +272,10 @@ async def get_embed_page(feed_url, request: Request): "footer_text": embed.footer_text, "footer_icon_url": embed.footer_icon_url, } - - # Get the first entry, this is used to show the user what the custom message will look like. - entries: Iterable[Entry] = reader.get_entries(feed=feed, limit=1) - - if custom_embed := get_embed(reader, feed_url): + if custom_embed := get_embed(reader, feed): context["custom_embed"] = custom_embed - for entry in entries: + for entry in reader.get_entries(feed=feed, limit=1): # Append to context. context["entry"] = entry return templates.TemplateResponse("embed.html", context) @@ -315,36 +283,27 @@ async def get_embed_page(feed_url, request: Request): @app.post("/embed", response_class=HTMLResponse) async def post_embed( - feed_url=Form(), - title=Form(""), - description=Form(""), - color=Form(""), - image_url=Form(""), - thumbnail_url=Form(""), - author_name=Form(""), - author_url=Form(""), - author_icon_url=Form(""), - footer_text=Form(""), - footer_icon_url=Form(""), + feed_url: str = Form(), + title: str = Form(""), + description: str = Form(""), + color: str = Form(""), + image_url: str = Form(""), + thumbnail_url: str = Form(""), + author_name: str = Form(""), + author_url: str = Form(""), + author_icon_url: str = Form(""), + footer_text: str = Form(""), + footer_icon_url: str = Form(""), ): """Set the embed settings. Args: feed_url: What feed we should get the custom message for. - request: The HTTP request. - - Returns: - custom.html """ - - # Make feed_url a valid URL. - url: str = urllib.parse.unquote(feed_url) - - feed: Feed = reader.get_feed(url) + clean_feed_url: str = feed_url.strip() + feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url)) custom_embed: CustomEmbed = get_embed(reader, feed) - - # Get the data from the form. custom_embed.title = title custom_embed.description = description custom_embed.color = color @@ -357,62 +316,55 @@ async def post_embed( custom_embed.footer_icon_url = footer_icon_url # Save the data. - save_embed(reader, feed_url, custom_embed) + save_embed(reader, feed, custom_embed) - clean_url: str = urllib.parse.quote(feed_url) - - return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) + return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303) @app.post("/use_embed") -async def post_use_embed(feed_url=Form()): - url: str = urllib.parse.unquote(feed_url) +async def post_use_embed(feed_url: str = Form()): + """Use embed instead of text. - feed: Feed = reader.get_feed(url) - reader.set_tag(feed, "should_send_embed", True) # type: ignore - return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) + Args: + feed_url: The feed to change. + """ + clean_feed_url: str = feed_url.strip() + reader.set_tag(clean_feed_url, "should_send_embed", True) # type: ignore + return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303) @app.post("/use_text") -async def post_use_text(feed_url=Form()): - url: str = urllib.parse.unquote(feed_url) +async def post_use_text(feed_url: str = Form()): + """Use text instead of embed. - feed: Feed = reader.get_feed(url) - reader.set_tag(feed, "should_send_embed", False) # type: ignore - return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) + Args: + feed_url: The feed to change. + """ + clean_feed_url: str = feed_url.strip() + reader.set_tag(clean_feed_url, "should_send_embed", False) # type: ignore + return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303) @app.get("/add", response_class=HTMLResponse) def get_add(request: Request): - """ - Page for adding a new feed. - - Args: - request: The request. - """ - context = make_context_index(request) - return templates.TemplateResponse("add.html", context) + """Page for adding a new feed.""" + return templates.TemplateResponse("add.html", make_context_index(request)) @app.get("/feed", response_class=HTMLResponse) -async def get_feed(feed_url, request: Request): +async def get_feed(feed_url: str, request: Request): """ Get a feed by URL. Args: - request: The request. feed_url: The feed to add. """ - # Make feed_url a valid URL. - url: str = urllib.parse.unquote(feed_url) + clean_feed_url: str = urllib.parse.unquote(feed_url.strip()) - feed: Feed = reader.get_feed(url) + feed: Feed = reader.get_feed(clean_feed_url) # Get entries from the feed. - entries: Iterable[Entry] = reader.get_entries(feed=url) - - # Get the entries in the feed. - feed_counts: FeedCounts = reader.get_feed_counts(feed=url) + entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url) # Create the html for the entries. html: str = create_html_for_feed(entries) @@ -427,7 +379,7 @@ async def get_feed(feed_url, request: Request): "request": request, "feed": feed, "entries": entries, - "feed_counts": feed_counts, + "feed_counts": reader.get_feed_counts(feed=clean_feed_url), "html": html, "should_send_embed": should_send_embed, } @@ -443,28 +395,22 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str: """ html: str = "" for entry in entries: - - # Get first image. first_image = "" first_image_text = "" if images := get_images_from_entry(entry=entry): first_image: str = images[0][0] first_image_text: str = images[0][1] - # Get the text from the entry. - text = replace_tags_in_text_message(entry.feed, entry) - if not text: - text = "