Add config file, dropdown for webhooks, and sends stuff to Discord

This commit is contained in:
2022-12-02 16:47:47 +01:00
parent 3bec59cdb6
commit 400b72dbf4
11 changed files with 328 additions and 187 deletions

34
discord_rss_bot/feeds.py Normal file
View File

@ -0,0 +1,34 @@
from discord_webhook import DiscordWebhook
from discord_rss_bot.settings import logger, reader
def check_feeds() -> None:
"""Check all feeds"""
reader.update_feeds()
entries = reader.get_entries(read=False)
_check_feed(entries)
def check_feed(feed_url: str) -> None:
"""Check a single feed"""
reader.update_feeds()
entry = reader.get_entries(feed=feed_url, read=False)
_check_feed(entry, feed_url)
def _check_feed(entries, feed_url: str) -> None:
for entry in entries:
reader.mark_entry_as_read(entry)
logger.debug(f"New entry: {entry.title}")
webhook_url = reader.get_tag(feed_url, "webhook")
if webhook_url:
logger.debug(f"Sending to webhook: {webhook_url}")
webhook = DiscordWebhook(url=str(webhook_url), content=f":robot: :mega: New entry: {entry.title}\n"
f"{entry.link}", rate_limit_retry=True)
response = webhook.execute()
if not response.ok:
# TODO: Send error to discord
logger.error(f"Error: {response.status_code} {response.reason}")
reader.mark_entry_as_unread(entry)

View File

@ -1,31 +1,28 @@
import logging
import enum
import sys
import uvicorn
from apscheduler.schedulers.background import BackgroundScheduler
from discord_webhook import DiscordWebhook
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from reader import make_reader
from reader import FeedExistsError
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
from discord_rss_bot.feeds import _check_feed
from discord_rss_bot.settings import logger, read_settings_file, reader
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
reader = make_reader("db.sqlite")
@app.post("/check", response_class=HTMLResponse)
def read_check_feed(request: Request, feed_url: str = Form()):
def check_feed(request: Request, feed_url: str = Form()):
"""Check all feeds"""
reader.update_feeds()
entry = reader.get_entries(feed=feed_url, read=False)
_check_feed(entry)
_check_feed(entry, feed_url)
logger.info(f"Get feed: {feed_url}")
feed = reader.get_feed(feed_url)
@ -33,46 +30,25 @@ def read_check_feed(request: Request, feed_url: str = Form()):
return templates.TemplateResponse("feed.html", {"request": request, "feed": feed})
def check_feeds() -> None:
"""Check all feeds"""
reader.update_feeds()
entries = reader.get_entries(read=False)
_check_feed(entries)
def check_feed(feed_url: str) -> None:
"""Check a single feed"""
reader.update_feeds()
entry = reader.get_entries(feed=feed_url, read=False)
_check_feed(entry)
def _check_feed(entries):
for entry in entries:
reader.mark_entry_as_read(entry)
print(f"New entry: {entry.title}")
webhook_url = reader.get_tag((), "webhook")
if webhook_url:
print(f"Sending to webhook: {webhook_url}")
webhook = DiscordWebhook(url=str(webhook_url), content=f":robot: :mega: New entry: {entry.title}\n"
f"{entry.link}", rate_limit_retry=True)
response = webhook.execute()
if not response.ok:
# TODO: Send error to discord
print(f"Error: {response.status_code} {response.reason}")
reader.mark_entry_as_unread(entry)
@app.on_event('startup')
def init_data():
"""Run on startup"""
def startup():
"""This is called when the server starts.
It reads the settings file and starts the scheduler."""
settings = read_settings_file()
if not settings["webhooks"]:
logger.critical("No webhooks found in settings file.")
sys.exit()
for key in settings["webhooks"]:
logger.info(f"Webhook name: {key} with URL: {settings['webhooks'][key]}")
scheduler = BackgroundScheduler()
scheduler.start()
@app.get("/", response_class=HTMLResponse)
def read_root(request: Request):
def index(request: Request):
"""
This is the root of the website.
@ -82,14 +58,39 @@ def read_root(request: Request):
Returns:
HTMLResponse: The HTML response.
"""
context = make_context_index(request)
return templates.TemplateResponse("index.html", context)
def make_context_index(request) -> dict:
"""
Create the needed context for the index page.
Used by / and /add.
Args:
request: The request.
Returns:
dict: The context.
"""
hooks = create_list_of_webhooks()
for hook in hooks:
logger.info(f"Webhook name: {hook.name}")
feed_list = list()
feeds = reader.get_feeds()
for feed in feeds:
feed_list.append(feed)
feed_count = reader.get_feed_counts()
entry_count = reader.get_entry_counts()
context = {"request": request,
"feeds": feeds,
"feeds": feed_list,
"feed_count": feed_count,
"entry_count": entry_count}
return templates.TemplateResponse("index.html", context)
"entry_count": entry_count,
"webhooks": hooks}
return context
@app.post("/remove", response_class=HTMLResponse)
@ -129,36 +130,53 @@ async def get_feed(request: Request, feed_url: str = Form()):
return templates.TemplateResponse("feed.html", {"request": request, "feed": feed})
@app.post("/global_webhook", response_class=HTMLResponse)
async def add_global_webhook(request: Request, webhook_url: str = Form()):
"""
Add a global webhook.
def create_list_of_webhooks():
"""List with webhooks."""
logger.info("Creating list with webhooks.")
settings = read_settings_file()
list_of_webhooks = dict()
for hook in settings["webhooks"]:
logger.info(f"Webhook name: {hook} with URL: {settings['webhooks'][hook]}")
list_of_webhooks[hook] = settings["webhooks"][hook]
Args:
request: The request.
webhook_url: The webhook URL.
logger.info(f"List of webhooks: {list_of_webhooks}")
return enum.Enum("DiscordWebhooks", list_of_webhooks)
Returns:
HTMLResponse: The HTML response.
"""
logger.info(f"Add global webhook: {webhook_url}")
reader.set_tag("webhook", webhook_url)
return templates.TemplateResponse("index.html", {"request": request})
def get_hook_by_name(name):
"""Get a webhook by name."""
settings = read_settings_file()
logger.debug(f"Webhook name: {name} with URL: {settings['webhooks'][name]}")
return settings["webhooks"][name]
@app.post("/add")
async def create_feed(feed_url: str = Form()):
async def create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()):
"""
Add a feed to the database.
Args:
feed_url: The feed to add.
default_webhook: The default webhook to use.
webhook_dropdown: The webhook to use.
Returns:
dict: The feed that was added.
"""
reader.add_feed(feed_url)
logger.info(f"Add feed: {feed_url}")
logger.info(f"Webhook: {webhook_dropdown}")
try:
reader.add_feed(feed_url)
except FeedExistsError as error:
logger.error(f"Feed already exists: {error}")
return {"error": "Feed already exists."}
reader.update_feed(feed_url)
webhook_url = get_hook_by_name(webhook_dropdown)
reader.set_tag(feed_url, "webhook", webhook_url)
return {"feed_url": str(feed_url), "status": "added"}
new_tag = reader.get_tag(feed_url, "webhook")
logger.info(f"New tag: {new_tag}")
return {"feed_url": str(feed_url), "status": "added", "webhook": webhook_url}
if __name__ == "__main__":
uvicorn.run("main:app", log_level="debug")

View File

@ -1,28 +1,44 @@
import functools
import logging
import os
from pathlib import Path
from platformdirs import user_data_dir
from reader import make_reader
from tomlkit import TOMLDocument, comment, document, parse, table
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s] [%(funcName)s:%(lineno)d] %(message)s",
)
logger = logging.getLogger(__name__)
# For get_data_dir()
data_directory = user_data_dir(appname="discord_rss_bot", appauthor="TheLovinator", roaming=True)
def get_app_dir(app_dir: str = user_data_dir("discord_rss_bot")) -> Path:
def get_data_dir(data_dir: str = data_directory) -> Path:
"""
Get the application directory. This is where the database file is stored.
Get the data directory. This is where the database file and config file are stored.
Args:
app_dir: The application directory, defaults to user_data_dir().
data_dir: The application directory, defaults to user_data_dir().
Returns:
Path: The application directory.
"""
print(f"Data directory: {app_dir}")
if data_dir != user_data_dir("discord_rss_bot"):
logger.info(f"Using custom data directory: {data_dir}")
# Use the environment variable if it exists instead of the default app dir.
app_dir = os.getenv("DATABASE_LOCATION") or app_dir
data_dir = os.getenv("DATA_DIR") or data_dir
logger.debug(f"Data directory: {data_dir}")
# Create the data directory if it doesn't exist
os.makedirs(app_dir, exist_ok=True)
os.makedirs(data_dir, exist_ok=True)
return Path(app_dir)
return Path(data_dir)
def get_db_file(custom_db_name: str = "db.sqlite") -> Path:
@ -34,13 +50,76 @@ def get_db_file(custom_db_name: str = "db.sqlite") -> Path:
Returns:
Path: The database file.
"""
if custom_db_name != "db.sqlite":
logger.info(f"Using custom database file: {custom_db_name}")
# Store the database file in the data directory
app_dir = get_app_dir()
data_dir = get_data_dir()
db_location: Path = Path(os.path.join(data_dir, custom_db_name))
# Use the environment variable if it exists instead of the default db name.
db_name = os.getenv("DATABASE_NAME") or custom_db_name
db_file: Path = Path(os.path.join(app_dir, db_name))
print(f"Database file: {db_file}")
db_file = os.getenv("DATABASE_LOCATION") or db_location
logger.debug(f"Database file: {db_file}")
return Path(db_file)
def _create_settings_file(settings_file) -> None:
"""Create the settings file if it doesn't exist."""
logger.debug(f"Settings file: {settings_file}")
# [webhooks]
# Both options are commented out by default.
webhooks = table()
webhooks.add(comment('"First webhook" = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"'))
webhooks.add(comment('"Second webhook" = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"'))
# [database]
# Option is commented out by default.
database = table()
database.add(comment('"location" = "/path/to/database/file"'))
doc = document()
doc.add("webhooks", webhooks)
doc.add("database", database)
logger.debug(f"Settings file: {doc}")
logger.debug(f"Settings file as TOML: {doc.as_string()}")
# Write the settings file
with open(settings_file, "w") as f:
f.write(doc.as_string())
def read_settings_file(custom_settings_name: str = "settings.toml") -> TOMLDocument:
"""Read the settings file
Args:
custom_settings_name: The name of the settings file, defaults to settings.toml.
Returns:
dict: The settings file as a dict.
"""
if custom_settings_name != "settings.toml":
logger.info(f"Using custom name for settings file: {custom_settings_name}")
# Store the database file in the data directory
data_dir = get_data_dir()
settings_file_location: Path = Path(os.path.join(data_dir, custom_settings_name))
# Use the environment variable if it exists instead of the default db name.
settings_file = os.getenv("SETTINGS_FILE_LOCATION") or settings_file_location
logger.debug(f"Settings file: {settings_file}")
# Create the settings file if it doesn't exist
if not os.path.exists(settings_file):
_create_settings_file(settings_file)
with open(settings_file, encoding="utf-8") as f:
data = parse(f.read())
logger.debug(f"Contents of settings file: {data}")
return data
reader = make_reader(str(get_db_file()))

View File

@ -0,0 +1,33 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Feed</title>
</head>
<body>
URL: {{ feed.url }} <br>
Title: {{ feed.title }} <br>
Updated: {{ feed.updated }} <br>
Link: {{ feed.link }} <br>
Author: {{ feed.author }} <br>
Subtitle: {{ feed.subtitle }} <br>
Version: {{ feed.version }} <br>
User title: {{ feed.user_title }} <br>
Added on: {{ feed.added }} <br>
Last update: {{ feed.last_update }} <br>
Last exception: {{ feed.last_exception }} <br>
Updates enabled: {{ feed.updates_enabled }} <br>
<form action="/check" method="post">
<button type="submit" name="feed_url" value="{{ feed.url }}">
Send new entries to Discord.
</button>
</form>
<form action="/remove" method="post">
<button type="submit" name="feed_url" value="{{ feed.url }}">
Remove feed.
</button>
</form>
</body>
</html>

View File

@ -0,0 +1,70 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Index</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body>
<!-- Create a new feed -->
<form action="/add" method="post">
<label>
<input type="text" name="feed_url" placeholder="Feed URL">
</label>
<label for="webhook_dropdown">Choose:</label>
<select id="webhook_dropdown" name="webhook_dropdown">
{% for hook in webhooks %}
<!-- {{ hook.name }} {{ hook.value }} -->
<option value="{{ hook.name }}">{{ hook.name }}</option>
{% endfor %}
</select>
<input type="submit" value="Add feed">
</form>
<!-- List all feeds -->
{% for tag in tags %}
{{ tag }}
{% endfor %}
<ul>
<!-- Check if any feeds -->
{% if feeds %}
{% for feed in feeds %}
<form action="/feed" method="post">
<button type="submit" name="feed_url" value="{{ feed.url }}">
{{ feed.url }}
</button>
</form>
{% endfor %}
{% else %}
<p>No feeds yet</p>
{% endif %}
</ul>
<!-- Feed stats -->
<hr>
<ul>
<li>
<p>Feed stats:</p>
<p>Total: {{ feed_count.total }} feeds</p>
<p>Broken: {{ feed_count.broken }} feeds</p>
<p>Enabled: {{ feed_count.updates_enabled }} feeds</p>
</li>
</ul>
<!-- Feed entries stats -->
<ul>
<li>
<p>Feed entries:</p>
<p>Total: {{ entry_count.total }} entries</p>
<p>Read: {{ entry_count.broken }} entries</p>
<p>Enabled: {{ entry_count.updates_enabled }} entries</p>
<p>Important: {{ entry_count.important }} entries</p>
<p>Has enclosures: {{ entry_count.has_enclosures }} entries</p>
</li>
</ul>
</body>
</html>

View File

@ -1,26 +0,0 @@
import sys
from contextlib import closing
from reader import make_reader
from discord_rss_bot.discord_rss_bot import db_file_str
def webhook_get() -> None:
"""Get the webhook url"""
# TODO: Add name to output
with closing(make_reader(db_file_str)) as reader:
try:
webhook_url = reader.get_tag((), "webhook")
print(f"Webhook: {webhook_url}")
except Exception as e:
print("No webhook was found. Use `webhook add` to add one.")
print(f"Error: {e}\nPlease report this error to the developer.")
sys.exit()
def webhook_add(webhook_url: str) -> None:
"""Add a webhook to the database"""
with closing(make_reader(db_file_str)) as reader:
reader.set_tag((), "webhook", webhook_url)
print(f"Webhook set to {webhook_url}")