Stuff and things

This commit is contained in:
2024-10-30 18:45:02 +01:00
parent 14a78b4717
commit 7ba3716b98
32 changed files with 1225 additions and 1150 deletions

View File

@ -20,13 +20,12 @@ from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from httpx import Response
from markdownify import markdownify
from reader import Entry, Feed, FeedNotFoundError, Reader, TagNotFoundError
from reader import Entry, EntryNotFoundError, Feed, FeedNotFoundError, Reader, TagNotFoundError
from reader.types import JSONType
from starlette.responses import RedirectResponse
from discord_rss_bot import settings
from discord_rss_bot.custom_filters import (
encode_url,
entry_is_blacklisted,
entry_is_whitelisted,
)
@ -42,7 +41,6 @@ from discord_rss_bot.feeds import create_feed, send_entry_to_discord, send_to_di
from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.search import create_html_for_search_results
from discord_rss_bot.settings import get_reader
from discord_rss_bot.webhook import add_webhook, remove_webhook
if TYPE_CHECKING:
from collections.abc import Iterable
@ -90,7 +88,7 @@ reader: Reader = get_reader()
@asynccontextmanager
async def lifespan(app: FastAPI) -> typing.AsyncGenerator[None]:
"""This is needed for the ASGI server to run."""
add_missing_tags(reader=reader)
add_missing_tags(reader)
scheduler: AsyncIOScheduler = AsyncIOScheduler()
# Update all feeds every 15 minutes.
@ -109,7 +107,7 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template
# Add the filters to the Jinja2 environment so they can be used in html templates.
templates.env.filters["encode_url"] = encode_url
templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else ""
templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted
templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted
templates.env.filters["discord_markdown"] = markdownify
@ -126,11 +124,32 @@ async def post_add_webhook(
webhook_name: The name of the webhook.
webhook_url: The url of the webhook.
Raises:
HTTPException: If the webhook already exists.
Returns:
RedirectResponse: Redirect to the index page.
"""
add_webhook(reader, webhook_name, webhook_url)
return RedirectResponse(url="/", status_code=303)
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks = list(reader.get_tag((), "webhooks", []))
# Webhooks are stored as a list of dictionaries.
# Example: [{"name": "webhook_name", "url": "webhook_url"}]
webhooks = cast(list[dict[str, str]], webhooks)
# Only add the webhook if it doesn't already exist.
stripped_webhook_name = webhook_name.strip()
if all(webhook["name"] != stripped_webhook_name for webhook in webhooks):
# Add the new webhook to the list of webhooks.
webhooks.append({"name": webhook_name.strip(), "url": webhook_url.strip()})
reader.set_tag((), "webhooks", webhooks) # type: ignore
return RedirectResponse(url="/", status_code=303)
# TODO(TheLovinator): Show this error on the page.
# TODO(TheLovinator): Replace HTTPException with WebhookAlreadyExistsError.
raise HTTPException(status_code=409, detail="Webhook already exists")
@app.post("/delete_webhook")
@ -140,11 +159,37 @@ async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectRe
Args:
webhook_url: The url of the webhook.
Raises:
HTTPException: If the webhook could not be deleted
Returns:
RedirectResponse: Redirect to the index page.
"""
# TODO(TheLovinator): Check if the webhook is in use by any feeds before deleting it.
remove_webhook(reader, webhook_url)
# TODO(TheLovinator): Replace HTTPException with a custom exception for both of these.
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks = list(reader.get_tag((), "webhooks", []))
# Webhooks are stored as a list of dictionaries.
# Example: [{"name": "webhook_name", "url": "webhook_url"}]
webhooks = cast(list[dict[str, str]], webhooks)
# Only add the webhook if it doesn't already exist.
webhooks_to_remove: list[dict[str, str]] = [
webhook for webhook in webhooks if webhook["url"] == webhook_url.strip()
]
# Remove the webhooks outside the loop.
for webhook in webhooks_to_remove:
webhooks.remove(webhook)
# Check if any webhooks were removed.
if not all(webhook not in webhooks for webhook in webhooks_to_remove):
raise HTTPException(status_code=500, detail="Webhook could not be deleted")
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # type: ignore
return RedirectResponse(url="/", status_code=303)
@ -515,7 +560,7 @@ def get_add(request: Request):
@app.get("/feed", response_class=HTMLResponse)
async def get_feed(feed_url: str, request: Request, starting_after: str | None = None):
async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
"""Get a feed by URL.
Args:
@ -523,15 +568,65 @@ async def get_feed(feed_url: str, request: Request, starting_after: str | None =
request: The request object.
starting_after: The entry to start after. Used for pagination.
Raises:
HTTPException: If the feed is not found.
Returns:
HTMLResponse: The feed page.
"""
entries_per_page: int = 20
clean_feed_url: str = urllib.parse.unquote(feed_url.strip())
feed: Feed = reader.get_feed(clean_feed_url)
try:
feed: Feed = reader.get_feed(clean_feed_url)
except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail=f"Feed '{clean_feed_url}' not found.\n\n{e}") from e
# Only show button if more than 10 entries.
total_entries: int = reader.get_entry_counts(feed=feed).total or 0
show_more_entires_button: bool = total_entries > entries_per_page
# Get entries from the feed.
entries: typing.Iterable[Entry] = reader.get_entries(feed=clean_feed_url, limit=10)
if starting_after:
try:
start_after_entry: Entry | None = reader.get_entry((str(feed.url), starting_after))
except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail=f"Feed '{clean_feed_url}' not found.\n\n{e}") from e
except EntryNotFoundError as e:
current_entries = list(reader.get_entries(feed=clean_feed_url))
msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}"
html: str = create_html_for_feed(current_entries)
context = {
"request": request,
"feed": feed,
"entries": current_entries,
"feed_counts": reader.get_feed_counts(feed=clean_feed_url),
"html": html,
"should_send_embed": False,
"last_entry": None,
"messages": msg,
"show_more_entires_button": show_more_entires_button,
"total_entries": total_entries,
}
return templates.TemplateResponse(request=request, name="feed.html", context=context)
else:
start_after_entry = None
entries: typing.Iterable[Entry] = reader.get_entries(
feed=clean_feed_url,
starting_after=start_after_entry,
limit=entries_per_page,
)
entries = list(entries)
# Get the last entry.
last_entry: Entry | None = None
if entries:
last_entry = entries[-1]
# Create the html for the entries.
html: str = create_html_for_feed(entries)
@ -549,47 +644,9 @@ async def get_feed(feed_url: str, request: Request, starting_after: str | None =
"feed_counts": reader.get_feed_counts(feed=clean_feed_url),
"html": html,
"should_send_embed": should_send_embed,
"show_more_button": True,
}
return templates.TemplateResponse(request=request, name="feed.html", context=context)
@app.get("/feed_more", response_class=HTMLResponse)
async def get_all_entries(feed_url: str, request: Request):
"""Get a feed by URL and show more entries.
Args:
feed_url: The feed to add.
request: The request object.
starting_after: The entry to start after. Used for pagination.
Returns:
HTMLResponse: The feed page.
"""
clean_feed_url: str = urllib.parse.unquote(feed_url.strip())
feed: Feed = reader.get_feed(clean_feed_url)
# Get entries from the feed.
entries: typing.Iterable[Entry] = reader.get_entries(feed=clean_feed_url, limit=200)
# Create the html for the entries.
html: str = create_html_for_feed(entries)
try:
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
except TagNotFoundError:
add_missing_tags(reader)
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
context = {
"request": request,
"feed": feed,
"entries": entries,
"feed_counts": reader.get_feed_counts(feed=clean_feed_url),
"html": html,
"should_send_embed": should_send_embed,
"show_more_button": False,
"last_entry": last_entry,
"show_more_entires_button": show_more_entires_button,
"total_entries": total_entries,
}
return templates.TemplateResponse(request=request, name="feed.html", context=context)