Move things away from main and refactor things

This commit is contained in:
2023-01-28 21:11:20 +01:00
parent e571ad9ee2
commit fa061782f4
6 changed files with 127 additions and 246 deletions

View File

@ -72,16 +72,16 @@ def try_to_replace(custom_message: str, template: str, replace_with: str) -> str
return custom_message return custom_message
def replace_tags_in_text_message(feed: Feed, entry: Entry) -> str: def replace_tags_in_text_message(entry: Entry) -> str:
"""Replace tags in custom_message. """Replace tags in custom_message.
Args: Args:
feed: The feed to get the tags from.
entry: The entry to get the tags from. entry: The entry to get the tags from.
Returns: Returns:
Returns the custom_message with the tags replaced. Returns the custom_message with the tags replaced.
""" """
feed: Feed = entry.feed
custom_reader: Reader = get_reader() custom_reader: Reader = get_reader()
custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader) custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader)

View File

@ -11,24 +11,6 @@ from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
from discord_rss_bot.settings import default_custom_message, get_reader from discord_rss_bot.settings import default_custom_message, get_reader
def get_entry_from_id(entry_id: str, custom_reader: Reader | None = None) -> Entry | None:
"""
Get an entry from an ID.
Args:
entry_id: The ID of the entry.
custom_reader: If we should use a custom reader instead of the default one.
Returns:
Entry: The entry with the ID. None if it doesn't exist.
"""
# Get the default reader if we didn't get a custom one.
reader: Reader = get_reader() if custom_reader is None else custom_reader
# Get the entry from the ID, or return None if it doesn't exist.
return next((entry for entry in reader.get_entries() if entry.id == entry_id), None)
def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None): def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None):
""" """
Send a single entry to Discord. Send a single entry to Discord.
@ -46,7 +28,7 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None):
# Try to get the custom message for the feed. If the user has none, we will use the default message. # Try to get the custom message for the feed. If the user has none, we will use the default message.
if custom_message.get_custom_message(reader, entry.feed) != "": if custom_message.get_custom_message(reader, entry.feed) != "":
webhook_message = custom_message.replace_tags_in_text_message(entry=entry, feed=entry.feed) # type: ignore webhook_message = custom_message.replace_tags_in_text_message(entry=entry)
else: else:
webhook_message: str = default_custom_message webhook_message: str = default_custom_message
@ -144,7 +126,7 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
# If the user has set the custom message to an empty string, we will use the default message, otherwise we # If the user has set the custom message to an empty string, we will use the default message, otherwise we
# will use the custom message. # will use the custom message.
if custom_message.get_custom_message(reader, entry.feed) != "": if custom_message.get_custom_message(reader, entry.feed) != "":
webhook_message = custom_message.replace_tags_in_text_message(entry=entry, feed=entry.feed) webhook_message = custom_message.replace_tags_in_text_message(entry)
else: else:
webhook_message: str = default_custom_message webhook_message: str = default_custom_message

View File

@ -3,7 +3,7 @@ import urllib.parse
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from functools import lru_cache from functools import lru_cache
from typing import Dict, Iterable from typing import Iterable
import httpx import httpx
import uvicorn import uvicorn
@ -13,17 +13,7 @@ from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from httpx import Response from httpx import Response
from reader import ( from reader import Entry, Feed, FeedNotFoundError, Reader, TagNotFoundError
Entry,
EntryCounts,
EntrySearchCounts,
EntrySearchResult,
Feed,
FeedCounts,
FeedNotFoundError,
Reader,
TagNotFoundError,
)
from starlette.responses import RedirectResponse from starlette.responses import RedirectResponse
from discord_rss_bot import settings from discord_rss_bot import settings
@ -36,7 +26,7 @@ from discord_rss_bot.custom_message import (
replace_tags_in_text_message, replace_tags_in_text_message,
save_embed, save_embed,
) )
from discord_rss_bot.feeds import create_feed, get_entry_from_id, send_entry_to_discord, send_to_discord from discord_rss_bot.feeds import create_feed, send_entry_to_discord, send_to_discord
from discord_rss_bot.filter.blacklist import get_blacklist_content, get_blacklist_summary, get_blacklist_title from discord_rss_bot.filter.blacklist import get_blacklist_content, get_blacklist_summary, get_blacklist_title
from discord_rss_bot.filter.whitelist import get_whitelist_content, get_whitelist_summary, get_whitelist_title from discord_rss_bot.filter.whitelist import get_whitelist_content, get_whitelist_summary, get_whitelist_title
from discord_rss_bot.markdown import convert_html_to_md from discord_rss_bot.markdown import convert_html_to_md
@ -59,7 +49,7 @@ templates.env.filters["discord_markdown"] = convert_html_to_md
@app.post("/add_webhook") @app.post("/add_webhook")
async def post_add_webhook(webhook_name=Form(), webhook_url=Form()): async def post_add_webhook(webhook_name: str = Form(), webhook_url: str = Form()):
""" """
Add a feed to the database. Add a feed to the database.
@ -72,7 +62,7 @@ async def post_add_webhook(webhook_name=Form(), webhook_url=Form()):
@app.post("/delete_webhook") @app.post("/delete_webhook")
async def post_delete_webhook(webhook_url=Form()): async def post_delete_webhook(webhook_url: str = Form()):
""" """
Delete a webhook from the database. Delete a webhook from the database.
@ -84,49 +74,48 @@ async def post_delete_webhook(webhook_url=Form()):
@app.post("/add") @app.post("/add")
async def post_create_feed(feed_url=Form(), webhook_dropdown=Form()): async def post_create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()):
""" """
Add a feed to the database. Add a feed to the database.
Args: Args:
feed_url: The feed to add. feed_url: The feed to add.
webhook_dropdown: The webhook to use. webhook_dropdown: The webhook to use.
Returns:
dict: The feed that was added.
""" """
create_feed(reader, feed_url, webhook_dropdown) create_feed(reader, feed_url, webhook_dropdown)
return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303)
@app.post("/pause") @app.post("/pause")
async def post_pause_feed(feed_url=Form()): async def post_pause_feed(feed_url: str = Form()):
"""Pause a feed. """Pause a feed.
Args: Args:
feed_url: The feed to pause. feed_url: The feed to pause.
""" """
reader.disable_feed_updates(feed_url) clean_feed_url: str = feed_url.strip()
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) reader.disable_feed_updates(clean_feed_url)
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/unpause") @app.post("/unpause")
async def post_unpause_feed(feed_url=Form()): async def post_unpause_feed(feed_url: str = Form()):
"""Unpause a feed. """Unpause a feed.
Args: Args:
feed_url: The Feed to unpause. feed_url: The Feed to unpause.
""" """
reader.enable_feed_updates(feed_url) clean_feed_url: str = feed_url.strip()
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) reader.enable_feed_updates(clean_feed_url)
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/whitelist") @app.post("/whitelist")
async def post_set_whitelist( async def post_set_whitelist(
whitelist_title=Form(None), whitelist_title: str = Form(None),
whitelist_summary=Form(None), whitelist_summary: str = Form(None),
whitelist_content=Form(None), whitelist_content: str = Form(None),
feed_url=Form(), feed_url: str = Form(),
): ):
"""Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent. """Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent.
@ -136,25 +125,26 @@ async def post_set_whitelist(
whitelist_content: Whitelisted words for when checking the title. whitelist_content: Whitelisted words for when checking the title.
feed_url: The feed we should set the whitelist for. feed_url: The feed we should set the whitelist for.
""" """
clean_feed_url: str = feed_url.strip()
if whitelist_title: if whitelist_title:
reader.set_tag(feed_url, "whitelist_title", whitelist_title) reader.set_tag(clean_feed_url, "whitelist_title", whitelist_title) # type: ignore
if whitelist_summary: if whitelist_summary:
reader.set_tag(feed_url, "whitelist_summary", whitelist_summary) reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # type: ignore
if whitelist_content: if whitelist_content:
reader.set_tag(feed_url, "whitelist_content", whitelist_content) reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # type: ignore
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.get("/whitelist", response_class=HTMLResponse) @app.get("/whitelist", response_class=HTMLResponse)
async def get_whitelist(feed_url, request: Request): async def get_whitelist(feed_url: str, request: Request):
"""Get the whitelist. """Get the whitelist.
Args: Args:
feed_url: What feed we should get the whitelist for. feed_url: What feed we should get the whitelist for.
request: The HTTP request.
""" """
feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url)) clean_feed_url: str = feed_url.strip()
feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url))
# Get previous data, this is used when creating the form. # Get previous data, this is used when creating the form.
whitelist_title: str = get_whitelist_title(reader, feed) whitelist_title: str = get_whitelist_title(reader, feed)
@ -173,10 +163,10 @@ async def get_whitelist(feed_url, request: Request):
@app.post("/blacklist") @app.post("/blacklist")
async def post_set_blacklist( async def post_set_blacklist(
blacklist_title=Form(None), blacklist_title: str = Form(None),
blacklist_summary=Form(None), blacklist_summary: str = Form(None),
blacklist_content=Form(None), blacklist_content: str = Form(None),
feed_url=Form(), feed_url: str = Form(),
): ):
"""Set the blacklist, if this is set we will check if words are in the title, summary or content """Set the blacklist, if this is set we will check if words are in the title, summary or content
and then don't send that entry. and then don't send that entry.
@ -187,18 +177,19 @@ async def post_set_blacklist(
blacklist_content: Blacklisted words for when checking the content. blacklist_content: Blacklisted words for when checking the content.
feed_url: What feed we should set the blacklist for. feed_url: What feed we should set the blacklist for.
""" """
clean_feed_url = feed_url.strip()
if blacklist_title: if blacklist_title:
reader.set_tag(feed_url, "blacklist_title", blacklist_title) reader.set_tag(clean_feed_url, "blacklist_title", blacklist_title) # type: ignore
if blacklist_summary: if blacklist_summary:
reader.set_tag(feed_url, "blacklist_summary", blacklist_summary) reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # type: ignore
if blacklist_content: if blacklist_content:
reader.set_tag(feed_url, "blacklist_content", blacklist_content) reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # type: ignore
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303) return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303)
@app.get("/blacklist", response_class=HTMLResponse) @app.get("/blacklist", response_class=HTMLResponse)
async def get_blacklist(feed_url, request: Request): async def get_blacklist(feed_url: str, request: Request):
feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url)) feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url))
# Get previous data, this is used when creating the form. # Get previous data, this is used when creating the form.
@ -217,7 +208,7 @@ async def get_blacklist(feed_url, request: Request):
@app.post("/custom") @app.post("/custom")
async def post_set_custom(custom_message=Form(""), feed_url=Form()): async def post_set_custom(custom_message: str = Form(""), feed_url: str = Form()):
""" """
Set the custom message, this is used when sending the message. Set the custom message, this is used when sending the message.
@ -225,63 +216,44 @@ async def post_set_custom(custom_message=Form(""), feed_url=Form()):
custom_message: The custom message. custom_message: The custom message.
feed_url: The feed we should set the custom message for. feed_url: The feed we should set the custom message for.
""" """
if custom_message := custom_message.strip(): if custom_message:
reader.set_tag(feed_url, "custom_message", custom_message) # type: ignore reader.set_tag(feed_url, "custom_message", custom_message.strip()) # type: ignore
else: else:
reader.set_tag(feed_url, "custom_message", settings.default_custom_message) # type: ignore reader.set_tag(feed_url, "custom_message", settings.default_custom_message) # type: ignore
clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(feed_url)}", status_code=303)
return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303)
@app.get("/custom", response_class=HTMLResponse) @app.get("/custom", response_class=HTMLResponse)
async def get_custom(feed_url, request: Request): async def get_custom(feed_url: str, request: Request):
"""Get the custom message. This is used when sending the message to Discord. """Get the custom message. This is used when sending the message to Discord.
Args: Args:
feed_url: What feed we should get the custom message for. feed_url: What feed we should get the custom message for.
request: The HTTP request.
Returns:
custom.html
""" """
feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url.strip()))
# Make feed_url a valid URL. context = {
url: str = urllib.parse.unquote(feed_url) "request": request,
"feed": feed,
feed: Feed = reader.get_feed(url) "custom_message": get_custom_message(reader, feed),
}
# Get previous data, this is used when creating the form.
custom_message: str = get_custom_message(reader, feed)
context = {"request": request, "feed": feed, "custom_message": custom_message}
# Get the first entry, this is used to show the user what the custom message will look like. # Get the first entry, this is used to show the user what the custom message will look like.
entries: Iterable[Entry] = reader.get_entries(feed=feed, limit=1) for entry in reader.get_entries(feed=feed, limit=1):
for entry in entries:
# Append to context.
context["entry"] = entry context["entry"] = entry
return templates.TemplateResponse("custom.html", context) return templates.TemplateResponse("custom.html", context)
@app.get("/embed", response_class=HTMLResponse) @app.get("/embed", response_class=HTMLResponse)
async def get_embed_page(feed_url, request: Request): async def get_embed_page(feed_url: str, request: Request):
"""Get the custom message. This is used when sending the message to Discord. """Get the custom message. This is used when sending the message to Discord.
Args: Args:
feed_url: What feed we should get the custom message for. feed_url: What feed we should get the custom message for.
request: The HTTP request.
Returns:
custom.html
""" """
feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url.strip()))
# Make feed_url a valid URL.
url: str = urllib.parse.unquote(feed_url)
feed: Feed = reader.get_feed(url)
# Get previous data, this is used when creating the form. # Get previous data, this is used when creating the form.
embed: CustomEmbed = get_embed(reader, feed) embed: CustomEmbed = get_embed(reader, feed)
@ -300,14 +272,10 @@ async def get_embed_page(feed_url, request: Request):
"footer_text": embed.footer_text, "footer_text": embed.footer_text,
"footer_icon_url": embed.footer_icon_url, "footer_icon_url": embed.footer_icon_url,
} }
if custom_embed := get_embed(reader, feed):
# Get the first entry, this is used to show the user what the custom message will look like.
entries: Iterable[Entry] = reader.get_entries(feed=feed, limit=1)
if custom_embed := get_embed(reader, feed_url):
context["custom_embed"] = custom_embed context["custom_embed"] = custom_embed
for entry in entries: for entry in reader.get_entries(feed=feed, limit=1):
# Append to context. # Append to context.
context["entry"] = entry context["entry"] = entry
return templates.TemplateResponse("embed.html", context) return templates.TemplateResponse("embed.html", context)
@ -315,36 +283,27 @@ async def get_embed_page(feed_url, request: Request):
@app.post("/embed", response_class=HTMLResponse) @app.post("/embed", response_class=HTMLResponse)
async def post_embed( async def post_embed(
feed_url=Form(), feed_url: str = Form(),
title=Form(""), title: str = Form(""),
description=Form(""), description: str = Form(""),
color=Form(""), color: str = Form(""),
image_url=Form(""), image_url: str = Form(""),
thumbnail_url=Form(""), thumbnail_url: str = Form(""),
author_name=Form(""), author_name: str = Form(""),
author_url=Form(""), author_url: str = Form(""),
author_icon_url=Form(""), author_icon_url: str = Form(""),
footer_text=Form(""), footer_text: str = Form(""),
footer_icon_url=Form(""), footer_icon_url: str = Form(""),
): ):
"""Set the embed settings. """Set the embed settings.
Args: Args:
feed_url: What feed we should get the custom message for. feed_url: What feed we should get the custom message for.
request: The HTTP request.
Returns:
custom.html
""" """
clean_feed_url: str = feed_url.strip()
# Make feed_url a valid URL. feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url))
url: str = urllib.parse.unquote(feed_url)
feed: Feed = reader.get_feed(url)
custom_embed: CustomEmbed = get_embed(reader, feed) custom_embed: CustomEmbed = get_embed(reader, feed)
# Get the data from the form.
custom_embed.title = title custom_embed.title = title
custom_embed.description = description custom_embed.description = description
custom_embed.color = color custom_embed.color = color
@ -357,62 +316,55 @@ async def post_embed(
custom_embed.footer_icon_url = footer_icon_url custom_embed.footer_icon_url = footer_icon_url
# Save the data. # Save the data.
save_embed(reader, feed_url, custom_embed) save_embed(reader, feed, custom_embed)
clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303)
return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303)
@app.post("/use_embed") @app.post("/use_embed")
async def post_use_embed(feed_url=Form()): async def post_use_embed(feed_url: str = Form()):
url: str = urllib.parse.unquote(feed_url) """Use embed instead of text.
feed: Feed = reader.get_feed(url) Args:
reader.set_tag(feed, "should_send_embed", True) # type: ignore feed_url: The feed to change.
return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) """
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "should_send_embed", True) # type: ignore
return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303)
@app.post("/use_text") @app.post("/use_text")
async def post_use_text(feed_url=Form()): async def post_use_text(feed_url: str = Form()):
url: str = urllib.parse.unquote(feed_url) """Use text instead of embed.
feed: Feed = reader.get_feed(url) Args:
reader.set_tag(feed, "should_send_embed", False) # type: ignore feed_url: The feed to change.
return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) """
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "should_send_embed", False) # type: ignore
return RedirectResponse(url=f"/feed/?feed_url={clean_feed_url}", status_code=303)
@app.get("/add", response_class=HTMLResponse) @app.get("/add", response_class=HTMLResponse)
def get_add(request: Request): def get_add(request: Request):
""" """Page for adding a new feed."""
Page for adding a new feed. return templates.TemplateResponse("add.html", make_context_index(request))
Args:
request: The request.
"""
context = make_context_index(request)
return templates.TemplateResponse("add.html", context)
@app.get("/feed", response_class=HTMLResponse) @app.get("/feed", response_class=HTMLResponse)
async def get_feed(feed_url, request: Request): async def get_feed(feed_url: str, request: Request):
""" """
Get a feed by URL. Get a feed by URL.
Args: Args:
request: The request.
feed_url: The feed to add. feed_url: The feed to add.
""" """
# Make feed_url a valid URL. clean_feed_url: str = urllib.parse.unquote(feed_url.strip())
url: str = urllib.parse.unquote(feed_url)
feed: Feed = reader.get_feed(url) feed: Feed = reader.get_feed(clean_feed_url)
# Get entries from the feed. # Get entries from the feed.
entries: Iterable[Entry] = reader.get_entries(feed=url) entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url)
# Get the entries in the feed.
feed_counts: FeedCounts = reader.get_feed_counts(feed=url)
# Create the html for the entries. # Create the html for the entries.
html: str = create_html_for_feed(entries) html: str = create_html_for_feed(entries)
@ -427,7 +379,7 @@ async def get_feed(feed_url, request: Request):
"request": request, "request": request,
"feed": feed, "feed": feed,
"entries": entries, "entries": entries,
"feed_counts": feed_counts, "feed_counts": reader.get_feed_counts(feed=clean_feed_url),
"html": html, "html": html,
"should_send_embed": should_send_embed, "should_send_embed": should_send_embed,
} }
@ -443,28 +395,22 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
""" """
html: str = "" html: str = ""
for entry in entries: for entry in entries:
# Get first image.
first_image = "" first_image = ""
first_image_text = "" first_image_text = ""
if images := get_images_from_entry(entry=entry): if images := get_images_from_entry(entry=entry):
first_image: str = images[0][0] first_image: str = images[0][0]
first_image_text: str = images[0][1] first_image_text: str = images[0][1]
# Get the text from the entry. text: str = replace_tags_in_text_message(entry) or "<div class='text-muted'>No content available.</div>"
text = replace_tags_in_text_message(entry.feed, entry)
if not text:
text = "<div class='text-muted'>No content available.</div>"
published = "" published = ""
if entry.published: if entry.published:
published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S") published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S")
blacklisted = "" blacklisted: str = ""
if entry_is_blacklisted(entry): if entry_is_blacklisted(entry):
blacklisted = "<span class='badge bg-danger'>Blacklisted</span>" blacklisted = "<span class='badge bg-danger'>Blacklisted</span>"
whitelisted = "" whitelisted: str = ""
if entry_is_whitelisted(entry): if entry_is_whitelisted(entry):
whitelisted = "<span class='badge bg-success'>Whitelisted</span>" whitelisted = "<span class='badge bg-success'>Whitelisted</span>"
@ -486,12 +432,7 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
@app.get("/add_webhook", response_class=HTMLResponse) @app.get("/add_webhook", response_class=HTMLResponse)
async def get_add_webhook(request: Request): async def get_add_webhook(request: Request):
""" """Page for adding a new webhook."""
Page for adding a new webhook.
Args:
request: The request.
"""
return templates.TemplateResponse("add_webhook.html", {"request": request}) return templates.TemplateResponse("add_webhook.html", {"request": request})
@ -528,39 +469,21 @@ def get_data_from_hook_url(hook_name: str, hook_url: str):
@app.get("/webhooks", response_class=HTMLResponse) @app.get("/webhooks", response_class=HTMLResponse)
async def get_webhooks(request: Request): async def get_webhooks(request: Request):
""" """Page for adding a new webhook."""
Page for adding a new webhook.
Args:
request: The request.
"""
hooks: Dict[str, str] = reader.get_tag((), "webhooks", "") # type: ignore
hooks_with_data = [] hooks_with_data = []
for hook in hooks: for hook in reader.get_tag((), "webhooks", ""):
hook_url: str = hook["url"] # type: ignore our_hook: WebhookInfo = get_data_from_hook_url(hook_url=hook["url"], hook_name=hook["name"]) # type: ignore
hook_name: str = hook["name"] # type: ignore
our_hook: WebhookInfo = get_data_from_hook_url(hook_url=hook_url, hook_name=hook_name)
hooks_with_data.append(our_hook) hooks_with_data.append(our_hook)
return templates.TemplateResponse(
"webhooks.html", context = {"request": request, "hooks_with_data": hooks_with_data}
{ return templates.TemplateResponse("webhooks.html", context)
"request": request,
"hooks_with_data": hooks_with_data,
},
)
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
def get_index(request: Request): def get_index(request: Request):
""" """This is the root of the website."""
This is the root of the website. return templates.TemplateResponse("index.html", make_context_index(request))
Args:
request: The request.
"""
context = make_context_index(request)
return templates.TemplateResponse("index.html", context)
def make_context_index(request: Request): def make_context_index(request: Request):
@ -568,14 +491,8 @@ def make_context_index(request: Request):
Create the needed context for the index page. Create the needed context for the index page.
Used by / and /add. Used by / and /add.
Args:
request: The request.
""" """
# Get webhooks name and url from the database. hooks: list[dict] = reader.get_tag((), "webhooks", []) # type: ignore
try:
hooks: list[dict] = reader.get_tag((), "webhooks") # type: ignore
except TagNotFoundError:
hooks = []
feed_list = [] feed_list = []
broken_feeds = [] broken_feeds = []
@ -594,16 +511,11 @@ def make_context_index(request: Request):
if webhook not in webhook_list: if webhook not in webhook_list:
feeds_without_corresponding_webhook.append(feed) feeds_without_corresponding_webhook.append(feed)
# Sort feed_list by when the feed was added.
feed_list.sort(key=lambda x: x["feed"].added)
feed_count: FeedCounts = reader.get_feed_counts()
entry_count: EntryCounts = reader.get_entry_counts()
return { return {
"request": request, "request": request,
"feeds": feed_list, "feeds": feed_list,
"feed_count": feed_count, "feed_count": reader.get_feed_counts(),
"entry_count": entry_count, "entry_count": reader.get_entry_counts(),
"webhooks": hooks, "webhooks": hooks,
"broken_feeds": broken_feeds, "broken_feeds": broken_feeds,
"feeds_without_corresponding_webhook": feeds_without_corresponding_webhook, "feeds_without_corresponding_webhook": feeds_without_corresponding_webhook,
@ -611,22 +523,18 @@ def make_context_index(request: Request):
@app.post("/remove", response_class=HTMLResponse) @app.post("/remove", response_class=HTMLResponse)
async def remove_feed(feed_url=Form()): async def remove_feed(feed_url: str = Form()):
""" """
Get a feed by URL. Get a feed by URL.
Args: Args:
feed_url: The feed to add. feed_url: The feed to add.
""" """
# Unquote the url
unquoted_feed_url: str = urllib.parse.unquote(feed_url)
try: try:
reader.delete_feed(unquoted_feed_url) reader.delete_feed(urllib.parse.unquote(feed_url))
except FeedNotFoundError as e: except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail="Feed not found") from e raise HTTPException(status_code=404, detail="Feed not found") from e
reader.update_search()
return RedirectResponse(url="/", status_code=303) return RedirectResponse(url="/", status_code=303)
@ -636,33 +544,24 @@ async def search(request: Request, query: str):
Get entries matching a full-text search query. Get entries matching a full-text search query.
Args: Args:
request: The request.
query: The query to search for. query: The query to search for.
""" """
reader.update_search() reader.update_search()
search_results: Iterable[EntrySearchResult] = reader.search_entries(query)
search_amount: EntrySearchCounts = reader.search_entry_counts(query)
search_html: str = create_html_for_search_results(search_results)
context = { context = {
"request": request, "request": request,
"search_html": search_html, "search_html": create_html_for_search_results(query),
"query": query, "query": query,
"search_amount": search_amount, "search_amount": reader.search_entry_counts(query),
} }
return templates.TemplateResponse("search.html", context) return templates.TemplateResponse("search.html", context)
@app.get("/post_entry", response_class=HTMLResponse) @app.get("/post_entry", response_class=HTMLResponse)
async def post_entry(entry_id: str): async def post_entry(entry_id: str):
""" """Send single entry to Discord."""
Send a feed to Discord."""
# Unquote the entry id.
unquoted_entry_id: str = urllib.parse.unquote(entry_id) unquoted_entry_id: str = urllib.parse.unquote(entry_id)
entry: Entry | None = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None)
print(f"Sending entry '{unquoted_entry_id}' to Discord.")
entry: Entry | None = get_entry_from_id(entry_id=unquoted_entry_id)
if entry is None: if entry is None:
return {"error": f"Failed to get entry '{entry_id}' when posting to Discord."} return {"error": f"Failed to get entry '{entry_id}' when posting to Discord."}
@ -678,7 +577,8 @@ async def post_entry(entry_id: str):
def startup() -> None: def startup() -> None:
"""This is called when the server starts. """This is called when the server starts.
It reads the settings file and starts the scheduler.""" It reads the settings file and starts the scheduler.
"""
add_missing_tags(reader=reader) add_missing_tags(reader=reader)
scheduler: BackgroundScheduler = BackgroundScheduler() scheduler: BackgroundScheduler = BackgroundScheduler()

View File

@ -1,16 +1,15 @@
import urllib.parse import urllib.parse
from typing import Iterable
from reader import EntrySearchResult, Feed, HighlightedString, Reader from reader import Feed, HighlightedString, Reader
from discord_rss_bot.settings import get_reader from discord_rss_bot.settings import get_reader
def create_html_for_search_results(search_results: Iterable[EntrySearchResult], custom_reader: Reader | None = None) -> str: def create_html_for_search_results(query: str, custom_reader: Reader | None = None) -> str:
"""Create HTML for the search results. """Create HTML for the search results.
Args: Args:
search_results: The search results. query: Our search query
custom_reader: The reader. If None, we will get the reader from the settings. custom_reader: The reader. If None, we will get the reader from the settings.
Returns: Returns:
@ -22,6 +21,8 @@ def create_html_for_search_results(search_results: Iterable[EntrySearchResult],
# Get the default reader if we didn't get a custom one. # Get the default reader if we didn't get a custom one.
reader: Reader = get_reader() if custom_reader is None else custom_reader reader: Reader = get_reader() if custom_reader is None else custom_reader
search_results = reader.search_entries(query)
html: str = "" html: str = ""
for result in search_results: for result in search_results:
if ".summary" in result.content: if ".summary" in result.content:

View File

@ -38,13 +38,14 @@ def add_webhook(reader: Reader, webhook_name: str, webhook_url: str):
def remove_webhook(reader: Reader, webhook_url: str): def remove_webhook(reader: Reader, webhook_url: str):
clean_webhook_url: str = webhook_url.strip()
# Get current webhooks from the database if they exist otherwise use an empty list. # Get current webhooks from the database if they exist otherwise use an empty list.
webhooks: list[dict[str, str]] = list_webhooks(reader) webhooks: list[dict[str, str]] = list_webhooks(reader)
# Only add the webhook if it doesn't already exist. # Only add the webhook if it doesn't already exist.
for webhook in webhooks: for webhook in webhooks:
if webhook["url"] in [webhook_url, webhook_url]: if webhook["url"] in clean_webhook_url:
# Add the new webhook to the list of webhooks.
webhooks.remove(webhook) webhooks.remove(webhook)
# Check if it has been removed. # Check if it has been removed.

View File

@ -3,7 +3,7 @@ import tempfile
from pathlib import Path from pathlib import Path
from typing import Iterable from typing import Iterable
from reader import EntrySearchResult, Feed, Reader, make_reader from reader import Feed, Reader, make_reader
from discord_rss_bot.search import create_html_for_search_results from discord_rss_bot.search import create_html_for_search_results
@ -39,11 +39,8 @@ def test_create_html_for_search_results() -> None:
reader.enable_search() reader.enable_search()
reader.update_search() reader.update_search()
# Get the HTML for the search results.
search_results: Iterable[EntrySearchResult] = reader.search_entries("a", feed=feed) # type: ignore
# Create the HTML and check if it is not empty. # Create the HTML and check if it is not empty.
search_html: str = create_html_for_search_results(search_results, reader) search_html: str = create_html_for_search_results("a", reader)
assert search_html is not None assert search_html is not None
assert len(search_html) > 10 assert len(search_html) > 10