Remove Typer and use Fastapi instead

This commit is contained in:
2022-12-01 21:11:44 +01:00
parent 38b91b9084
commit 3bec59cdb6
9 changed files with 960 additions and 288 deletions

View File

View File

@ -1,220 +0,0 @@
import logging
import os
import sys
import time
from contextlib import closing
from pathlib import Path
from shutil import copyfile
import typer
from discord_webhook import DiscordWebhook
from reader import FeedExistsError, make_reader
# Add logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
app = typer.Typer()
app_dir = typer.get_app_dir("discord-rss-bot")
logging.debug(f"App dir: {app_dir}")
# Create the data directory if it doesn't exist
os.makedirs(app_dir, exist_ok=True)
# Store the database file in the data directory
db_name = os.getenv("DATABASE_NAME", "db.sqlite")
db_file: Path = Path(os.path.join(app_dir, db_name))
logging.debug(f"Database file: {db_file}")
# Convert Path to string
db_file_str: str = str(db_file)
logging.debug(f"Database file as string: {db_file_str}")
@app.command()
def add(
feed_url: str = typer.Argument(..., help="RSS or Atom feed URL."),
notify_discord: bool = typer.Option(True, help="Send message to Discord."),
) -> None:
"""Add a feed to the database
Args:
feed_url (str): The url of the feed to add
notify_discord (bool): Whether to send a message to Discord when
the feed is added.
"""
with closing(make_reader(db_file_str)) as reader:
try:
# Add the feed to the database
reader.add_feed(feed_url)
except FeedExistsError:
# If the feed already exists, print a message
typer.echo(f"{feed_url} already exists")
sys.exit()
# Update the feeds
reader.update_feeds()
# Mark the feed as read
entries = reader.get_entries(feed=feed_url, read=False)
for entry in entries:
logging.debug(f"Marking {entry.title} as read")
reader.mark_entry_as_read(entry)
if notify_discord:
# Send a message to Discord
webhook_msg = (
f"discord-rss-bot: {feed_url} added to the database.\n"
f"You now have {reader.get_feed_counts()} feeds."
)
webhook_url = reader.get_tag((), "webhook")
logging.debug(f"Webhook URL: {webhook_url}")
if not webhook_url:
typer.echo("No webhook URL found in the database.")
sys.exit()
webhook = DiscordWebhook(url=str(webhook_url), content=webhook_msg, rate_limit_retry=True)
response = webhook.execute()
if response.status_code != 204:
typer.echo(f"Error sending message to Discord - {response.status_code}\n{response.text}")
typer.echo(f"{feed_url} added")
@app.command()
def stats() -> None:
"""Print the amount feeds and entries in the database"""
with closing(make_reader(db_file_str)) as reader:
feed_count = reader.get_feed_counts()
entry_count = reader.get_entry_counts()
typer.echo(
f"""Feeds:
Total: {feed_count.total} feeds
Broken: {feed_count.broken} feeds
Enabled: {feed_count.updates_enabled} feeds"""
)
typer.echo(
f"""Entries:
Total: {entry_count.total} entries
Read: {entry_count.read} entries
Important: {entry_count.important} entries
Has enclosures: {entry_count.has_enclosures} entries
Average number of entries per day:
1 Month: {entry_count.averages[0]:.2f} entries per day
3 Months: {entry_count.averages[1]:.2f} entries per day
12 Months: {entry_count.averages[2]:.2f} entries per day"""
)
@app.command()
def check() -> None:
"""Check new entries for every feed"""
with closing(make_reader(db_file_str)) as reader:
# Update the feeds
reader.update_feeds()
# Get new entries that are not read
entries = reader.get_entries(read=False)
for entry in entries:
# Mark the entry as read
reader.mark_entry_as_read(entry)
logging.debug(f"Marking {entry.title} as read")
webhook_url = reader.get_tag((), "webhook")
logging.debug(f"Webhook URL: {webhook_url}")
if not webhook_url:
typer.echo("No webhook URL found in the database.")
sys.exit()
webhook = DiscordWebhook(url=str(webhook_url), content=f":robot: :mega: {entry.title}\n{entry.link}",
rate_limit_retry=True)
response = webhook.execute()
if response.status_code != 204:
typer.echo(f"Error sending message to Discord - {response.status_code}\n{response.text}")
@app.command()
def backup() -> None:
"""Backup the database"""
backup_dir = os.path.join(app_dir, "backup")
os.makedirs(backup_dir, exist_ok=True)
# Get the current time
current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
backup_file_location = os.path.join(app_dir, "backup", f"db_{current_time}.sqlite")
copyfile(db_file, backup_file_location)
typer.echo(f"{db_file} backed up to {backup_dir}")
@app.command()
def delete() -> None:
"""Delete a feed from the database"""
feed_dict = {}
feed_number = 0
message = ""
with closing(make_reader(db_file_str)) as reader:
for feed in reader.get_feeds():
logging.debug(f"Feed: {feed}")
feed_number += 1
logging.debug(f"Feed number: {feed_number}")
logging.debug(f"Feed URL: {feed.url}")
feed_dict[str(feed_number)] = feed.url
logging.debug(f"Feed dict: {feed_dict}")
message += f"{feed_number}: {feed.title}\n"
typer.echo(message)
feed_to_delete: str = typer.prompt("What feed do you want to remove?")
feed_url = feed_dict.get(str(feed_to_delete))
if not feed_url:
typer.echo("Invalid feed number")
sys.exit()
logging.debug(f"Feed URL: {feed_url}")
confirm_delete = typer.confirm(
f"Are you sure you want to delete {feed_url}?",
)
if not confirm_delete:
typer.echo("Not deleting")
raise typer.Abort()
reader.delete_feed(feed_url)
typer.echo(f"{feed_url} deleted")
@app.command()
def webhook_add(webhook_url: str) -> None:
"""Add a webhook to the database"""
with closing(make_reader(db_file_str)) as reader:
reader.set_tag((), "webhook", webhook_url)
typer.echo(f"Webhook set to {webhook_url}")
@app.command()
def webhook_get() -> None:
"""Get the webhook url"""
# TODO: Add name to output
with closing(make_reader(db_file_str)) as reader:
try:
webhook_url = reader.get_tag((), "webhook")
typer.echo(f"Webhook: {webhook_url}")
except Exception as e:
typer.echo("No webhook was found. Use `webhook add` to add one.")
typer.echo(f"Error: {e}\nPlease report this error to the developer.")
sys.exit()
if __name__ == "__main__":
app()

164
discord_rss_bot/main.py Normal file
View File

@ -0,0 +1,164 @@
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from discord_webhook import DiscordWebhook
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from reader import make_reader
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
reader = make_reader("db.sqlite")
@app.post("/check", response_class=HTMLResponse)
def read_check_feed(request: Request, feed_url: str = Form()):
"""Check all feeds"""
reader.update_feeds()
entry = reader.get_entries(feed=feed_url, read=False)
_check_feed(entry)
logger.info(f"Get feed: {feed_url}")
feed = reader.get_feed(feed_url)
return templates.TemplateResponse("feed.html", {"request": request, "feed": feed})
def check_feeds() -> None:
"""Check all feeds"""
reader.update_feeds()
entries = reader.get_entries(read=False)
_check_feed(entries)
def check_feed(feed_url: str) -> None:
"""Check a single feed"""
reader.update_feeds()
entry = reader.get_entries(feed=feed_url, read=False)
_check_feed(entry)
def _check_feed(entries):
for entry in entries:
reader.mark_entry_as_read(entry)
print(f"New entry: {entry.title}")
webhook_url = reader.get_tag((), "webhook")
if webhook_url:
print(f"Sending to webhook: {webhook_url}")
webhook = DiscordWebhook(url=str(webhook_url), content=f":robot: :mega: New entry: {entry.title}\n"
f"{entry.link}", rate_limit_retry=True)
response = webhook.execute()
if not response.ok:
# TODO: Send error to discord
print(f"Error: {response.status_code} {response.reason}")
reader.mark_entry_as_unread(entry)
@app.on_event('startup')
def init_data():
"""Run on startup"""
scheduler = BackgroundScheduler()
scheduler.start()
@app.get("/", response_class=HTMLResponse)
def read_root(request: Request):
"""
This is the root of the website.
Args:
request:
Returns:
HTMLResponse: The HTML response.
"""
feeds = reader.get_feeds()
feed_count = reader.get_feed_counts()
entry_count = reader.get_entry_counts()
context = {"request": request,
"feeds": feeds,
"feed_count": feed_count,
"entry_count": entry_count}
return templates.TemplateResponse("index.html", context)
@app.post("/remove", response_class=HTMLResponse)
async def remove_feed(request: Request, feed_url: str = Form()):
"""
Get a feed by URL.
Args:
request: The request.
feed_url: The feed to add.
Returns:
HTMLResponse: The HTML response.
"""
logger.info(f"Get feed: {feed_url}")
feed = reader.get_feed(feed_url)
reader.delete_feed(feed_url)
return templates.TemplateResponse("index.html", {"request": request, "feed": feed})
@app.post("/feed", response_class=HTMLResponse)
async def get_feed(request: Request, feed_url: str = Form()):
"""
Get a feed by URL.
Args:
request: The request.
feed_url: The feed to add.
Returns:
HTMLResponse: The HTML response.
"""
logger.info(f"Get feed: {feed_url}")
feed = reader.get_feed(feed_url)
return templates.TemplateResponse("feed.html", {"request": request, "feed": feed})
@app.post("/global_webhook", response_class=HTMLResponse)
async def add_global_webhook(request: Request, webhook_url: str = Form()):
"""
Add a global webhook.
Args:
request: The request.
webhook_url: The webhook URL.
Returns:
HTMLResponse: The HTML response.
"""
logger.info(f"Add global webhook: {webhook_url}")
reader.set_tag("webhook", webhook_url)
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/add")
async def create_feed(feed_url: str = Form()):
"""
Add a feed to the database.
Args:
feed_url: The feed to add.
default_webhook: The default webhook to use.
Returns:
dict: The feed that was added.
"""
reader.add_feed(feed_url)
reader.update_feed(feed_url)
return {"feed_url": str(feed_url), "status": "added"}

View File

@ -0,0 +1,46 @@
import os
from pathlib import Path
from platformdirs import user_data_dir
def get_app_dir(app_dir: str = user_data_dir("discord_rss_bot")) -> Path:
"""
Get the application directory. This is where the database file is stored.
Args:
app_dir: The application directory, defaults to user_data_dir().
Returns:
Path: The application directory.
"""
print(f"Data directory: {app_dir}")
# Use the environment variable if it exists instead of the default app dir.
app_dir = os.getenv("DATABASE_LOCATION") or app_dir
# Create the data directory if it doesn't exist
os.makedirs(app_dir, exist_ok=True)
return Path(app_dir)
def get_db_file(custom_db_name: str = "db.sqlite") -> Path:
"""Where we store the database file
Args:
custom_db_name: The name of the database file, defaults to db.sqlite.
Returns:
Path: The database file.
"""
# Store the database file in the data directory
app_dir = get_app_dir()
# Use the environment variable if it exists instead of the default db name.
db_name = os.getenv("DATABASE_NAME") or custom_db_name
db_file: Path = Path(os.path.join(app_dir, db_name))
print(f"Database file: {db_file}")
return Path(db_file)

View File

@ -0,0 +1,26 @@
import sys
from contextlib import closing
from reader import make_reader
from discord_rss_bot.discord_rss_bot import db_file_str
def webhook_get() -> None:
"""Get the webhook url"""
# TODO: Add name to output
with closing(make_reader(db_file_str)) as reader:
try:
webhook_url = reader.get_tag((), "webhook")
print(f"Webhook: {webhook_url}")
except Exception as e:
print("No webhook was found. Use `webhook add` to add one.")
print(f"Error: {e}\nPlease report this error to the developer.")
sys.exit()
def webhook_add(webhook_url: str) -> None:
"""Add a webhook to the database"""
with closing(make_reader(db_file_str)) as reader:
reader.set_tag((), "webhook", webhook_url)
print(f"Webhook set to {webhook_url}")