Use Ruff and fix all its warnings and errors

This commit is contained in:
2023-03-18 01:50:45 +01:00
parent 15284c5646
commit 948a5a2af9
27 changed files with 504 additions and 313 deletions

View File

@ -1,9 +1,9 @@
import json
import urllib.parse
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timezone
from functools import lru_cache
from typing import Iterable
import httpx
import uvicorn
@ -22,7 +22,7 @@ from discord_rss_bot.custom_message import (
CustomEmbed,
get_custom_message,
get_embed,
get_image,
get_first_image,
replace_tags_in_text_message,
save_embed,
)
@ -47,45 +47,44 @@ templates.env.filters["discord_markdown"] = convert_html_to_md
@app.post("/add_webhook")
async def post_add_webhook(webhook_name: str = Form(), webhook_url: str = Form()):
"""
Add a feed to the database.
async def post_add_webhook(webhook_name: str = Form(), webhook_url: str = Form()) -> RedirectResponse:
"""Add a feed to the database.
Args:
webhook_name: The name of the webhook.
webhook_url: The url of the webhook.
"""
if add_webhook(reader, webhook_name, webhook_url):
return RedirectResponse(url="/", status_code=303)
add_webhook(reader, webhook_name, webhook_url)
return RedirectResponse(url="/", status_code=303)
@app.post("/delete_webhook")
async def post_delete_webhook(webhook_url: str = Form()):
"""
Delete a webhook from the database.
async def post_delete_webhook(webhook_url: str = Form()) -> RedirectResponse:
"""Delete a webhook from the database.
Args:
webhook_url: The url of the webhook.
"""
if remove_webhook(reader, webhook_url):
return RedirectResponse(url="/", status_code=303)
# TODO: Check if the webhook is in use by any feeds before deleting it.
remove_webhook(reader, webhook_url)
return RedirectResponse(url="/", status_code=303)
@app.post("/add")
async def post_create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()):
"""
Add a feed to the database.
async def post_create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()) -> RedirectResponse:
"""Add a feed to the database.
Args:
feed_url: The feed to add.
webhook_dropdown: The webhook to use.
"""
clean_feed_url: str = feed_url.strip()
create_feed(reader, feed_url, webhook_dropdown)
return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303)
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/pause")
async def post_pause_feed(feed_url: str = Form()):
async def post_pause_feed(feed_url: str = Form()) -> RedirectResponse:
"""Pause a feed.
Args:
@ -97,7 +96,7 @@ async def post_pause_feed(feed_url: str = Form()):
@app.post("/unpause")
async def post_unpause_feed(feed_url: str = Form()):
async def post_unpause_feed(feed_url: str = Form()) -> RedirectResponse:
"""Unpause a feed.
Args:
@ -115,7 +114,7 @@ async def post_set_whitelist(
whitelist_content: str = Form(None),
whitelist_author: str = Form(None),
feed_url: str = Form(),
):
) -> RedirectResponse:
"""Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent.
Args:
@ -127,7 +126,7 @@ async def post_set_whitelist(
"""
clean_feed_url: str = feed_url.strip()
if whitelist_title:
reader.set_tag(clean_feed_url, "whitelist_title", whitelist_title) # type: ignore
reader.set_tag(clean_feed_url, "whitelist_title", [whitelist_title])
if whitelist_summary:
reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # type: ignore
if whitelist_content:
@ -172,8 +171,10 @@ async def post_set_blacklist(
blacklist_content: str = Form(None),
blacklist_author: str = Form(None),
feed_url: str = Form(),
):
"""Set the blacklist, if this is set we will check if words are in the title, summary or content
) -> RedirectResponse:
"""Set the blacklist.
If this is set we will check if words are in the title, summary or content
and then don't send that entry.
Args:
@ -193,7 +194,7 @@ async def post_set_blacklist(
if blacklist_author:
reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # type: ignore
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303)
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.get("/blacklist", response_class=HTMLResponse)
@ -218,9 +219,8 @@ async def get_blacklist(feed_url: str, request: Request):
@app.post("/custom")
async def post_set_custom(custom_message: str = Form(""), feed_url: str = Form()):
"""
Set the custom message, this is used when sending the message.
async def post_set_custom(custom_message: str = Form(""), feed_url: str = Form()) -> RedirectResponse:
"""Set the custom message, this is used when sending the message.
Args:
custom_message: The custom message.
@ -231,7 +231,8 @@ async def post_set_custom(custom_message: str = Form(""), feed_url: str = Form()
else:
reader.set_tag(feed_url, "custom_message", settings.default_custom_message) # type: ignore
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303)
clean_feed_url: str = feed_url.strip()
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.get("/custom", response_class=HTMLResponse)
@ -304,11 +305,25 @@ async def post_embed(
author_icon_url: str = Form(""),
footer_text: str = Form(""),
footer_icon_url: str = Form(""),
):
) -> RedirectResponse:
"""Set the embed settings.
Args:
feed_url: What feed we should get the custom message for.
title: The title of the embed.
description: The description of the embed.
color: The color of the embed.
image_url: The image url of the embed.
thumbnail_url: The thumbnail url of the embed.
author_name: The author name of the embed.
author_url: The author url of the embed.
author_icon_url: The author icon url of the embed.
footer_text: The footer text of the embed.
footer_icon_url: The footer icon url of the embed.
Returns:
RedirectResponse: Redirect to the embed page.
"""
clean_feed_url: str = feed_url.strip()
feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url))
@ -328,31 +343,37 @@ async def post_embed(
# Save the data.
save_embed(reader, feed, custom_embed)
return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303)
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/use_embed")
async def post_use_embed(feed_url: str = Form()):
async def post_use_embed(feed_url: str = Form()) -> RedirectResponse:
"""Use embed instead of text.
Args:
feed_url: The feed to change.
Returns:
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "should_send_embed", True) # type: ignore
return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303)
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/use_text")
async def post_use_text(feed_url: str = Form()):
async def post_use_text(feed_url: str = Form()) -> RedirectResponse:
"""Use text instead of embed.
Args:
feed_url: The feed to change.
Returns:
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "should_send_embed", False) # type: ignore
return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303)
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.get("/add", response_class=HTMLResponse)
@ -367,11 +388,14 @@ def get_add(request: Request):
@app.get("/feed", response_class=HTMLResponse)
async def get_feed(feed_url: str, request: Request):
"""
Get a feed by URL.
"""Get a feed by URL.
Args:
feed_url: The feed to add.
request: The request object.
Returns:
HTMLResponse: The feed page.
"""
clean_feed_url: str = urllib.parse.unquote(feed_url.strip())
@ -404,19 +428,18 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
"""Create HTML for the search results.
Args:
search_results: The search results.
custom_reader: The reader. If None, we will get the reader from the settings.
entries: The entries to create HTML for.
"""
html: str = ""
for entry in entries:
first_image = ""
first_image: str = ""
summary: str | None = entry.summary
content = ""
if entry.content:
for content_item in entry.content:
content: str = content_item.value
first_image = get_image(summary, content)
first_image = get_first_image(summary, content)
text: str = replace_tags_in_text_message(entry) or "<div class='text-muted'>No content available.</div>"
published = ""
@ -432,24 +455,30 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
whitelisted = "<span class='badge bg-success'>Whitelisted</span>"
entry_id: str = urllib.parse.quote(entry.id)
to_disord_html: str = f"<a class='text-muted' href='/post_entry?entry_id={entry_id}'>Send to Discord</a>"
to_discord_html: str = f"<a class='text-muted' href='/post_entry?entry_id={entry_id}'>Send to Discord</a>"
image_html: str = f"<img src='{first_image}' class='img-fluid'>" if first_image else ""
html += f"""<div class="p-2 mb-2 border border-dark">
{blacklisted}{whitelisted}<a class="text-muted text-decoration-none" href="{entry.link}"><h2>{entry.title}</h2></a>
{f"By { entry.author } @" if entry.author else ""}{published} - {to_disord_html}
{f"By { entry.author } @" if entry.author else ""}{published} - {to_discord_html}
{text}
{image_html}
</div>
"""
return html.strip()
@app.get("/add_webhook", response_class=HTMLResponse)
async def get_add_webhook(request: Request):
"""Page for adding a new webhook."""
"""Page for adding a new webhook.
Args:
request: The request object.
Returns:
HTMLResponse: The add webhook page.
"""
return templates.TemplateResponse("add_webhook.html", {"request": request})
@ -457,36 +486,54 @@ async def get_add_webhook(request: Request):
class WebhookInfo:
custom_name: str
url: str
type: int | None = None
id: str | None = None
webhook_type: int | None = None
webhook_id: str | None = None
name: str | None = None
avatar: str | None = None
channel_id: str | None = None
guild_id: str | None = None
token: str | None = None
avatar_mod: int | None = None
@lru_cache()
def get_data_from_hook_url(hook_name: str, hook_url: str):
@lru_cache
def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo:
"""Get data from a webhook URL.
Args:
hook_name (str): The webhook name.
hook_url (str): The webhook URL.
Returns:
WebhookInfo: The webhook username, avatar, guild id, etc.
"""
our_hook: WebhookInfo = WebhookInfo(custom_name=hook_name, url=hook_url)
if hook_url:
response: Response = httpx.get(hook_url)
if response.status_code == 200:
if response.is_success:
webhook_json = json.loads(response.text)
our_hook.type = webhook_json["type"] or None
our_hook.id = webhook_json["id"] or None
our_hook.webhook_type = webhook_json["type"] or None
our_hook.webhook_id = webhook_json["id"] or None
our_hook.name = webhook_json["name"] or None
our_hook.avatar = webhook_json["avatar"] or None
our_hook.channel_id = webhook_json["channel_id"] or None
our_hook.guild_id = webhook_json["guild_id"] or None
our_hook.token = webhook_json["token"] or None
our_hook.avatar_mod = int(webhook_json["channel_id"] or 0) % 5
return our_hook
@app.get("/webhooks", response_class=HTMLResponse)
async def get_webhooks(request: Request):
"""Page for adding a new webhook."""
"""Page for adding a new webhook.
Args:
request: The request object.
Returns:
HTMLResponse: The add webhook page.
"""
hooks_with_data = []
for hook in reader.get_tag((), "webhooks", ""):
@ -499,12 +546,26 @@ async def get_webhooks(request: Request):
@app.get("/", response_class=HTMLResponse)
def get_index(request: Request):
"""This is the root of the website."""
"""This is the root of the website.
Args:
request: The request object.
Returns:
HTMLResponse: The index page.
"""
return templates.TemplateResponse("index.html", make_context_index(request))
def make_context_index(request: Request):
"""Create the needed context for the index page."""
"""Create the needed context for the index page.
Args:
request: The request object.
Returns:
dict: The context for the index page.
"""
hooks: list[dict] = reader.get_tag((), "webhooks", []) # type: ignore
feed_list = []
@ -537,11 +598,13 @@ def make_context_index(request: Request):
@app.post("/remove", response_class=HTMLResponse)
async def remove_feed(feed_url: str = Form()):
"""
Get a feed by URL.
"""Get a feed by URL.
Args:
feed_url: The feed to add.
Returns:
RedirectResponse: Redirect to the index page.
"""
try:
reader.delete_feed(urllib.parse.unquote(feed_url))
@ -553,11 +616,14 @@ async def remove_feed(feed_url: str = Form()):
@app.get("/search", response_class=HTMLResponse)
async def search(request: Request, query: str):
"""
Get entries matching a full-text search query.
"""Get entries matching a full-text search query.
Args:
query: The query to search for.
request: The request object.
Returns:
HTMLResponse: The search page.
"""
reader.update_search()
@ -572,18 +638,25 @@ async def search(request: Request, query: str):
@app.get("/post_entry", response_class=HTMLResponse)
async def post_entry(entry_id: str):
"""Send single entry to Discord."""
"""Send single entry to Discord.
Args:
entry_id: The entry to send.
Returns:
RedirectResponse: Redirect to the feed page.
"""
unquoted_entry_id: str = urllib.parse.unquote(entry_id)
entry: Entry | None = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None)
if entry is None:
return {"error": f"Failed to get entry '{entry_id}' when posting to Discord."}
return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.")
if result := send_entry_to_discord(entry=entry):
return result
# Redirect to the feed page.
clean_url: str = entry.feed.url.strip()
return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303)
clean_feed_url: str = entry.feed.url.strip()
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.on_event("startup")
@ -598,7 +671,7 @@ def startup() -> None:
# Update all feeds every 15 minutes.
# TODO: Make this configurable.
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now())
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now(tz=timezone.utc))
scheduler.start()