Use database instead of config file

This commit is contained in:
2022-12-16 14:36:44 +01:00
parent ad61275724
commit ee3ce2016c
6 changed files with 191 additions and 85 deletions

View File

@ -26,15 +26,13 @@ Functions:
startup()
Runs on startup.
"""
import sys
import urllib.parse
from datetime import datetime
from typing import Any, Iterable
import uvicorn
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI, Form, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from reader import (
@ -45,13 +43,13 @@ from reader import (
Feed,
FeedCounts,
Reader,
TagNotFoundError,
)
from starlette.responses import RedirectResponse
from starlette.templating import _TemplateResponse
from tomlkit.toml_document import TOMLDocument
from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.search import create_html_for_search_results
from discord_rss_bot.settings import get_reader, read_settings_file
from discord_rss_bot.settings import get_reader
app: FastAPI = FastAPI()
app.mount("/static", StaticFiles(directory="discord_rss_bot/static"), name="static")
@ -77,8 +75,89 @@ def encode_url(url_to_quote: str) -> str:
templates.env.filters["encode_url"] = encode_url
@app.post("/add_webhook")
async def add_webhook(webhook_name: str = Form(), webhook_url: str = Form()) -> RedirectResponse | dict[str, str]:
"""
Add a feed to the database.
Args:
webhook_name: The name of the webhook.
webhook_url: The url of the webhook.
Returns:
dict: The feed that was added.
"""
# Remove leading and trailing whitespace.
clean_webhook_name: str = webhook_name.strip()
clean_webhook_url: str = webhook_url.strip()
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks: list[dict[str, str]] = []
if reader.get_tags(()) is not None:
for tag in reader.get_tag_keys(()):
if tag == "webhooks":
webhooks = reader.get_tag((), "webhooks")
break
# Only add the webhook if it doesn't already exist.
if not any(webhook["name"] == clean_webhook_name for webhook in webhooks):
# Create a dict with webhook name and URL.
new_webhook: dict[str, str] = {"name": clean_webhook_name, "url": clean_webhook_url}
# Add the new webhook to the list of webhooks.
webhooks.append(new_webhook)
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks)
return RedirectResponse(url="/", status_code=303)
# TODO: Show this error on the page.
return {"error": "Webhook already exists."}
@app.post("/delete_webhook")
async def delete_webhook(webhook_url: str = Form()) -> RedirectResponse | dict[str, str]:
"""
Delete a webhook from the database.
Args:
webhook_name: The name of the webhook.
webhook_url: The url of the webhook.
Returns:
dict: The feed that was added.
"""
# Remove leading and trailing whitespace.
clean_webhook_url: str = webhook_url.strip()
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks: list[dict[str, str]] = []
if reader.get_tags(()) is not None:
for tag in reader.get_tag_keys(()):
if tag == "webhooks":
webhooks = reader.get_tag((), "webhooks")
break
# Only add the webhook if it doesn't already exist.
for webhook in webhooks:
if webhook["url"] == clean_webhook_url:
# Add the new webhook to the list of webhooks.
webhooks.remove(webhook)
print(f"Removed webhook {webhook['name']}.")
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks)
return RedirectResponse(url="/", status_code=303)
# TODO: Show this error on the page.
return {"error": "Could not find webhook."}
@app.post("/add")
async def create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()) -> RedirectResponse:
async def create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()) -> dict[str, str] | RedirectResponse:
"""
Add a feed to the database.
@ -99,8 +178,20 @@ async def create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()) ->
for entry in entries:
reader.set_entry_read(entry, True)
settings: TOMLDocument = read_settings_file()
webhook_url: str = str(settings["webhooks"][webhook_dropdown]) # type: ignore
try:
hooks = reader.get_tag((), "webhooks")
except TagNotFoundError:
hooks = []
if len(hooks) > 0:
# Get the webhook URL from the dropdown.
for hook in hooks:
if hook["name"] == webhook_dropdown:
webhook_url = hook["url"]
break
else:
return {"error": "Webhook not found."}
reader.set_tag(clean_feed_url, "webhook", webhook_url) # type: ignore
reader.get_tag(clean_feed_url, "webhook")
@ -109,23 +200,6 @@ async def create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()) ->
return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303)
def create_list_of_webhooks() -> list[dict[str, str]]:
"""List with webhooks."""
settings: TOMLDocument = read_settings_file()
list_of_webhooks: list[dict[str, str]] = []
for hook in settings["webhooks"]: # type: ignore
list_of_webhooks.append({"name": hook, "url": settings["webhooks"][hook]}) # type: ignore
return list_of_webhooks
@app.get("/favicon.ico", include_in_schema=False)
async def favicon() -> FileResponse:
"""Return favicon."""
return FileResponse("static/favicon.ico")
@app.get("/add", response_class=HTMLResponse)
def get_add(request: Request) -> _TemplateResponse:
"""
@ -141,7 +215,7 @@ def get_add(request: Request) -> _TemplateResponse:
return templates.TemplateResponse("add.html", context)
@app.get("/feed/", response_class=HTMLResponse)
@app.get("/feed", response_class=HTMLResponse)
async def get_feed(feed_url: str, request: Request) -> _TemplateResponse:
"""
Get a feed by URL.
@ -168,6 +242,25 @@ async def get_feed(feed_url: str, request: Request) -> _TemplateResponse:
return templates.TemplateResponse("feed.html", context)
@app.get("/webhooks", response_class=HTMLResponse)
async def get_webhooks(request: Request) -> _TemplateResponse:
"""
Page for adding a new webhook.
Args:
request: The request.
Returns:
HTMLResponse: The HTML response.
"""
try:
webhooks = reader.get_tag((), "webhooks")
except TagNotFoundError:
webhooks = []
context = {"request": request, "webhooks": webhooks}
return templates.TemplateResponse("webhooks.html", context)
@app.get("/", response_class=HTMLResponse)
def index(request: Request) -> _TemplateResponse:
"""
@ -195,7 +288,12 @@ def make_context_index(request) -> dict:
dict: The context.
"""
hooks: list[dict[str, str]] = create_list_of_webhooks()
# Get webhooks name and url from the database.
try:
hooks = reader.get_tag((), "webhooks")
except TagNotFoundError:
hooks = []
feed_list = []
feeds: Iterable[Feed] = reader.get_feeds()
for feed in feeds:
@ -267,15 +365,10 @@ def startup() -> None:
"""This is called when the server starts.
It reads the settings file and starts the scheduler."""
settings: TOMLDocument = read_settings_file()
if not settings["webhooks"]:
sys.exit("No webhooks found in settings file.")
scheduler: BackgroundScheduler = BackgroundScheduler()
# Update all feeds every 15 minutes.
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now())
# scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now())
scheduler.start()