Remove every try except

This commit is contained in:
2022-12-08 12:34:06 +01:00
parent 09fa6ece48
commit e52c066824
2 changed files with 31 additions and 219 deletions

View File

@ -21,113 +21,17 @@ Exceptions:
Used in send_to_discord(). If no webhook found, it will raise NoWebhookFoundError. Used in send_to_discord(). If no webhook found, it will raise NoWebhookFoundError.
""" """
from typing import Any, Iterable from typing import Iterable
from discord_webhook import DiscordWebhook from discord_webhook import DiscordWebhook
from pydantic import BaseModel
from reader import ( from reader import (
Entry, Entry,
EntryNotFoundError,
FeedExistsError,
FeedNotFoundError,
InvalidFeedURLError,
ParseError,
StorageError,
TagNotFoundError,
) )
from requests import Response from requests import Response
from discord_rss_bot.settings import logger, reader from discord_rss_bot.settings import logger, reader
class IfFeedError(BaseModel):
"""Update a feed.
Attributes:
feed_url: The feed to update.
webhook: The webhook to use.
error: True if error, False if no error.
err_msg: The error message, if any.
exception: The exception, if any.
"""
feed_url: str
webhook: str
error: bool
err_msg: str = ""
exception: str = ""
class NoWebhookFoundError(Exception):
"""Raises an exception if no webhook is found.
Used in send_to_discord()."""
def __init__(self, message) -> None:
self.message = message
def __str__(self) -> Any:
return self.message
def add_feed(feed_url: str, webhook: str, exist_ok=False, allow_invalid_url=False) -> IfFeedError:
"""
Add a feed to reader. If error occurs, it will return IfFeedError with error=True.
Args:
feed_url: The feed to add.
webhook: The webhook to use.
exist_ok: If the feed already exists, do nothing.
allow_invalid_url: If the feed url is invalid, add it anyway.
Returns:
IfFeedError: Error or not.
"""
logger.debug(f"Adding feed: {feed_url}")
try:
reader.add_feed(feed_url, exist_ok=exist_ok, allow_invalid_url=allow_invalid_url)
except FeedExistsError as error:
error_msg = "Feed already exists"
logger.error(error_msg, exc_info=True)
return IfFeedError(
error=True,
err_msg=error_msg,
feed_url=feed_url,
webhook=webhook,
exception=error.message,
)
except InvalidFeedURLError as error:
error_msg = "Invalid feed URL"
logger.error(error_msg, exc_info=True)
return IfFeedError(
error=True,
err_msg=error_msg,
feed_url=feed_url,
webhook=webhook,
exception=error.message,
)
logger.debug(f"Successfully added feed: {feed_url}")
return IfFeedError(error=False, feed_url=feed_url, webhook=webhook)
def check_feed(feed_url: str) -> None:
"""Update a single feed and send its unread entries to Discord.
We don't need to mark entries as read here, because send_to_discord() does that when sending entries to Discord
if it was successful.
Args:
feed_url: The feed to check.
"""
if feed_url is None:
logger.error("No feed URL given")
send_to_discord(feed_url)
return
send_to_discord(feed_url)
def send_to_discord(feed=None) -> None: def send_to_discord(feed=None) -> None:
""" """
Send entries to Discord. Send entries to Discord.
@ -143,11 +47,11 @@ def send_to_discord(feed=None) -> None:
Returns: Returns:
Response: The response from the webhook. Response: The response from the webhook.
""" """
reader.update_feeds()
if feed is None: if feed is None:
reader.update_feeds()
entries: Iterable[Entry] = reader.get_entries(read=False) entries: Iterable[Entry] = reader.get_entries(read=False)
else: else:
reader.update_feed(feed)
entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False) entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False)
if not entries: if not entries:
@ -156,28 +60,11 @@ def send_to_discord(feed=None) -> None:
for entry in entries: for entry in entries:
logger.debug(f"Sending entry {entry} to Discord") logger.debug(f"Sending entry {entry} to Discord")
try:
reader.set_entry_read(entry, True)
logger.debug(f"New entry: {entry.title}")
except EntryNotFoundError:
logger.error("Entry not found", exc_info=True)
raise
except StorageError:
logger.error("Storage error", exc_info=True)
raise
try: reader.set_entry_read(entry, True)
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook")) logger.debug(f"Entry {entry.title} marked as read")
except TagNotFoundError:
logger.error("Tag not found", exc_info=True)
raise
except StorageError:
logger.error("Storage error", exc_info=True)
raise
if not webhook_url: webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook"))
logger.error(f"No webhook found for feed: {entry.feed.url}")
raise NoWebhookFoundError(f"No webhook found for feed: {entry.feed.url}")
logger.debug(f"Sending to webhook: {webhook_url}") logger.debug(f"Sending to webhook: {webhook_url}")
webhook_message: str = f":robot: :mega: {entry.title}\n{entry.link}" webhook_message: str = f":robot: :mega: {entry.title}\n{entry.link}"
@ -186,53 +73,4 @@ def send_to_discord(feed=None) -> None:
if not response.ok: if not response.ok:
logger.error(f"Error: {response.status_code} {response.reason}") logger.error(f"Error: {response.status_code} {response.reason}")
reader.set_entry_read(entry, False) reader.set_entry_read(entry, False)
logger.debug(f"Entry {entry.title} marked as unread")
def update_feed(feed_url: str, webhook: str) -> IfFeedError:
"""
Update a feed.
Args:
feed_url: The feed to update.
webhook: The webhook to use.
Returns:
IfFeedError: Error or not.
"""
try:
reader.update_feed(feed_url)
except FeedNotFoundError as error:
error_msg = "Feed not found"
logger.error(error_msg, exc_info=True)
return IfFeedError(
error=True,
err_msg=error_msg,
feed_url=feed_url,
webhook=webhook,
exception=error.message,
)
except ParseError as error:
error_msg = "An error occurred while getting/parsing feed"
logger.error(error_msg, exc_info=True)
return IfFeedError(
error=True,
err_msg=error_msg,
feed_url=feed_url,
webhook=webhook,
exception=error.message,
)
except StorageError as error:
error_msg = "An exception was raised by the underlying storage"
logger.error(error_msg, exc_info=True)
return IfFeedError(
error=True,
err_msg=error_msg,
feed_url=feed_url,
webhook=webhook,
exception=error.message,
)
return IfFeedError(error=False, feed_url=feed_url, webhook=webhook)

View File

@ -36,12 +36,15 @@ from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import FileResponse, HTMLResponse from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from reader import EntryCounts, Feed, FeedCounts, FeedNotFoundError, ReaderError, ResourceNotFoundError, StorageError, \ from reader import (
TagNotFoundError EntryCounts,
Feed,
FeedCounts,
)
from starlette.templating import _TemplateResponse from starlette.templating import _TemplateResponse
from tomlkit.toml_document import TOMLDocument from tomlkit.toml_document import TOMLDocument
from discord_rss_bot.feeds import IfFeedError, add_feed, send_to_discord, update_feed from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.settings import logger, read_settings_file, reader from discord_rss_bot.settings import logger, read_settings_file, reader
app: FastAPI = FastAPI() app: FastAPI = FastAPI()
@ -77,37 +80,23 @@ async def create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()) ->
feed_url = feed_url.strip() feed_url = feed_url.strip()
logger.debug(f"Stripped feed_url: {feed_url}") logger.debug(f"Stripped feed_url: {feed_url}")
logger.info(f"Add feed: {feed_url}") logger.info(f"Adding feed {feed_url} for {webhook_dropdown}")
logger.info(f"Webhook: {webhook_dropdown}") reader.add_feed(feed_url)
reader.update_feed(feed_url)
# Add a new feed to the database. # Mark every entry as read, so we don't send all the old entries to Discord.
added_feed: IfFeedError = add_feed(feed_url, webhook_dropdown) entries = reader.get_entries(feed=feed_url, read=False)
for entry in entries:
# Update a single feed. The feed will be updated even if updates are disabled for it. logger.debug(f"Marking {entry.title} as read")
updated_feed: IfFeedError = update_feed(feed_url, webhook_dropdown) reader.set_entry_read(entry, True)
if updated_feed.error or added_feed.error:
error_dict: dict[str, Any] = {
"error": updated_feed.error,
"feed": updated_feed.feed_url,
"webhook": updated_feed.webhook,
"exception": updated_feed.exception,
}
return HTTPException(status_code=500, detail=error_dict)
settings: TOMLDocument = read_settings_file() settings: TOMLDocument = read_settings_file()
logger.debug(f"Webhook name: {webhook_dropdown} with URL: {settings['webhooks'][webhook_dropdown]}") logger.debug(f"Webhook name: {webhook_dropdown} with URL: {settings['webhooks'][webhook_dropdown]}")
webhook_url: str = str(settings["webhooks"][webhook_dropdown]) webhook_url: str = str(settings["webhooks"][webhook_dropdown])
try: reader.set_tag(feed_url, "webhook", webhook_url)
reader.set_tag(feed_url, "webhook", webhook_url)
except ResourceNotFoundError as e:
error_msg: str = f"ResourceNotFoundError: Could not set webhook: {e}"
logger.error(error_msg, exc_info=True)
return HTTPException(status_code=500, detail=error_msg)
new_tag = reader.get_tag(feed_url, "webhook") # TODO: Go to the feed page.
logger.info(f"New tag: {new_tag}")
return {"feed_url": str(feed_url), "status": "added"} return {"feed_url": str(feed_url), "status": "added"}
@ -118,7 +107,7 @@ def create_list_of_webhooks() -> list[dict[str, str]]:
list_of_webhooks = [] list_of_webhooks = []
for hook in settings["webhooks"]: for hook in settings["webhooks"]:
logger.info(f"Webhook name: {hook} with URL: {settings['webhooks'][hook]}") logger.info(f"Webhook name: {hook} with URL: {settings['webhooks'][hook]}")
list_of_webhooks.append({"name": hook, "url": settings['webhooks'][hook]}) list_of_webhooks.append({"name": hook, "url": settings["webhooks"][hook]})
logger.debug(f"Hook: {hook}, URL: {settings['webhooks'][hook]}") logger.debug(f"Hook: {hook}, URL: {settings['webhooks'][hook]}")
logger.info(f"List of webhooks: {list_of_webhooks}") logger.info(f"List of webhooks: {list_of_webhooks}")
return list_of_webhooks return list_of_webhooks
@ -169,8 +158,9 @@ async def get_feed(feed_url: str, request: Request) -> _TemplateResponse:
# Get the entries in the feed. # Get the entries in the feed.
feed_counts: FeedCounts = reader.get_feed_counts(feed=feed_url) feed_counts: FeedCounts = reader.get_feed_counts(feed=feed_url)
return templates.TemplateResponse("feed.html", {"request": request, "feed": feed, "entries": entries, return templates.TemplateResponse(
"feed_counts": feed_counts}) "feed.html", {"request": request, "feed": feed, "entries": entries, "feed_counts": feed_counts}
)
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
@ -205,10 +195,7 @@ def make_context_index(request) -> dict:
feeds: Iterable[Feed] = reader.get_feeds() feeds: Iterable[Feed] = reader.get_feeds()
for feed in feeds: for feed in feeds:
logger.debug(f"Feed: {feed}") logger.debug(f"Feed: {feed}")
try: hook = reader.get_tag(feed.url, "webhook")
hook = reader.get_tag(feed.url, "webhook")
except TagNotFoundError:
hook = "None"
feed_list.append({"feed": feed, "webhook": hook}) feed_list.append({"feed": feed, "webhook": hook})
feed_count: FeedCounts = reader.get_feed_counts() feed_count: FeedCounts = reader.get_feed_counts()
@ -235,14 +222,8 @@ async def remove_feed(request: Request, feed_url: str = Form()):
Returns: Returns:
HTMLResponse: The HTML response. HTMLResponse: The HTML response.
""" """
try:
reader.delete_feed(feed_url) reader.delete_feed(feed_url)
except FeedNotFoundError:
logger.error(f"Feed not found: {feed_url}")
return {"error": "Feed not found.", "feed": feed_url}
except StorageError:
logger.error(f"Storage error: {feed_url}")
return {"error": "Storage error.", "feed": feed_url}
logger.info(f"Deleted feed: {feed_url}") logger.info(f"Deleted feed: {feed_url}")
context = make_context_index(request) context = make_context_index(request)
@ -278,14 +259,7 @@ def shutdown() -> None:
It stops the scheduler.""" It stops the scheduler."""
scheduler: BackgroundScheduler = BackgroundScheduler() scheduler: BackgroundScheduler = BackgroundScheduler()
scheduler.shutdown() scheduler.shutdown()
logger.info("Scheduler stopped.") reader.close()
try:
reader.close()
except ReaderError:
logger.error("Error closing reader.", exc_info=True)
sys.exit()
logger.info("Reader closed.")
if __name__ == "__main__": if __name__ == "__main__":