You can now modify webhooks

This commit is contained in:
2023-08-08 02:07:17 +02:00
parent 9ba237a75a
commit 381c16596c
4 changed files with 78 additions and 45 deletions

View File

@ -4,6 +4,7 @@ from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import lru_cache
from typing import cast
import httpx
import uvicorn
@ -666,6 +667,51 @@ async def post_entry(entry_id: str): # noqa: ANN201
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/modify_webhook", response_class=HTMLResponse)
def modify_webhook(old_hook: str = Form(), new_hook: str = Form()): # noqa: ANN201
"""Modify a webhook.
Args:
old_hook: The webhook to modify.
new_hook: The new webhook.
Raises:
HTTPException: Webhook could not be modified.
"""
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks = list(reader.get_tag((), "webhooks", []))
# Webhooks are stored as a list of dictionaries.
# Example: [{"name": "webhook_name", "url": "webhook_url"}] # noqa: ERA001
webhooks = cast(list[dict[str, str]], webhooks)
for hook in webhooks:
if hook["url"] in old_hook.strip():
hook["url"] = new_hook.strip()
# Check if it has been modified.
if hook["url"] != new_hook.strip():
raise HTTPException(status_code=500, detail="Webhook could not be modified")
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # type: ignore
# Loop through all feeds and update the webhook if it
# matches the old one.
feeds: Iterable[Feed] = reader.get_feeds()
for feed in feeds:
try:
webhook = reader.get_tag(feed, "webhook")
except TagNotFoundError:
continue
if webhook == old_hook.strip():
reader.set_tag(feed.url, "webhook", new_hook.strip()) # type: ignore
# Redirect to the webhook page.
return RedirectResponse(url="/webhooks", status_code=303)
@app.on_event("startup")
def startup() -> None:
"""This is called when the server starts.

View File

@ -29,23 +29,3 @@ def get_reader(custom_location: Path | None = None) -> Reader:
db_location: Path = custom_location or Path(data_dir) / "db.sqlite"
return make_reader(url=str(db_location))
def list_webhooks(reader: Reader) -> list[dict[str, str]]:
"""Get current webhooks from the database if they exist otherwise use an empty list.
Args:
reader: The reader to use.
Returns:
list[dict[str, str]]: The webhooks.
"""
webhooks: list[dict[str, str]] = []
# Get global tags
if reader.get_tags(()) is not None:
for tag in reader.get_tag_keys(()):
if tag == "webhooks":
webhooks = reader.get_tag((), "webhooks") # type: ignore
return webhooks

View File

@ -7,7 +7,7 @@
<h3>Available webhooks</h3>
<ul class="list-inline">
<a class="btn btn-primary" href="/add_webhook">Add new</a>
<br/>
<br />
{% for hook in hooks_with_data %}
<div class="p-2 border border-dark text-muted">
{% if hook.avatar is not none %}
@ -15,43 +15,41 @@
class="img-thumbnail"
height="128"
width="128"
alt="Webhook avatar"/>
alt="Webhook avatar" />
{% else %}
<img src="https://cdn.discordapp.com/embed/avatars/{{ hook.avatar_mod }}.png"
class="img-thumbnail"
height="128"
width="128"
alt="Default Discord avatar"/>
alt="Default Discord avatar" />
{% endif %}
<h3>{{ hook.custom_name }}</h3>
<li>
<strong>Name</strong>: {{ hook.name }}
</li>
<li>
<strong>Channel ID</strong>: {{ hook.channel_id }}
<strong>Webhook URL</strong>: <a class="text-muted" href="{{ hook.url }}">{{ hook.url }}</a>
</li>
<li>
<strong>Guild ID</strong>: {{ hook.guild_id }}
</li>
<li>
<strong>Webhook ID</strong>: {{ hook.webhook_id }}
</li>
<li>
<strong>Webhook token</strong>: {{ hook.token }}
</li>
<li>
<strong>Webhook type</strong>: {{ hook.webhook_type }}
</li>
<li>
<strong>Webhook URL</strong>: <a href="{{ hook.url }}">{{ hook.url }}</a>
</li>
<br/>
<br />
<form action="/modify_webhook" method="post">
<input type="hidden" name="old_hook" value="{{- hook.url -}}" />
<div class="row pb-2">
<label for="new_hook" class="col-sm-1 col-form-label">Modify webhook</label>
<div class="col-sm-9">
<input name="new_hook"
type="text"
class="form-control bg-dark border-dark text-muted"
id="new_hook" />
</div>
<button type="submit" class="btn btn-primary col-sm-1 ">Modify</button>
</div>
</form>
<form action="/delete_webhook" method="post">
<input type="hidden" name="webhook_url" value="{{- hook.url -}}"/>
<input type="hidden" name="webhook_url" value="{{- hook.url -}}" />
<button type="submit" class="btn btn-danger">Delete</button>
</form>
</div>
<br/>
<br />
{% endfor %}
</ul>
{% endblock content %}

View File

@ -1,8 +1,9 @@
from typing import cast
from fastapi import HTTPException
from reader import Reader
from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.settings import list_webhooks
def add_webhook(reader: Reader, webhook_name: str, webhook_url: str) -> None:
@ -17,7 +18,11 @@ def add_webhook(reader: Reader, webhook_name: str, webhook_url: str) -> None:
HTTPException: This is raised when the webhook already exists
"""
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks: list[dict[str, str]] = list_webhooks(reader)
webhooks = list(reader.get_tag((), "webhooks", []))
# Webhooks are stored as a list of dictionaries.
# Example: [{"name": "webhook_name", "url": "webhook_url"}] # noqa: ERA001
webhooks = cast(list[dict[str, str]], webhooks)
# Only add the webhook if it doesn't already exist.
if all(webhook["name"] != webhook_name.strip() for webhook in webhooks):
@ -48,7 +53,11 @@ def remove_webhook(reader: Reader, webhook_url: str) -> None:
"""
# TODO: Replace HTTPException with a custom exception for both of these.
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks: list[dict[str, str]] = list_webhooks(reader)
webhooks = list(reader.get_tag((), "webhooks", []))
# Webhooks are stored as a list of dictionaries.
# Example: [{"name": "webhook_name", "url": "webhook_url"}] # noqa: ERA001
webhooks = cast(list[dict[str, str]], webhooks)
# Only add the webhook if it doesn't already exist.
for webhook in webhooks: