Add logging with loguru

This commit is contained in:
2023-01-18 22:56:29 +01:00
parent cbad4e4430
commit ae7a10892d
11 changed files with 327 additions and 15 deletions

View File

@ -8,15 +8,18 @@ from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from loguru import logger
from reader import Entry, EntryCounts, EntrySearchCounts, EntrySearchResult, Feed, FeedCounts, Reader, TagNotFoundError
from starlette.responses import RedirectResponse
from discord_rss_bot import settings
from discord_rss_bot.apscheduler_listener import my_listener
from discord_rss_bot.custom_filters import convert_to_md, encode_url, entry_is_blacklisted, entry_is_whitelisted
from discord_rss_bot.custom_message import get_custom_message, get_images_from_entry, remove_image_tags
from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.filter.blacklist import get_blacklist_content, get_blacklist_summary, get_blacklist_title
from discord_rss_bot.filter.whitelist import get_whitelist_content, get_whitelist_summary, get_whitelist_title
from discord_rss_bot.logger import init_logging
from discord_rss_bot.search import create_html_for_search_results
from discord_rss_bot.settings import default_custom_message, get_reader, list_webhooks
@ -27,6 +30,8 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template
reader: Reader = get_reader()
init_logging()
# Add the filters to the Jinja2 environment so they can be used in html templates.
templates.env.filters["encode_url"] = encode_url
templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted
@ -49,9 +54,11 @@ async def add_webhook(webhook_name=Form(), webhook_url=Form()):
# Remove leading and trailing whitespace.
clean_webhook_name: str = webhook_name.strip()
clean_webhook_url: str = webhook_url.strip()
logger.info(f"Adding webhook {clean_webhook_name} with url {clean_webhook_url}")
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks: list[dict[str, str]] = list_webhooks(reader)
logger.debug(f"Current webhooks: {webhooks}")
# Only add the webhook if it doesn't already exist.
if all(webhook["name"] != clean_webhook_name for webhook in webhooks):
@ -64,9 +71,12 @@ async def add_webhook(webhook_name=Form(), webhook_url=Form()):
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # type: ignore
logger.info(f"Added webhook {clean_webhook_name} with url {clean_webhook_url}")
return RedirectResponse(url="/", status_code=303)
# TODO: Show this error on the page.
logger.error(f"Webhook {clean_webhook_name} already exists.")
return {"error": "Webhook already exists."}
@ -83,22 +93,25 @@ async def delete_webhook(webhook_url=Form()):
"""
# Remove leading and trailing whitespace.
clean_webhook_url: str = webhook_url.strip()
logger.debug(f"Deleting webhook with url {clean_webhook_url}")
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks: list[dict[str, str]] = list_webhooks(reader)
logger.debug(f"Current webhooks: {webhooks}")
# Only add the webhook if it doesn't already exist.
for webhook in webhooks:
if webhook["url"] == clean_webhook_url:
# Add the new webhook to the list of webhooks.
webhooks.remove(webhook)
logger.info(f"Deleted webhook with url {clean_webhook_url}")
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # type: ignore
return RedirectResponse(url="/", status_code=303)
# TODO: Show this error on the page.
logger.error(f"Could not find webhook with url {clean_webhook_url}")
return {"error": "Could not find webhook."}
@ -115,21 +128,26 @@ async def create_feed(feed_url=Form(), webhook_dropdown=Form()):
dict: The feed that was added.
"""
clean_feed_url: str = feed_url.strip()
logger.info(f"Adding feed {clean_feed_url} with webhook {webhook_dropdown}")
# TODO: Check if the feed is valid, if not return an error or fix it.
# For example, if the feed is missing the protocol, add it.
reader.add_feed(clean_feed_url)
reader.update_feed(clean_feed_url)
logger.debug(f"Added feed {clean_feed_url}")
# Mark every entry as read, so we don't send all the old entries to Discord.
entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url, read=False)
for entry in entries:
reader.set_entry_read(entry, True)
logger.debug(f"Marked entry {entry.title} as read.")
try:
hooks = reader.get_tag((), "webhooks")
logger.debug(f"Current webhooks: {hooks}")
except TagNotFoundError:
hooks = []
logger.error("No webhooks found in the database.")
webhook_url: str = ""
if hooks:
@ -141,15 +159,18 @@ async def create_feed(feed_url=Form(), webhook_dropdown=Form()):
if not webhook_url:
# TODO: Show this error on the page.
logger.error("No webhook URL found.")
return {"error": "No webhook URL found."}
# This is the webhook that will be used to send the feed to Discord.
reader.set_tag(clean_feed_url, "webhook", webhook_url) # type: ignore
reader.get_tag(clean_feed_url, "webhook")
logger.debug(f"Set webhook {webhook_url} for feed {clean_feed_url}")
# This is the default message that will be sent to Discord.
reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # type: ignore
reader.get_tag(clean_feed_url, "custom_message")
logger.debug(f"Set custom message {default_custom_message} for feed {clean_feed_url}")
# Update the full-text search index so our new feed is searchable.
reader.update_search()
@ -167,11 +188,15 @@ async def pause_feed(feed_url=Form()):
Returns:
Redirect the URL to the feed we paused.
"""
logger.info(f"Pausing feed {feed_url}")
# Disable/pause the feed.
reader.disable_feed_updates(feed_url)
logger.debug(f"Paused feed {feed_url}")
# Clean URL is used to redirect to the feed page.
clean_url: str = urllib.parse.quote(feed_url)
logger.debug(f"Clean URL: {clean_url}")
return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303)
@ -186,11 +211,15 @@ async def unpause_feed(feed_url=Form()):
Returns:
Redirect to the feed we unpaused.
"""
logger.info(f"Unpausing feed {feed_url}")
# Enable/unpause the feed.
reader.enable_feed_updates(feed_url)
logger.debug(f"Unpaused feed {feed_url}")
# Clean URL is used to redirect to the feed page.
clean_url: str = urllib.parse.quote(feed_url)
logger.debug(f"Clean URL: {clean_url}")
return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303)
@ -215,13 +244,17 @@ async def set_whitelist(
"""
if whitelist_title:
reader.set_tag(feed_url, "whitelist_title", whitelist_title)
logger.info(f"Set whitelist_title to {whitelist_title} for feed {feed_url}")
if whitelist_summary:
reader.set_tag(feed_url, "whitelist_summary", whitelist_summary)
logger.info(f"Set whitelist_summary to {whitelist_summary} for feed {feed_url}")
if whitelist_content:
reader.set_tag(feed_url, "whitelist_content", whitelist_content)
logger.info(f"Set whitelist_content to {whitelist_content} for feed {feed_url}")
# Clean URL is used to redirect to the feed page.
clean_url = urllib.parse.quote(feed_url)
clean_url: str = urllib.parse.quote(feed_url)
logger.debug(f"Clean URL: {clean_url}")
return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303)
@ -239,6 +272,7 @@ async def get_whitelist(feed_url, request: Request):
"""
# Make feed_url a valid URL.
url: str = urllib.parse.unquote(feed_url)
logger.debug(f"URL: {url}")
feed: Feed = reader.get_feed(url)
@ -247,6 +281,10 @@ async def get_whitelist(feed_url, request: Request):
whitelist_summary: str = get_whitelist_summary(reader, feed)
whitelist_content: str = get_whitelist_content(reader, feed)
logger.debug(f"whitelist_title: {whitelist_title}")
logger.debug(f"whitelist_summary: {whitelist_summary}")
logger.debug(f"whitelist_content: {whitelist_content}")
context = {
"request": request,
"feed": feed,
@ -254,6 +292,7 @@ async def get_whitelist(feed_url, request: Request):
"whitelist_summary": whitelist_summary,
"whitelist_content": whitelist_content,
}
logger.debug(f"Context: {context}")
return templates.TemplateResponse("whitelist.html", context)
@ -599,9 +638,11 @@ def startup() -> None:
scheduler: BackgroundScheduler = BackgroundScheduler()
# Add our own listener to the scheduler. We use this so we can use loguru instead of the default logger.
scheduler.add_listener(my_listener)
# Update all feeds every 15 minutes.
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now())
scheduler.start()