From 948a5a2af91afb8f529bed6fefa69e89a301ed35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Sat, 18 Mar 2023 01:50:45 +0100 Subject: [PATCH] Use Ruff and fix all its warnings and errors --- .github/workflows/build.yml | 8 +- discord_rss_bot/custom_filters.py | 8 +- discord_rss_bot/custom_message.py | 45 +---- discord_rss_bot/feeds.py | 42 +++-- discord_rss_bot/filter/__init__.py | 0 discord_rss_bot/filter/blacklist.py | 26 +-- discord_rss_bot/filter/utils.py | 3 +- discord_rss_bot/filter/whitelist.py | 24 +-- discord_rss_bot/healthcheck.py | 5 +- discord_rss_bot/main.py | 199 ++++++++++++++------- discord_rss_bot/search.py | 8 +- discord_rss_bot/settings.py | 16 +- discord_rss_bot/templates/add_webhook.html | 68 +++---- discord_rss_bot/templates/custom.html | 164 +++++++++-------- discord_rss_bot/templates/webhooks.html | 19 +- discord_rss_bot/webhook.py | 23 ++- poetry.lock | 8 +- pyproject.toml | 82 ++++++++- tests/__init__.py | 0 tests/test_blacklist.py | 5 +- tests/test_custom_filter.py | 16 +- tests/test_feeds.py | 9 +- tests/test_main.py | 10 +- tests/test_markdown.py | 4 +- tests/test_search.py | 10 +- tests/test_settings.py | 10 +- tests/test_whitelist.py | 5 +- 27 files changed, 504 insertions(+), 313 deletions(-) create mode 100644 discord_rss_bot/filter/__init__.py create mode 100644 tests/__init__.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ed381e6..d0f44c2 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -2,9 +2,9 @@ name: Test code on: push: - branches: [ master ] + branches: [master] pull_request: - branches: [ master ] + branches: [master] workflow_dispatch: jobs: @@ -16,7 +16,7 @@ jobs: - uses: actions/setup-python@v4 with: python-version: "3.11" - cache: 'poetry' + cache: "poetry" - run: poetry install - run: poetry run pytest env: @@ -41,4 +41,4 @@ jobs: context: . push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file + labels: ${{ steps.meta.outputs.labels }} diff --git a/discord_rss_bot/custom_filters.py b/discord_rss_bot/custom_filters.py index 38dc078..7b1f19c 100644 --- a/discord_rss_bot/custom_filters.py +++ b/discord_rss_bot/custom_filters.py @@ -11,7 +11,7 @@ from discord_rss_bot.settings import get_reader reader: Reader = get_reader() -@lru_cache() +@lru_cache def encode_url(url_to_quote: str) -> str: """%-escape the URL so it can be used in a URL. @@ -28,8 +28,7 @@ def encode_url(url_to_quote: str) -> str: def entry_is_whitelisted(entry_to_check: Entry) -> bool: - """ - Check if the entry is whitelisted. + """Check if the entry is whitelisted. Args: entry_to_check: The feed to check. @@ -42,8 +41,7 @@ def entry_is_whitelisted(entry_to_check: Entry) -> bool: def entry_is_blacklisted(entry_to_check: Entry) -> bool: - """ - Check if the entry is blacklisted. + """Check if the entry is blacklisted. Args: entry_to_check: The feed to check. diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index da80949..010425d 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -22,31 +22,6 @@ class CustomEmbed: footer_icon_url: str -def return_image(found_images) -> list[tuple[str, str]] | None: - soup: BeautifulSoup = BeautifulSoup(found_images, features="lxml") - images = soup.find_all("img") - for image in images: - image_src: str = str(image["src"]) or "" - image_alt: str = "Link to image" - if image.get("alt"): - image_alt = image.get("alt") - return [(image_src, image_alt)] - - -def get_first_image_html(html: str): - """Get images from a entry. - - Args: - html: The HTML to get the images from. - - Returns: - Returns a list of images. - """ - if images := BeautifulSoup(html, features="lxml").find_all("img"): - return images[0].attrs["src"] - return None - - def try_to_replace(custom_message: str, template: str, replace_with: str) -> str: """Try to replace a tag in custom_message. @@ -84,7 +59,7 @@ def replace_tags_in_text_message(entry: Entry) -> str: summary: str = entry.summary or "" - first_image = get_image(summary, content) + first_image = get_first_image(summary, content) summary = convert_html_to_md(summary) content = convert_html_to_md(content) @@ -127,8 +102,8 @@ def replace_tags_in_text_message(entry: Entry) -> str: return custom_message.replace("\\n", "\n") -def get_image(summary, content): - """Get image from summary or content +def get_first_image(summary, content): + """Get image from summary or content. Args: summary: The summary from the entry @@ -137,12 +112,10 @@ def get_image(summary, content): Returns: The first image """ - if content: - if images := BeautifulSoup(content, features="lxml").find_all("img"): - return images[0].attrs["src"] - if summary: - if images := BeautifulSoup(summary, features="lxml").find_all("img"): - return images[0].attrs["src"] + if content and (images := BeautifulSoup(content, features="lxml").find_all("img")): + return images[0].attrs["src"] + if summary and (images := BeautifulSoup(summary, features="lxml").find_all("img")): + return images[0].attrs["src"] return "" @@ -156,7 +129,6 @@ def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed: Returns: Returns the embed with the tags replaced. """ - custom_reader: Reader = get_reader() embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader) @@ -167,7 +139,7 @@ def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed: summary: str = entry.summary or "" - first_image = get_image(summary, content) + first_image = get_first_image(summary, content) summary = convert_html_to_md(summary) content = convert_html_to_md(content) @@ -274,7 +246,6 @@ def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed: Returns: Returns the contents from the embed tag. """ - if embed := custom_reader.get_tag(feed, "embed", ""): if type(embed) != str: return get_embed_data(embed) diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 75406f5..bae5edc 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -1,22 +1,29 @@ -from typing import Iterable +from typing import TYPE_CHECKING from discord_webhook import DiscordEmbed, DiscordWebhook from fastapi import HTTPException from reader import Entry, Feed, FeedExistsError, Reader, TagNotFoundError -from requests import Response from discord_rss_bot import custom_message from discord_rss_bot.filter.blacklist import should_be_skipped from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent from discord_rss_bot.settings import default_custom_message, get_reader +if TYPE_CHECKING: + from collections.abc import Iterable -def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None): - """ - Send a single entry to Discord. + from requests import Response + + +def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: + """Send a single entry to Discord. Args: entry: The entry to send to Discord. + custom_reader: The reader to use. If None, the default reader will be used. + + Returns: + str | None: The error message if there was an error, otherwise None. """ # Get the default reader if we didn't get a custom one. reader: Reader = get_reader() if custom_reader is None else custom_reader @@ -27,7 +34,7 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None): return "No webhook URL found." # Try to get the custom message for the feed. If the user has none, we will use the default message. - if custom_message.get_custom_message(reader, entry.feed) != "": + if not custom_message.get_custom_message(reader, entry.feed): webhook_message = custom_message.replace_tags_in_text_message(entry=entry) else: webhook_message: str = default_custom_message @@ -39,11 +46,19 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None): webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) response: Response = webhook.execute() - if not response.ok: - return f"Error sending entry to Discord: {response.text}" + return None if response.ok else f"Error sending entry to Discord: {response.text}" def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: + """Create a webhook with an embed. + + Args: + webhook_url (str): The webhook URL. + entry (Entry): The entry to send to Discord. + + Returns: + DiscordWebhook: The webhook with the embed. + """ webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True) feed: Feed = entry.feed @@ -66,7 +81,11 @@ def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: if custom_embed.author_name and not custom_embed.author_url and custom_embed.author_icon_url: discord_embed.set_author(name=custom_embed.author_name, icon_url=custom_embed.author_icon_url) if custom_embed.author_name and custom_embed.author_url and custom_embed.author_icon_url: - discord_embed.set_author(name=custom_embed.author_name, url=custom_embed.author_url, icon_url=custom_embed.author_icon_url) # noqa: E501 + discord_embed.set_author( + name=custom_embed.author_name, + url=custom_embed.author_url, + icon_url=custom_embed.author_icon_url, + ) if custom_embed.thumbnail_url: discord_embed.set_thumbnail(url=custom_embed.thumbnail_url) if custom_embed.image_url: @@ -85,8 +104,7 @@ def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, do_once: bool = False) -> None: - """ - Send entries to Discord. + """Send entries to Discord. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. @@ -120,7 +138,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non else: # If the user has set the custom message to an empty string, we will use the default message, otherwise we # will use the custom message. - if custom_message.get_custom_message(reader, entry.feed) != "": + if not custom_message.get_custom_message(reader, entry.feed): webhook_message = custom_message.replace_tags_in_text_message(entry) else: webhook_message: str = default_custom_message diff --git a/discord_rss_bot/filter/__init__.py b/discord_rss_bot/filter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/discord_rss_bot/filter/blacklist.py b/discord_rss_bot/filter/blacklist.py index 45092ba..f6ef586 100644 --- a/discord_rss_bot/filter/blacklist.py +++ b/discord_rss_bot/filter/blacklist.py @@ -4,11 +4,12 @@ from discord_rss_bot.filter.utils import is_word_in_text def has_black_tags(custom_reader: Reader, feed: Feed) -> bool: - """ - Return True if the feed has any of the following tags: + """Return True if the feed has blacklist tags. + + The following tags are checked: - blacklist_title - blacklist_summary - - blacklist_content + - blacklist_content. Args: custom_reader: The reader. @@ -25,8 +26,7 @@ def has_black_tags(custom_reader: Reader, feed: Feed) -> bool: def should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: - """ - Return True if the entry is in the blacklist. + """Return True if the entry is in the blacklist. Args: custom_reader: The reader. @@ -40,15 +40,17 @@ def should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")) blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")) blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")) - # TODO: Also add support for entry_text + # TODO: Also add support for entry_text and more. if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title): return True - elif entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary): + if entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary): return True - elif entry.author and blacklist_author and is_word_in_text(blacklist_author, entry.author): + if entry.author and blacklist_author and is_word_in_text(blacklist_author, entry.author): return True - elif entry.content: - if entry.content[0].value and blacklist_content and is_word_in_text(blacklist_content, entry.content[0].value): - return True - return False + return bool( + entry.content + and entry.content[0].value + and blacklist_content + and is_word_in_text(blacklist_content, entry.content[0].value), + ) diff --git a/discord_rss_bot/filter/utils.py b/discord_rss_bot/filter/utils.py index 8fa621b..cc0d550 100644 --- a/discord_rss_bot/filter/utils.py +++ b/discord_rss_bot/filter/utils.py @@ -2,7 +2,8 @@ import re def is_word_in_text(words: str, text: str) -> bool: - """ + """Check if the word is in the text. + Args: words: The words to search for. text: The text to search in. diff --git a/discord_rss_bot/filter/whitelist.py b/discord_rss_bot/filter/whitelist.py index 03c8f45..eb884f6 100644 --- a/discord_rss_bot/filter/whitelist.py +++ b/discord_rss_bot/filter/whitelist.py @@ -4,11 +4,12 @@ from discord_rss_bot.filter.utils import is_word_in_text def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: - """ - Return True if the feed has any of the following tags: + """Return True if the feed has whitelist tags. + + The following tags are checked: - whitelist_title - whitelist_summary - - whitelist_content + - whitelist_content. Args: custom_reader: The reader. @@ -25,8 +26,7 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: - """ - Return True if the entry is in the whitelist. + """Return True if the entry is in the whitelist. Args: custom_reader: The reader. @@ -43,11 +43,13 @@ def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): return True - elif entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary): + if entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary): return True - elif entry.author and whitelist_author and is_word_in_text(whitelist_author, entry.author): + if entry.author and whitelist_author and is_word_in_text(whitelist_author, entry.author): return True - elif entry.content: - if entry.content[0].value and whitelist_content and is_word_in_text(whitelist_content, entry.content[0].value): - return True - return False + return bool( + entry.content + and entry.content[0].value + and whitelist_content + and is_word_in_text(whitelist_content, entry.content[0].value), + ) diff --git a/discord_rss_bot/healthcheck.py b/discord_rss_bot/healthcheck.py index 1ede4d1..b14a2a7 100644 --- a/discord_rss_bot/healthcheck.py +++ b/discord_rss_bot/healthcheck.py @@ -7,14 +7,15 @@ def healthcheck() -> None: """Check if the website is up. sys.exit(0): success - the container is healthy and ready for use. - sys.exit(1): unhealthy - the container is not working correctly.""" + sys.exit(1): unhealthy - the container is not working correctly. + """ # TODO: We should check more than just that the website is up. try: r: requests.Response = requests.get(url="http://localhost:5000", timeout=5) if r.ok: sys.exit(0) except requests.exceptions.RequestException as e: - print(f"Healthcheck failed: {e}", file=sys.stderr) + print(f"Healthcheck failed: {e}", file=sys.stderr) # noqa: T201 sys.exit(1) diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index dcfc6cb..aac510e 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -1,9 +1,9 @@ import json import urllib.parse +from collections.abc import Iterable from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timezone from functools import lru_cache -from typing import Iterable import httpx import uvicorn @@ -22,7 +22,7 @@ from discord_rss_bot.custom_message import ( CustomEmbed, get_custom_message, get_embed, - get_image, + get_first_image, replace_tags_in_text_message, save_embed, ) @@ -47,45 +47,44 @@ templates.env.filters["discord_markdown"] = convert_html_to_md @app.post("/add_webhook") -async def post_add_webhook(webhook_name: str = Form(), webhook_url: str = Form()): - """ - Add a feed to the database. +async def post_add_webhook(webhook_name: str = Form(), webhook_url: str = Form()) -> RedirectResponse: + """Add a feed to the database. Args: webhook_name: The name of the webhook. webhook_url: The url of the webhook. """ - if add_webhook(reader, webhook_name, webhook_url): - return RedirectResponse(url="/", status_code=303) + add_webhook(reader, webhook_name, webhook_url) + return RedirectResponse(url="/", status_code=303) @app.post("/delete_webhook") -async def post_delete_webhook(webhook_url: str = Form()): - """ - Delete a webhook from the database. +async def post_delete_webhook(webhook_url: str = Form()) -> RedirectResponse: + """Delete a webhook from the database. Args: webhook_url: The url of the webhook. """ - if remove_webhook(reader, webhook_url): - return RedirectResponse(url="/", status_code=303) + # TODO: Check if the webhook is in use by any feeds before deleting it. + remove_webhook(reader, webhook_url) + return RedirectResponse(url="/", status_code=303) @app.post("/add") -async def post_create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()): - """ - Add a feed to the database. +async def post_create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()) -> RedirectResponse: + """Add a feed to the database. Args: feed_url: The feed to add. webhook_dropdown: The webhook to use. """ + clean_feed_url: str = feed_url.strip() create_feed(reader, feed_url, webhook_dropdown) - return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/pause") -async def post_pause_feed(feed_url: str = Form()): +async def post_pause_feed(feed_url: str = Form()) -> RedirectResponse: """Pause a feed. Args: @@ -97,7 +96,7 @@ async def post_pause_feed(feed_url: str = Form()): @app.post("/unpause") -async def post_unpause_feed(feed_url: str = Form()): +async def post_unpause_feed(feed_url: str = Form()) -> RedirectResponse: """Unpause a feed. Args: @@ -115,7 +114,7 @@ async def post_set_whitelist( whitelist_content: str = Form(None), whitelist_author: str = Form(None), feed_url: str = Form(), -): +) -> RedirectResponse: """Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent. Args: @@ -127,7 +126,7 @@ async def post_set_whitelist( """ clean_feed_url: str = feed_url.strip() if whitelist_title: - reader.set_tag(clean_feed_url, "whitelist_title", whitelist_title) # type: ignore + reader.set_tag(clean_feed_url, "whitelist_title", [whitelist_title]) if whitelist_summary: reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # type: ignore if whitelist_content: @@ -172,8 +171,10 @@ async def post_set_blacklist( blacklist_content: str = Form(None), blacklist_author: str = Form(None), feed_url: str = Form(), -): - """Set the blacklist, if this is set we will check if words are in the title, summary or content +) -> RedirectResponse: + """Set the blacklist. + + If this is set we will check if words are in the title, summary or content and then don't send that entry. Args: @@ -193,7 +194,7 @@ async def post_set_blacklist( if blacklist_author: reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # type: ignore - return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/blacklist", response_class=HTMLResponse) @@ -218,9 +219,8 @@ async def get_blacklist(feed_url: str, request: Request): @app.post("/custom") -async def post_set_custom(custom_message: str = Form(""), feed_url: str = Form()): - """ - Set the custom message, this is used when sending the message. +async def post_set_custom(custom_message: str = Form(""), feed_url: str = Form()) -> RedirectResponse: + """Set the custom message, this is used when sending the message. Args: custom_message: The custom message. @@ -231,7 +231,8 @@ async def post_set_custom(custom_message: str = Form(""), feed_url: str = Form() else: reader.set_tag(feed_url, "custom_message", settings.default_custom_message) # type: ignore - return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) + clean_feed_url: str = feed_url.strip() + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/custom", response_class=HTMLResponse) @@ -304,11 +305,25 @@ async def post_embed( author_icon_url: str = Form(""), footer_text: str = Form(""), footer_icon_url: str = Form(""), -): +) -> RedirectResponse: """Set the embed settings. Args: feed_url: What feed we should get the custom message for. + title: The title of the embed. + description: The description of the embed. + color: The color of the embed. + image_url: The image url of the embed. + thumbnail_url: The thumbnail url of the embed. + author_name: The author name of the embed. + author_url: The author url of the embed. + author_icon_url: The author icon url of the embed. + footer_text: The footer text of the embed. + footer_icon_url: The footer icon url of the embed. + + + Returns: + RedirectResponse: Redirect to the embed page. """ clean_feed_url: str = feed_url.strip() feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url)) @@ -328,31 +343,37 @@ async def post_embed( # Save the data. save_embed(reader, feed, custom_embed) - return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303) + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/use_embed") -async def post_use_embed(feed_url: str = Form()): +async def post_use_embed(feed_url: str = Form()) -> RedirectResponse: """Use embed instead of text. Args: feed_url: The feed to change. + + Returns: + RedirectResponse: Redirect to the feed page. """ clean_feed_url: str = feed_url.strip() reader.set_tag(clean_feed_url, "should_send_embed", True) # type: ignore - return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303) + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/use_text") -async def post_use_text(feed_url: str = Form()): +async def post_use_text(feed_url: str = Form()) -> RedirectResponse: """Use text instead of embed. Args: feed_url: The feed to change. + + Returns: + RedirectResponse: Redirect to the feed page. """ clean_feed_url: str = feed_url.strip() reader.set_tag(clean_feed_url, "should_send_embed", False) # type: ignore - return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303) + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/add", response_class=HTMLResponse) @@ -367,11 +388,14 @@ def get_add(request: Request): @app.get("/feed", response_class=HTMLResponse) async def get_feed(feed_url: str, request: Request): - """ - Get a feed by URL. + """Get a feed by URL. Args: feed_url: The feed to add. + request: The request object. + + Returns: + HTMLResponse: The feed page. """ clean_feed_url: str = urllib.parse.unquote(feed_url.strip()) @@ -404,19 +428,18 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str: """Create HTML for the search results. Args: - search_results: The search results. - custom_reader: The reader. If None, we will get the reader from the settings. + entries: The entries to create HTML for. """ html: str = "" for entry in entries: - first_image = "" + first_image: str = "" summary: str | None = entry.summary content = "" if entry.content: for content_item in entry.content: content: str = content_item.value - first_image = get_image(summary, content) + first_image = get_first_image(summary, content) text: str = replace_tags_in_text_message(entry) or "
No content available.
" published = "" @@ -432,24 +455,30 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str: whitelisted = "Whitelisted" entry_id: str = urllib.parse.quote(entry.id) - to_disord_html: str = f"Send to Discord" + to_discord_html: str = f"Send to Discord" image_html: str = f"" if first_image else "" html += f"""
{blacklisted}{whitelisted}

{entry.title}

-{f"By { entry.author } @" if entry.author else ""}{published} - {to_disord_html} +{f"By { entry.author } @" if entry.author else ""}{published} - {to_discord_html} {text} {image_html}
""" - return html.strip() @app.get("/add_webhook", response_class=HTMLResponse) async def get_add_webhook(request: Request): - """Page for adding a new webhook.""" + """Page for adding a new webhook. + + Args: + request: The request object. + + Returns: + HTMLResponse: The add webhook page. + """ return templates.TemplateResponse("add_webhook.html", {"request": request}) @@ -457,36 +486,54 @@ async def get_add_webhook(request: Request): class WebhookInfo: custom_name: str url: str - type: int | None = None - id: str | None = None + webhook_type: int | None = None + webhook_id: str | None = None name: str | None = None avatar: str | None = None channel_id: str | None = None guild_id: str | None = None token: str | None = None + avatar_mod: int | None = None -@lru_cache() -def get_data_from_hook_url(hook_name: str, hook_url: str): +@lru_cache +def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: + """Get data from a webhook URL. + + Args: + hook_name (str): The webhook name. + hook_url (str): The webhook URL. + + Returns: + WebhookInfo: The webhook username, avatar, guild id, etc. + """ our_hook: WebhookInfo = WebhookInfo(custom_name=hook_name, url=hook_url) if hook_url: response: Response = httpx.get(hook_url) - if response.status_code == 200: + if response.is_success: webhook_json = json.loads(response.text) - our_hook.type = webhook_json["type"] or None - our_hook.id = webhook_json["id"] or None + our_hook.webhook_type = webhook_json["type"] or None + our_hook.webhook_id = webhook_json["id"] or None our_hook.name = webhook_json["name"] or None our_hook.avatar = webhook_json["avatar"] or None our_hook.channel_id = webhook_json["channel_id"] or None our_hook.guild_id = webhook_json["guild_id"] or None our_hook.token = webhook_json["token"] or None + our_hook.avatar_mod = int(webhook_json["channel_id"] or 0) % 5 return our_hook @app.get("/webhooks", response_class=HTMLResponse) async def get_webhooks(request: Request): - """Page for adding a new webhook.""" + """Page for adding a new webhook. + + Args: + request: The request object. + + Returns: + HTMLResponse: The add webhook page. + """ hooks_with_data = [] for hook in reader.get_tag((), "webhooks", ""): @@ -499,12 +546,26 @@ async def get_webhooks(request: Request): @app.get("/", response_class=HTMLResponse) def get_index(request: Request): - """This is the root of the website.""" + """This is the root of the website. + + Args: + request: The request object. + + Returns: + HTMLResponse: The index page. + """ return templates.TemplateResponse("index.html", make_context_index(request)) def make_context_index(request: Request): - """Create the needed context for the index page.""" + """Create the needed context for the index page. + + Args: + request: The request object. + + Returns: + dict: The context for the index page. + """ hooks: list[dict] = reader.get_tag((), "webhooks", []) # type: ignore feed_list = [] @@ -537,11 +598,13 @@ def make_context_index(request: Request): @app.post("/remove", response_class=HTMLResponse) async def remove_feed(feed_url: str = Form()): - """ - Get a feed by URL. + """Get a feed by URL. Args: feed_url: The feed to add. + + Returns: + RedirectResponse: Redirect to the index page. """ try: reader.delete_feed(urllib.parse.unquote(feed_url)) @@ -553,11 +616,14 @@ async def remove_feed(feed_url: str = Form()): @app.get("/search", response_class=HTMLResponse) async def search(request: Request, query: str): - """ - Get entries matching a full-text search query. + """Get entries matching a full-text search query. Args: query: The query to search for. + request: The request object. + + Returns: + HTMLResponse: The search page. """ reader.update_search() @@ -572,18 +638,25 @@ async def search(request: Request, query: str): @app.get("/post_entry", response_class=HTMLResponse) async def post_entry(entry_id: str): - """Send single entry to Discord.""" + """Send single entry to Discord. + + Args: + entry_id: The entry to send. + + Returns: + RedirectResponse: Redirect to the feed page. + """ unquoted_entry_id: str = urllib.parse.unquote(entry_id) entry: Entry | None = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None) if entry is None: - return {"error": f"Failed to get entry '{entry_id}' when posting to Discord."} + return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.") if result := send_entry_to_discord(entry=entry): return result # Redirect to the feed page. - clean_url: str = entry.feed.url.strip() - return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) + clean_feed_url: str = entry.feed.url.strip() + return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.on_event("startup") @@ -598,7 +671,7 @@ def startup() -> None: # Update all feeds every 15 minutes. # TODO: Make this configurable. - scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now()) + scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now(tz=timezone.utc)) scheduler.start() diff --git a/discord_rss_bot/search.py b/discord_rss_bot/search.py index 5140093..a48c99a 100644 --- a/discord_rss_bot/search.py +++ b/discord_rss_bot/search.py @@ -1,9 +1,13 @@ import urllib.parse +from typing import TYPE_CHECKING -from reader import Feed, HighlightedString, Reader +from reader import EntrySearchResult, Feed, HighlightedString, Reader from discord_rss_bot.settings import get_reader +if TYPE_CHECKING: + from collections.abc import Iterable + def create_html_for_search_results(query: str, custom_reader: Reader | None = None) -> str: """Create HTML for the search results. @@ -21,7 +25,7 @@ def create_html_for_search_results(query: str, custom_reader: Reader | None = No # Get the default reader if we didn't get a custom one. reader: Reader = get_reader() if custom_reader is None else custom_reader - search_results = reader.search_entries(query) + search_results: Iterable[EntrySearchResult] = reader.search_entries(query) html: str = "" for result in search_results: diff --git a/discord_rss_bot/settings.py b/discord_rss_bot/settings.py index 78d588e..fca259f 100644 --- a/discord_rss_bot/settings.py +++ b/discord_rss_bot/settings.py @@ -1,11 +1,11 @@ -import os from functools import lru_cache +from pathlib import Path from platformdirs import user_data_dir from reader import Reader, make_reader # type: ignore data_dir: str = user_data_dir(appname="discord_rss_bot", appauthor="TheLovinator", roaming=True) -os.makedirs(data_dir, exist_ok=True) +Path.mkdir(Path(data_dir), exist_ok=True) print(f"Data is stored in '{data_dir}'.") @@ -19,23 +19,21 @@ default_custom_embed: dict[str, str] = { } -@lru_cache() -def get_reader(custom_location: str = "") -> Reader: +@lru_cache +def get_reader(custom_location: Path | None = None) -> Reader: """Get the reader. Args: custom_location: The location of the database file. """ + db_location: Path = custom_location or Path(data_dir) / "db.sqlite" - db_location: str = custom_location or os.path.join(data_dir, "db.sqlite") - - return make_reader(url=db_location) + return make_reader(url=str(db_location)) def list_webhooks(reader: Reader) -> list[dict[str, str]]: - """ - Get current webhooks from the database if they exist otherwise use an empty list. + """Get current webhooks from the database if they exist otherwise use an empty list. Args: reader: The reader to use. diff --git a/discord_rss_bot/templates/add_webhook.html b/discord_rss_bot/templates/add_webhook.html index f4f2514..26dd800 100644 --- a/discord_rss_bot/templates/add_webhook.html +++ b/discord_rss_bot/templates/add_webhook.html @@ -1,36 +1,42 @@ {% extends "base.html" %} {% block title %} - | Add new webhook + | Add new webhook {% endblock title %} {% block content %} -
-
- {# Webhook name #} -
- -
- -
-
- {# Webhook URL #} -
- -
- -
-
- {# Submit button #} -
- -
-
-
+
+
+ {# Webhook name #} +
+ +
+ +
+
+ {# Webhook URL #} +
+ +
+ +
+
+ {# Submit button #} +
+ +
+
+
{% endblock content %} diff --git a/discord_rss_bot/templates/custom.html b/discord_rss_bot/templates/custom.html index 68cb73b..db56359 100644 --- a/discord_rss_bot/templates/custom.html +++ b/discord_rss_bot/templates/custom.html @@ -11,209 +11,215 @@
- +
diff --git a/discord_rss_bot/templates/webhooks.html b/discord_rss_bot/templates/webhooks.html index 9ddd82c..1c9fffd 100644 --- a/discord_rss_bot/templates/webhooks.html +++ b/discord_rss_bot/templates/webhooks.html @@ -10,8 +10,19 @@
{% for hook in hooks_with_data %}
- + {% if hook.avatar is not none %} + Webhook avatar + {% else %} + Default Discord avatar + {% endif %}

{{ hook.custom_name }}

  • Name: {{ hook.name }} @@ -23,13 +34,13 @@ Guild ID: {{ hook.guild_id }}
  • - Webhook ID: {{ hook.id }} + Webhook ID: {{ hook.webhook_id }}
  • Webhook token: {{ hook.token }}
  • - Webhook type: {{ hook.type }} + Webhook type: {{ hook.webhook_type }}
  • Webhook URL: {{ hook.url }} diff --git a/discord_rss_bot/webhook.py b/discord_rss_bot/webhook.py index c78be44..a826ca1 100644 --- a/discord_rss_bot/webhook.py +++ b/discord_rss_bot/webhook.py @@ -5,7 +5,7 @@ from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.settings import list_webhooks -def add_webhook(reader: Reader, webhook_name: str, webhook_url: str): +def add_webhook(reader: Reader, webhook_name: str, webhook_url: str) -> None: """Add new webhook. Args: @@ -15,9 +15,6 @@ def add_webhook(reader: Reader, webhook_name: str, webhook_url: str): Raises: HTTPException: This is raised when the webhook already exists - - Returns: - Returns True if everyting was succesful """ # Get current webhooks from the database if they exist otherwise use an empty list. webhooks: list[dict[str, str]] = list_webhooks(reader) @@ -31,13 +28,25 @@ def add_webhook(reader: Reader, webhook_name: str, webhook_url: str): reader.set_tag((), "webhooks", webhooks) # type: ignore add_missing_tags(reader) - return True + return # TODO: Show this error on the page. + # TODO: Replace HTTPException with a custom exception. raise HTTPException(status_code=409, detail="Webhook already exists") -def remove_webhook(reader: Reader, webhook_url: str): +def remove_webhook(reader: Reader, webhook_url: str) -> None: + """Remove webhook. + + Args: + reader (Reader): The Reader to use + webhook_url (str): The webhook URL to remove + + Raises: + HTTPException: If webhook could not be deleted + HTTPException: Webhook not found + """ + # TODO: Replace HTTPException with a custom exception for both of these. # Get current webhooks from the database if they exist otherwise use an empty list. webhooks: list[dict[str, str]] = list_webhooks(reader) @@ -52,7 +61,7 @@ def remove_webhook(reader: Reader, webhook_url: str): # Add our new list of webhooks to the database. reader.set_tag((), "webhooks", webhooks) # type: ignore - return True + return # TODO: Show this error on the page. raise HTTPException(status_code=404, detail="Webhook not found") diff --git a/poetry.lock b/poetry.lock index 794f66f..148e16b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.4.0 and should not be changed by hand. +# This file is automatically @generated by Poetry and should not be changed by hand. [[package]] name = "anyio" @@ -708,14 +708,14 @@ files = [ [[package]] name = "pathspec" -version = "0.11.0" +version = "0.11.1" description = "Utility library for gitignore style pattern matching of file paths." category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "pathspec-0.11.0-py3-none-any.whl", hash = "sha256:3a66eb970cbac598f9e5ccb5b2cf58930cd8e3ed86d393d541eaf2d8b1705229"}, - {file = "pathspec-0.11.0.tar.gz", hash = "sha256:64d338d4e0914e91c1792321e6907b5a593f1ab1851de7fc269557a21b30ebbc"}, + {file = "pathspec-0.11.1-py3-none-any.whl", hash = "sha256:d8af70af76652554bd134c22b3e8a1cc46ed7d91edcdd721ef1a0c51a84a5293"}, + {file = "pathspec-0.11.1.tar.gz", hash = "sha256:2798de800fa92780e33acca925945e9a19a133b715067cf165b8866c15a31687"}, ] [[package]] diff --git a/pyproject.toml b/pyproject.toml index 2856726..2e514e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,15 +28,91 @@ djlint = "^1.19.13" requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" -[tool.isort] -profile = "black" - [tool.black] line-length = 120 target-version = ["py311"] +preview = true [tool.djlint] ignore = "D004,D018,J018,T001" profile = "jinja" max_line_length = 120 format_attribute_template_tags = true + +[tool.ruff] +line-length = 120 +select = [ + "E", + "F", + "B", + "W", + "C90", + "I", + "N", + "D", + "UP", + "YTT", + "ANN", + "S", + "BLE", + # "FBT", # Reader uses positional boolean values in its function calls + "A", + "COM", + "C4", + "DTZ", + "EM", + "EXE", + "ISC", + "ICN", + "G", + "INP", + "PIE", + "T20", + "PYI", + "PT", + "Q", + "RSE", + "RET", + "SLF", + "SIM", + "TID", + "TCH", + "ARG", + "PTH", + "ERA", + "PGH", + "PL", + "PLC", + "PLE", + "PLR", + "PLW", + "TRY", + "RUF", +] +ignore = [ + "D100", # pydocstyle - missing docstring in public module + "D101", # pydocstyle - missing docstring in public class + "D102", # pydocstyle - missing docstring in public method + "D103", # pydocstyle - missing docstring in public function + "D104", # pydocstyle - missing docstring in public package + "D105", # pydocstyle - missing docstring in magic method + "D106", # pydocstyle - missing docstring in public nested class + "D107", # pydocstyle - missing docstring in __init__ + "G002", # Allow % in logging + "UP031", # Allow % in logging + "B008", # Allow Form() as a default value +] + +[tool.ruff.pydocstyle] +convention = "google" + +[tool.ruff.per-file-ignores] +"tests/*" = ["S101"] + +[tool.pytest.ini_options] +addopts = "-vvvvvv --exitfirst" +filterwarnings = [ + "ignore:'cgi' is deprecated and slated for removal in Python 3.13:DeprecationWarning", + "ignore:pkg_resources is deprecated as an API:DeprecationWarning", + "ignore:No parser was explicitly specified:UserWarning", +] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_blacklist.py b/tests/test_blacklist.py index e190cd2..9c3bd4f 100644 --- a/tests/test_blacklist.py +++ b/tests/test_blacklist.py @@ -1,11 +1,14 @@ import tempfile from pathlib import Path -from typing import Iterable +from typing import TYPE_CHECKING from reader import Entry, Feed, Reader, make_reader from discord_rss_bot.filter.blacklist import has_black_tags, should_be_skipped +if TYPE_CHECKING: + from collections.abc import Iterable + feed_url: str = "https://lovinator.space/rss_test.xml" diff --git a/tests/test_custom_filter.py b/tests/test_custom_filter.py index c56062a..2e31f16 100644 --- a/tests/test_custom_filter.py +++ b/tests/test_custom_filter.py @@ -1,12 +1,14 @@ -import os import pathlib import tempfile - -from reader import Reader +from pathlib import Path +from typing import TYPE_CHECKING from discord_rss_bot.custom_filters import encode_url, entry_is_blacklisted, entry_is_whitelisted from discord_rss_bot.settings import get_reader +if TYPE_CHECKING: + from reader import Reader + def test_encode_url() -> None: # Test normal input @@ -19,16 +21,16 @@ def test_encode_url() -> None: == r"https%3A//www.example.com/my%20path%3Fq%3Dabc%26b%3D1" ) # Test empty input - assert encode_url("") == "" + assert not encode_url("") # Test input as None - assert encode_url(None) == "" # type: ignore + assert not encode_url(None) # type: ignore def test_entry_is_whitelisted() -> None: # Test with a custom reader. with tempfile.TemporaryDirectory() as temp_dir: # Create the temp directory - os.makedirs(temp_dir, exist_ok=True) + Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") custom_reader: Reader = get_reader(custom_location=str(custom_loc)) @@ -69,7 +71,7 @@ def test_entry_is_blacklisted() -> None: # Test with a custom reader. with tempfile.TemporaryDirectory() as temp_dir: # Create the temp directory - os.makedirs(temp_dir, exist_ok=True) + Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") custom_reader: Reader = get_reader(custom_location=str(custom_loc)) diff --git a/tests/test_feeds.py b/tests/test_feeds.py index 214410b..23d757f 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -13,11 +13,11 @@ def test_send_to_discord() -> None: """Test sending to Discord.""" with tempfile.TemporaryDirectory() as temp_dir: # Create the temp directory. - os.makedirs(temp_dir, exist_ok=True) - assert os.path.exists(temp_dir) + Path.mkdir(Path(temp_dir), exist_ok=True) + assert Path.exists(Path(temp_dir)) # Create a temporary reader. - reader: Reader = make_reader(url=str(Path(temp_dir, "test_db.sqlite"))) + reader: Reader = make_reader(url=str(Path(temp_dir) / "test_db.sqlite")) assert reader is not None # Add a feed to the reader. @@ -35,7 +35,8 @@ def test_send_to_discord() -> None: # Get the webhook. webhook_url: str | None = os.environ.get("TEST_WEBHOOK_URL") - if webhook_url is None: + if not webhook_url: + reader.close() pytest.skip("No webhook URL provided.") assert webhook_url is not None diff --git a/tests/test_main.py b/tests/test_main.py index bfbadb7..1ffe217 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,10 +1,12 @@ -from typing import Literal +from typing import TYPE_CHECKING, Literal from fastapi.testclient import TestClient -from httpx import Response from discord_rss_bot.main import app, encode_url +if TYPE_CHECKING: + from httpx import Response + client: TestClient = TestClient(app) webhook_name: str = "Hello, I am a webhook!" webhook_url: str = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz" @@ -180,7 +182,7 @@ def test_unpause_feed() -> None: assert feed_url in response.text -def test_remove_feed(): +def test_remove_feed() -> None: """Test the /remove page.""" # Remove the feed if it already exists before we run the test. feeds: Response = client.get("/") @@ -201,7 +203,7 @@ def test_remove_feed(): assert feed_url not in response.text -def test_delete_webhook(): +def test_delete_webhook() -> None: """Test the /delete_webhook page.""" # Remove the feed if it already exists before we run the test. feeds: Response = client.get("/webhooks") diff --git a/tests/test_markdown.py b/tests/test_markdown.py index 8f7c233..dc7a4af 100644 --- a/tests/test_markdown.py +++ b/tests/test_markdown.py @@ -28,7 +28,9 @@ def test_convert_to_md() -> None: # Test multiple tags assert ( - convert_html_to_md('bold italic link code strikethrough') # noqa: E501 + convert_html_to_md( + 'bold italic link code strikethrough', + ) == "**bold** *italic* [link](https://example.com) `code` ~~strikethrough~~" ) diff --git a/tests/test_search.py b/tests/test_search.py index 229e2ce..3b6a577 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -1,20 +1,22 @@ -import os import tempfile from pathlib import Path -from typing import Iterable +from typing import TYPE_CHECKING from reader import Feed, Reader, make_reader from discord_rss_bot.search import create_html_for_search_results +if TYPE_CHECKING: + from collections.abc import Iterable + def test_create_html_for_search_results() -> None: """Test create_html_for_search_results.""" # Create a reader. with tempfile.TemporaryDirectory() as temp_dir: # Create the temp directory. - os.makedirs(temp_dir, exist_ok=True) - assert os.path.exists(temp_dir) + Path.mkdir(Path(temp_dir), exist_ok=True) + assert Path.exists(Path(temp_dir)) # Create a temporary reader. reader: Reader = make_reader(url=str(Path(temp_dir, "test_db.sqlite"))) diff --git a/tests/test_settings.py b/tests/test_settings.py index 2786b22..51cc1fc 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -1,6 +1,6 @@ -import os import pathlib import tempfile +from pathlib import Path from reader import Reader @@ -15,7 +15,7 @@ def test_reader() -> None: # Test the reader with a custom location. with tempfile.TemporaryDirectory() as temp_dir: # Create the temp directory - os.makedirs(temp_dir, exist_ok=True) + Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") custom_reader: Reader = get_reader(custom_location=str(custom_loc)) @@ -27,12 +27,12 @@ def test_reader() -> None: def test_data_dir() -> None: """Test the data directory.""" - assert os.path.exists(data_dir) + assert Path.exists(Path(data_dir)) def test_default_custom_message() -> None: """Test the default custom message.""" - assert "{{entry_title}}\n{{entry_link}}" == default_custom_message + assert default_custom_message == "{{entry_title}}\n{{entry_link}}" def test_get_webhook_for_entry() -> None: @@ -40,7 +40,7 @@ def test_get_webhook_for_entry() -> None: # Test with a custom reader. with tempfile.TemporaryDirectory() as temp_dir: # Create the temp directory - os.makedirs(temp_dir, exist_ok=True) + Path.mkdir(Path(temp_dir), exist_ok=True) custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite") custom_reader: Reader = get_reader(custom_location=str(custom_loc)) diff --git a/tests/test_whitelist.py b/tests/test_whitelist.py index dccfb3f..81acbcd 100644 --- a/tests/test_whitelist.py +++ b/tests/test_whitelist.py @@ -1,11 +1,14 @@ import tempfile from pathlib import Path -from typing import Iterable +from typing import TYPE_CHECKING from reader import Entry, Feed, Reader, make_reader from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent +if TYPE_CHECKING: + from collections.abc import Iterable + feed_url: str = "https://lovinator.space/rss_test.xml"