From 3f403c8edd6193923d5c64537d50f4c11ab3ae4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Thu, 26 Jan 2023 17:06:55 +0100 Subject: [PATCH] Move things away from main.py --- discord_rss_bot/main.py | 152 ++++++------------------------------- discord_rss_bot/webhook.py | 59 ++++++++++++++ 2 files changed, 81 insertions(+), 130 deletions(-) create mode 100644 discord_rss_bot/webhook.py diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 6e94a2e..f1172cf 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -42,7 +42,8 @@ from discord_rss_bot.filter.whitelist import get_whitelist_content, get_whitelis from discord_rss_bot.markdown import convert_html_to_md from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.search import create_html_for_search_results -from discord_rss_bot.settings import default_custom_message, get_reader, list_webhooks +from discord_rss_bot.settings import default_custom_message, get_reader +from discord_rss_bot.webhook import add_webhook, remove_webhook app: FastAPI = FastAPI() app.mount("/static", StaticFiles(directory="discord_rss_bot/static"), name="static") @@ -58,77 +59,32 @@ templates.env.filters["discord_markdown"] = convert_html_to_md @app.post("/add_webhook") -async def add_webhook(webhook_name=Form(), webhook_url=Form()): +async def post_add_webhook(webhook_name=Form(), webhook_url=Form()): """ Add a feed to the database. Args: webhook_name: The name of the webhook. webhook_url: The url of the webhook. - - Returns: - dict: The feed that was added. """ - # Remove leading and trailing whitespace. - clean_webhook_name: str = webhook_name.strip() - clean_webhook_url: str = webhook_url.strip() - - # Get current webhooks from the database if they exist otherwise use an empty list. - webhooks: list[dict[str, str]] = list_webhooks(reader) - - # Only add the webhook if it doesn't already exist. - if all(webhook["name"] != clean_webhook_name for webhook in webhooks): - # Create a dict with webhook name and URL. - new_webhook: dict[str, str] = {"name": clean_webhook_name, "url": clean_webhook_url} - - # Add the new webhook to the list of webhooks. - webhooks.append(new_webhook) - - # Add our new list of webhooks to the database. - reader.set_tag((), "webhooks", webhooks) # type: ignore - - add_missing_tags(reader) - + if add_webhook(reader, webhook_name, webhook_url): return RedirectResponse(url="/", status_code=303) - # TODO: Show this error on the page. - raise HTTPException(status_code=409, detail="Webhook already exists") - @app.post("/delete_webhook") -async def delete_webhook(webhook_url=Form()): +async def post_delete_webhook(webhook_url=Form()): """ Delete a webhook from the database. Args: webhook_url: The url of the webhook. - - Returns: - dict: The feed that was added. """ - # Get current webhooks from the database if they exist otherwise use an empty list. - webhooks: list[dict[str, str]] = list_webhooks(reader) - - # Only add the webhook if it doesn't already exist. - for webhook in webhooks: - if webhook["url"] in [webhook_url, webhook_url]: - # Add the new webhook to the list of webhooks. - webhooks.remove(webhook) - - # Check if it has been removed. - if webhook in webhooks: - raise HTTPException(status_code=500, detail="Webhook could not be deleted") - - # Add our new list of webhooks to the database. - reader.set_tag((), "webhooks", webhooks) # type: ignore - return RedirectResponse(url="/", status_code=303) - - # TODO: Show this error on the page. - raise HTTPException(status_code=404, detail="Webhook not found") + if remove_webhook(reader, webhook_url): + return RedirectResponse(url="/", status_code=303) @app.post("/add") -async def create_feed(feed_url=Form(), webhook_dropdown=Form()): +async def post_create_feed(feed_url=Form(), webhook_dropdown=Form()): """ Add a feed to the database. @@ -150,10 +106,8 @@ async def create_feed(feed_url=Form(), webhook_dropdown=Form()): entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url, read=False) for entry in entries: reader.set_entry_read(entry, True) - try: - hooks = reader.get_tag((), "webhooks") - except TagNotFoundError: - hooks = [] + + hooks = reader.get_tag((), "webhooks", []) webhook_url: str = "" if hooks: @@ -182,47 +136,35 @@ async def create_feed(feed_url=Form(), webhook_dropdown=Form()): @app.post("/pause") -async def pause_feed(feed_url=Form()): +async def post_pause_feed(feed_url=Form()): """Pause a feed. Args: - feed_url: The feed to pause. Defaults to Form(). - - Returns: - Redirect the URL to the feed we paused. + feed_url: The feed to pause. """ - - # Disable/pause the feed. reader.disable_feed_updates(feed_url) - # Clean URL is used to redirect to the feed page. clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @app.post("/unpause") -async def unpause_feed(feed_url=Form()): +async def post_unpause_feed(feed_url=Form()): """Unpause a feed. Args: - feed_url: The feed to unpause. Defaults to Form(). - - Returns: - Redirect to the feed we unpaused. + feed_url: The Feed to unpause. """ - - # Enable/unpause the feed. reader.enable_feed_updates(feed_url) - # Clean URL is used to redirect to the feed page. clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @app.post("/whitelist") -async def set_whitelist( +async def post_set_whitelist( whitelist_title=Form(None), whitelist_summary=Form(None), whitelist_content=Form(None), @@ -235,9 +177,6 @@ async def set_whitelist( whitelist_summary: Whitelisted words for when checking the title. whitelist_content: Whitelisted words for when checking the title. feed_url: The feed we should set the whitelist for. - - Returns: - Redirect back to the feed page. """ if whitelist_title: reader.set_tag(feed_url, "whitelist_title", whitelist_title) @@ -246,7 +185,6 @@ async def set_whitelist( if whitelist_content: reader.set_tag(feed_url, "whitelist_content", whitelist_content) - # Clean URL is used to redirect to the feed page. clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @@ -259,9 +197,6 @@ async def get_whitelist(feed_url, request: Request): Args: feed_url: What feed we should get the whitelist for. request: The HTTP request. - - Returns: - _description_ """ # Make feed_url a valid URL. url: str = urllib.parse.unquote(feed_url) @@ -284,7 +219,7 @@ async def get_whitelist(feed_url, request: Request): @app.post("/blacklist") -async def set_blacklist( +async def post_set_blacklist( blacklist_title=Form(None), blacklist_summary=Form(None), blacklist_content=Form(None), @@ -298,9 +233,6 @@ async def set_blacklist( blacklist_summary: Blacklisted words for when checking the summary. blacklist_content: Blacklisted words for when checking the content. feed_url: What feed we should set the blacklist for. - - Returns: - Redirect to the feed. """ # Add the blacklist to the feed. @@ -311,7 +243,6 @@ async def set_blacklist( if blacklist_content: reader.set_tag(feed_url, "blacklist_content", blacklist_content) - # Clean URL is used to redirect to the feed page. clean_url = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @@ -340,23 +271,19 @@ async def get_blacklist(feed_url, request: Request): @app.post("/custom") -async def set_custom(custom_message=Form(""), feed_url=Form()): +async def post_set_custom(custom_message=Form(""), feed_url=Form()): """ Set the custom message, this is used when sending the message. Args: custom_message: The custom message. feed_url: The feed we should set the custom message for. - - Returns: - Redirect to the feed. """ if custom_message := custom_message.strip(): reader.set_tag(feed_url, "custom_message", custom_message) # type: ignore else: reader.set_tag(feed_url, "custom_message", settings.default_custom_message) # type: ignore - # Clean URL is used to redirect to the feed page. clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @@ -441,7 +368,7 @@ async def get_embed_page(feed_url, request: Request): @app.post("/embed", response_class=HTMLResponse) -async def set_embed_page( +async def post_embed( feed_url=Form(), title=Form(""), description=Form(""), @@ -486,16 +413,14 @@ async def set_embed_page( # Save the data. save_embed(reader, feed_url, custom_embed) - # Clean URL is used to redirect to the feed page. clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @app.post("/use_embed") -async def set_should_use_embed(feed_url=Form()): +async def post_use_embed(feed_url=Form()): url: str = urllib.parse.unquote(feed_url) - print(f"Setting should_send_embed to True for {url}") feed: Feed = reader.get_feed(url) reader.set_tag(feed, "should_send_embed", True) # type: ignore @@ -503,9 +428,8 @@ async def set_should_use_embed(feed_url=Form()): @app.post("/use_text") -async def set_should_use_text(feed_url=Form()): +async def post_use_text(feed_url=Form()): url: str = urllib.parse.unquote(feed_url) - print(f"Setting should_send_embed to False for {url}") feed: Feed = reader.get_feed(url) reader.set_tag(feed, "should_send_embed", False) # type: ignore @@ -519,9 +443,6 @@ def get_add(request: Request): Args: request: The request. - - Returns: - HTMLResponse: The HTML response. """ context = make_context_index(request) return templates.TemplateResponse("add.html", context) @@ -535,9 +456,6 @@ async def get_feed(feed_url, request: Request): Args: request: The request. feed_url: The feed to add. - - Returns: - HTMLResponse: The HTML response. """ # Make feed_url a valid URL. url: str = urllib.parse.unquote(feed_url) @@ -576,9 +494,6 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str: Args: search_results: The search results. custom_reader: The reader. If None, we will get the reader from the settings. - - Returns: - str: The HTML. """ html: str = "" for entry in entries: @@ -624,15 +539,12 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str: @app.get("/add_webhook", response_class=HTMLResponse) -async def add_webhook_page(request: Request): +async def get_add_webhook(request: Request): """ Page for adding a new webhook. Args: request: The request. - - Returns: - HTMLResponse: The HTML response. """ return templates.TemplateResponse("add_webhook.html", {"request": request}) @@ -675,9 +587,6 @@ async def get_webhooks(request: Request): Args: request: The request. - - Returns: - HTMLResponse: The HTML response. """ hooks: Dict[str, str] = reader.get_tag((), "webhooks", "") # type: ignore hooks_with_data = [] @@ -703,9 +612,6 @@ def index(request: Request): Args: request: The request. - - Returns: - HTMLResponse: The HTML response. """ context = make_context_index(request) return templates.TemplateResponse("index.html", context) @@ -718,10 +624,6 @@ def make_context_index(request: Request): Used by / and /add. Args: request: The request. - - Returns: - dict: The context. - """ # Get webhooks name and url from the database. try: @@ -769,9 +671,6 @@ async def remove_feed(feed_url=Form()): Args: feed_url: The feed to add. - - Returns: - HTMLResponse: The HTML response. """ # Unquote the url unquoted_feed_url: str = urllib.parse.unquote(feed_url) @@ -793,9 +692,6 @@ async def search(request: Request, query: str): Args: request: The request. query: The query to search for. - - Returns: - HTMLResponse: The HTML response. """ reader.update_search() search_results: Iterable[EntrySearchResult] = reader.search_entries(query) @@ -815,11 +711,7 @@ async def search(request: Request, query: str): @app.get("/post_entry", response_class=HTMLResponse) async def post_entry(entry_id: str): """ - Send a feed to Discord. - - Returns: - HTMLResponse: The HTML response. - """ + Send a feed to Discord.""" # Unquote the entry id. unquoted_entry_id: str = urllib.parse.unquote(entry_id) diff --git a/discord_rss_bot/webhook.py b/discord_rss_bot/webhook.py new file mode 100644 index 0000000..971b392 --- /dev/null +++ b/discord_rss_bot/webhook.py @@ -0,0 +1,59 @@ +from fastapi import HTTPException +from reader import Reader + +from discord_rss_bot.missing_tags import add_missing_tags +from discord_rss_bot.settings import list_webhooks + + +def add_webhook(reader: Reader, webhook_name: str, webhook_url: str): + """Add new webhook. + + Args: + reader: The Reader to use + webhook_name: The name of the webhook, this will be shown on the webpage + webhook_url: The webhook URL to send entries to + + Raises: + HTTPException: This is raised when the webhook already exists + + Returns: + Returns True if everyting was succesful + """ + # Get current webhooks from the database if they exist otherwise use an empty list. + webhooks: list[dict[str, str]] = list_webhooks(reader) + + # Only add the webhook if it doesn't already exist. + if all(webhook["name"] != webhook_name.strip() for webhook in webhooks): + # Add the new webhook to the list of webhooks. + webhooks.append({"name": webhook_name.strip(), "url": webhook_url.strip()}) + + # Add our new list of webhooks to the database. + reader.set_tag((), "webhooks", webhooks) # type: ignore + + add_missing_tags(reader) + return True + + # TODO: Show this error on the page. + raise HTTPException(status_code=409, detail="Webhook already exists") + + +def remove_webhook(reader: Reader, webhook_url: str): + # Get current webhooks from the database if they exist otherwise use an empty list. + webhooks: list[dict[str, str]] = list_webhooks(reader) + + # Only add the webhook if it doesn't already exist. + for webhook in webhooks: + if webhook["url"] in [webhook_url, webhook_url]: + # Add the new webhook to the list of webhooks. + webhooks.remove(webhook) + + # Check if it has been removed. + if webhook in webhooks: + raise HTTPException(status_code=500, detail="Webhook could not be deleted") + + # Add our new list of webhooks to the database. + reader.set_tag((), "webhooks", webhooks) # type: ignore + return True + + # TODO: Show this error on the page. + raise HTTPException(status_code=404, detail="Webhook not found")