Fix all the bugs
This commit is contained in:
@ -1,25 +1,32 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from reader import Entry, Feed, Reader, TagNotFoundError
|
||||
|
||||
from discord_rss_bot.is_url_valid import is_url_valid
|
||||
from discord_rss_bot.markdown import convert_html_to_md
|
||||
from discord_rss_bot.settings import get_reader
|
||||
from discord_rss_bot.settings import get_reader, logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from reader.types import JSONType
|
||||
|
||||
|
||||
@dataclass()
|
||||
@dataclass(slots=True)
|
||||
class CustomEmbed:
|
||||
title: str
|
||||
description: str
|
||||
color: str
|
||||
author_name: str
|
||||
author_url: str
|
||||
author_icon_url: str
|
||||
image_url: str
|
||||
thumbnail_url: str
|
||||
footer_text: str
|
||||
footer_icon_url: str
|
||||
title: str = ""
|
||||
description: str = ""
|
||||
color: str = ""
|
||||
author_name: str = ""
|
||||
author_url: str = ""
|
||||
author_icon_url: str = ""
|
||||
image_url: str = ""
|
||||
thumbnail_url: str = ""
|
||||
footer_text: str = ""
|
||||
footer_icon_url: str = ""
|
||||
|
||||
|
||||
def try_to_replace(custom_message: str, template: str, replace_with: str) -> str:
|
||||
@ -59,7 +66,7 @@ def replace_tags_in_text_message(entry: Entry) -> str:
|
||||
|
||||
summary: str = entry.summary or ""
|
||||
|
||||
first_image = get_first_image(summary, content)
|
||||
first_image: str = get_first_image(summary, content)
|
||||
|
||||
summary = convert_html_to_md(summary)
|
||||
content = convert_html_to_md(content)
|
||||
@ -102,7 +109,7 @@ def replace_tags_in_text_message(entry: Entry) -> str:
|
||||
return custom_message.replace("\\n", "\n")
|
||||
|
||||
|
||||
def get_first_image(summary, content):
|
||||
def get_first_image(summary: str | None, content: str | None) -> str:
|
||||
"""Get image from summary or content.
|
||||
|
||||
Args:
|
||||
@ -112,10 +119,25 @@ def get_first_image(summary, content):
|
||||
Returns:
|
||||
The first image
|
||||
"""
|
||||
# TODO(TheLovinator): We should find a better way to get the image.
|
||||
if content and (images := BeautifulSoup(content, features="lxml").find_all("img")):
|
||||
return images[0].attrs["src"]
|
||||
for image in images:
|
||||
if not is_url_valid(image.attrs["src"]):
|
||||
logger.warning(f"Invalid URL: {image.attrs['src']}")
|
||||
continue
|
||||
|
||||
# Genshins first image is a divider, so we ignore it.
|
||||
if not image.attrs["src"].startswith("https://img-os-static.hoyolab.com/divider_config"):
|
||||
return str(image.attrs["src"])
|
||||
if summary and (images := BeautifulSoup(summary, features="lxml").find_all("img")):
|
||||
return images[0].attrs["src"]
|
||||
for image in images:
|
||||
if not is_url_valid(image.attrs["src"]):
|
||||
logger.warning(f"Invalid URL: {image.attrs['src']}")
|
||||
continue
|
||||
|
||||
# Genshins first image is a divider, so we ignore it.
|
||||
if not image.attrs["src"].startswith("https://img-os-static.hoyolab.com/divider_config"):
|
||||
return str(image.attrs["src"])
|
||||
return ""
|
||||
|
||||
|
||||
@ -139,42 +161,47 @@ def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed:
|
||||
|
||||
summary: str = entry.summary or ""
|
||||
|
||||
first_image = get_first_image(summary, content)
|
||||
first_image: str = get_first_image(summary, content)
|
||||
|
||||
summary = convert_html_to_md(summary)
|
||||
content = convert_html_to_md(content)
|
||||
|
||||
entry_text: str = content or summary
|
||||
feed_added: str = feed.added.strftime("%Y-%m-%d %H:%M:%S") if feed.added else "Never"
|
||||
feed_last_updated: str = feed.last_updated.strftime("%Y-%m-%d %H:%M:%S") if feed.last_updated else "Never"
|
||||
feed_updated: str = feed.updated.strftime("%Y-%m-%d %H:%M:%S") if feed.updated else "Never"
|
||||
entry_added: str = entry.added.strftime("%Y-%m-%d %H:%M:%S") if entry.added else "Never"
|
||||
entry_published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S") if entry.published else "Never"
|
||||
entry_read_modified: str = entry.read_modified.strftime("%Y-%m-%d %H:%M:%S") if entry.read_modified else "Never"
|
||||
entry_updated: str = entry.updated.strftime("%Y-%m-%d %H:%M:%S") if entry.updated else "Never"
|
||||
|
||||
list_of_replacements = [
|
||||
{"{{feed_author}}": feed.author},
|
||||
{"{{feed_added}}": feed.added},
|
||||
{"{{feed_last_exception}}": feed.last_exception},
|
||||
{"{{feed_last_updated}}": feed.last_updated},
|
||||
{"{{feed_link}}": feed.link},
|
||||
{"{{feed_subtitle}}": feed.subtitle},
|
||||
{"{{feed_title}}": feed.title},
|
||||
{"{{feed_updated}}": feed.updated},
|
||||
{"{{feed_updates_enabled}}": str(feed.updates_enabled)},
|
||||
{"{{feed_url}}": feed.url},
|
||||
{"{{feed_user_title}}": feed.user_title},
|
||||
{"{{feed_version}}": feed.version},
|
||||
{"{{entry_added}}": entry.added},
|
||||
{"{{entry_author}}": entry.author},
|
||||
{"{{entry_content}}": content},
|
||||
list_of_replacements: list[dict[str, str]] = [
|
||||
{"{{feed_author}}": feed.author or ""},
|
||||
{"{{feed_added}}": feed_added or ""},
|
||||
{"{{feed_last_updated}}": feed_last_updated or ""},
|
||||
{"{{feed_link}}": feed.link or ""},
|
||||
{"{{feed_subtitle}}": feed.subtitle or ""},
|
||||
{"{{feed_title}}": feed.title or ""},
|
||||
{"{{feed_updated}}": feed_updated or ""},
|
||||
{"{{feed_updates_enabled}}": "True" if feed.updates_enabled else "False"},
|
||||
{"{{feed_url}}": feed.url or ""},
|
||||
{"{{feed_user_title}}": feed.user_title or ""},
|
||||
{"{{feed_version}}": feed.version or ""},
|
||||
{"{{entry_added}}": entry_added or ""},
|
||||
{"{{entry_author}}": entry.author or ""},
|
||||
{"{{entry_content}}": content or ""},
|
||||
{"{{entry_content_raw}}": entry.content[0].value if entry.content else ""},
|
||||
{"{{entry_id}}": entry.id},
|
||||
{"{{entry_important}}": str(entry.important)},
|
||||
{"{{entry_link}}": entry.link},
|
||||
{"{{entry_published}}": entry.published},
|
||||
{"{{entry_read}}": str(entry.read)},
|
||||
{"{{entry_read_modified}}": entry.read_modified},
|
||||
{"{{entry_summary}}": summary},
|
||||
{"{{entry_important}}": "True" if entry.important else "False"},
|
||||
{"{{entry_link}}": entry.link or ""},
|
||||
{"{{entry_published}}": entry_published},
|
||||
{"{{entry_read}}": "True" if entry.read else "False"},
|
||||
{"{{entry_read_modified}}": entry_read_modified or ""},
|
||||
{"{{entry_summary}}": summary or ""},
|
||||
{"{{entry_summary_raw}}": entry.summary or ""},
|
||||
{"{{entry_title}}": entry.title},
|
||||
{"{{entry_text}}": entry_text},
|
||||
{"{{entry_updated}}": entry.updated},
|
||||
{"{{image_1}}": first_image},
|
||||
{"{{entry_text}}": content or summary or ""},
|
||||
{"{{entry_title}}": entry.title or ""},
|
||||
{"{{entry_updated}}": entry_updated or ""},
|
||||
{"{{image_1}}": first_image or ""},
|
||||
]
|
||||
|
||||
for replacement in list_of_replacements:
|
||||
@ -246,9 +273,10 @@ def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed:
|
||||
Returns:
|
||||
Returns the contents from the embed tag.
|
||||
"""
|
||||
if embed := custom_reader.get_tag(feed, "embed", ""):
|
||||
if type(embed) != str:
|
||||
return get_embed_data(embed)
|
||||
embed: str | JSONType = custom_reader.get_tag(feed, "embed", "")
|
||||
if embed:
|
||||
if not isinstance(embed, str):
|
||||
return get_embed_data(embed) # type: ignore
|
||||
embed_data: dict[str, str | int] = json.loads(embed)
|
||||
return get_embed_data(embed_data)
|
||||
|
||||
@ -266,7 +294,7 @@ def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed:
|
||||
)
|
||||
|
||||
|
||||
def get_embed_data(embed_data) -> CustomEmbed:
|
||||
def get_embed_data(embed_data: dict[str, str | int]) -> CustomEmbed:
|
||||
"""Get embed data from embed_data.
|
||||
|
||||
Args:
|
||||
@ -275,16 +303,16 @@ def get_embed_data(embed_data) -> CustomEmbed:
|
||||
Returns:
|
||||
Returns the embed data.
|
||||
"""
|
||||
title: str = embed_data.get("title", "")
|
||||
description: str = embed_data.get("description", "")
|
||||
color: str = embed_data.get("color", "")
|
||||
author_name: str = embed_data.get("author_name", "")
|
||||
author_url: str = embed_data.get("author_url", "")
|
||||
author_icon_url: str = embed_data.get("author_icon_url", "")
|
||||
image_url: str = embed_data.get("image_url", "")
|
||||
thumbnail_url: str = embed_data.get("thumbnail_url", "")
|
||||
footer_text: str = embed_data.get("footer_text", "")
|
||||
footer_icon_url: str = embed_data.get("footer_icon_url", "")
|
||||
title: str = str(embed_data.get("title", ""))
|
||||
description: str = str(embed_data.get("description", ""))
|
||||
color: str = str(embed_data.get("color", ""))
|
||||
author_name: str = str(embed_data.get("author_name", ""))
|
||||
author_url: str = str(embed_data.get("author_url", ""))
|
||||
author_icon_url: str = str(embed_data.get("author_icon_url", ""))
|
||||
image_url: str = str(embed_data.get("image_url", ""))
|
||||
thumbnail_url: str = str(embed_data.get("thumbnail_url", ""))
|
||||
footer_text: str = str(embed_data.get("footer_text", ""))
|
||||
footer_icon_url: str = str(embed_data.get("footer_icon_url", ""))
|
||||
|
||||
return CustomEmbed(
|
||||
title=title,
|
||||
|
@ -1,3 +1,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import pprint
|
||||
import textwrap
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from discord_webhook import DiscordEmbed, DiscordWebhook
|
||||
@ -7,7 +12,8 @@ from reader import Entry, Feed, FeedExistsError, Reader, TagNotFoundError
|
||||
from discord_rss_bot import custom_message
|
||||
from discord_rss_bot.filter.blacklist import should_be_skipped
|
||||
from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
|
||||
from discord_rss_bot.settings import default_custom_message, get_reader
|
||||
from discord_rss_bot.is_url_valid import is_url_valid
|
||||
from discord_rss_bot.settings import default_custom_message, get_reader, logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
@ -38,7 +44,10 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
|
||||
if custom_message.get_custom_message(reader, entry.feed) != "": # noqa: PLC1901
|
||||
webhook_message = custom_message.replace_tags_in_text_message(entry=entry)
|
||||
else:
|
||||
webhook_message: str = default_custom_message
|
||||
webhook_message: str = str(default_custom_message)
|
||||
|
||||
if not webhook_message:
|
||||
webhook_message = "No message found."
|
||||
|
||||
# Create the webhook.
|
||||
if bool(reader.get_tag(entry.feed, "should_send_embed")):
|
||||
@ -47,7 +56,10 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
|
||||
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
|
||||
|
||||
response: Response = webhook.execute()
|
||||
return None if response.ok else f"Error sending entry to Discord: {response.text}"
|
||||
if response.status_code not in {200, 204}:
|
||||
logger.error("Error sending entry to Discord: %s\n%s", response.text, pprint.pformat(webhook.json))
|
||||
return f"Error sending entry to Discord: {response.text}"
|
||||
return None
|
||||
|
||||
|
||||
def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook:
|
||||
@ -68,42 +80,57 @@ def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook:
|
||||
|
||||
discord_embed: DiscordEmbed = DiscordEmbed()
|
||||
|
||||
if custom_embed.title:
|
||||
discord_embed.set_title(custom_embed.title)
|
||||
if custom_embed.description:
|
||||
discord_embed.set_description(custom_embed.description)
|
||||
if custom_embed.color and type(custom_embed.color) == str and custom_embed.color.startswith("#"):
|
||||
custom_embed.color = custom_embed.color[1:]
|
||||
discord_embed.set_color(int(custom_embed.color, 16))
|
||||
if custom_embed.author_name and not custom_embed.author_url and not custom_embed.author_icon_url:
|
||||
embed_title: str = textwrap.shorten(custom_embed.title, width=200, placeholder="...")
|
||||
discord_embed.set_title(embed_title) if embed_title else None
|
||||
|
||||
webhook_message: str = textwrap.shorten(custom_embed.description, width=2000, placeholder="...")
|
||||
discord_embed.set_description(webhook_message) if webhook_message else None
|
||||
|
||||
custom_embed_author_url: str | None = custom_embed.author_url
|
||||
if not is_url_valid(custom_embed_author_url):
|
||||
custom_embed_author_url = None
|
||||
|
||||
custom_embed_color: str | None = custom_embed.color or None
|
||||
if custom_embed_color and custom_embed_color.startswith("#"):
|
||||
custom_embed_color = custom_embed_color[1:]
|
||||
discord_embed.set_color(int(custom_embed_color, 16))
|
||||
|
||||
if custom_embed.author_name and not custom_embed_author_url and not custom_embed.author_icon_url:
|
||||
discord_embed.set_author(name=custom_embed.author_name)
|
||||
if custom_embed.author_name and custom_embed.author_url and not custom_embed.author_icon_url:
|
||||
discord_embed.set_author(name=custom_embed.author_name, url=custom_embed.author_url)
|
||||
if custom_embed.author_name and not custom_embed.author_url and custom_embed.author_icon_url:
|
||||
|
||||
if custom_embed.author_name and custom_embed_author_url and not custom_embed.author_icon_url:
|
||||
discord_embed.set_author(name=custom_embed.author_name, url=custom_embed_author_url)
|
||||
|
||||
if custom_embed.author_name and not custom_embed_author_url and custom_embed.author_icon_url:
|
||||
discord_embed.set_author(name=custom_embed.author_name, icon_url=custom_embed.author_icon_url)
|
||||
if custom_embed.author_name and custom_embed.author_url and custom_embed.author_icon_url:
|
||||
|
||||
if custom_embed.author_name and custom_embed_author_url and custom_embed.author_icon_url:
|
||||
discord_embed.set_author(
|
||||
name=custom_embed.author_name,
|
||||
url=custom_embed.author_url,
|
||||
url=custom_embed_author_url,
|
||||
icon_url=custom_embed.author_icon_url,
|
||||
)
|
||||
|
||||
if custom_embed.thumbnail_url:
|
||||
discord_embed.set_thumbnail(url=custom_embed.thumbnail_url)
|
||||
|
||||
if custom_embed.image_url:
|
||||
discord_embed.set_image(url=custom_embed.image_url)
|
||||
|
||||
if custom_embed.footer_text:
|
||||
discord_embed.set_footer(text=custom_embed.footer_text)
|
||||
|
||||
if custom_embed.footer_icon_url and custom_embed.footer_text:
|
||||
discord_embed.set_footer(text=custom_embed.footer_text, icon_url=custom_embed.footer_icon_url)
|
||||
|
||||
if custom_embed.footer_icon_url and not custom_embed.footer_text:
|
||||
discord_embed.set_footer(text="-", icon_url=custom_embed.footer_icon_url)
|
||||
|
||||
webhook.add_embed(discord_embed)
|
||||
|
||||
return webhook
|
||||
|
||||
|
||||
def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, do_once: bool = False) -> None:
|
||||
def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: PLR0912
|
||||
"""Send entries to Discord.
|
||||
|
||||
If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time.
|
||||
@ -125,6 +152,11 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
|
||||
# Loop through the unread entries.
|
||||
entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False)
|
||||
for entry in entries:
|
||||
if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1):
|
||||
logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url)
|
||||
reader.set_entry_read(entry, True)
|
||||
continue
|
||||
|
||||
# Set the webhook to read, so we don't send it again.
|
||||
reader.set_entry_read(entry, True)
|
||||
|
||||
@ -138,10 +170,13 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
|
||||
else:
|
||||
# If the user has set the custom message to an empty string, we will use the default message, otherwise we
|
||||
# will use the custom message.
|
||||
if custom_message.get_custom_message(reader, entry.feed) != "":
|
||||
if custom_message.get_custom_message(reader, entry.feed) != "": # noqa: PLC1901
|
||||
webhook_message = custom_message.replace_tags_in_text_message(entry)
|
||||
else:
|
||||
webhook_message: str = default_custom_message
|
||||
webhook_message: str = str(default_custom_message)
|
||||
|
||||
# Truncate the webhook_message to 2000 characters
|
||||
webhook_message = textwrap.shorten(webhook_message, width=2000, placeholder="...")
|
||||
|
||||
# Create the webhook.
|
||||
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
|
||||
@ -150,25 +185,30 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
|
||||
if has_white_tags(reader, entry.feed):
|
||||
if should_be_sent(reader, entry):
|
||||
response: Response = webhook.execute()
|
||||
if response.status_code not in {200, 204}:
|
||||
logger.error("Error sending entry to Discord: %s\n%s", response.text, pprint.pformat(webhook.json))
|
||||
|
||||
reader.set_entry_read(entry, True)
|
||||
if not response.ok:
|
||||
reader.set_entry_read(entry, False)
|
||||
else:
|
||||
reader.set_entry_read(entry, True)
|
||||
return
|
||||
reader.set_entry_read(entry, True)
|
||||
continue
|
||||
|
||||
# Check if the entry is blacklisted, if it is, mark it as read and continue.
|
||||
if should_be_skipped(reader, entry):
|
||||
logger.info("Entry was blacklisted: %s", entry.id)
|
||||
reader.set_entry_read(entry, True)
|
||||
continue
|
||||
|
||||
# It was not blacklisted, and not forced through whitelist, so we will send it to Discord.
|
||||
response: Response = webhook.execute()
|
||||
if not response.ok:
|
||||
reader.set_entry_read(entry, False)
|
||||
if response.status_code not in {200, 204}:
|
||||
logger.error("Error sending entry to Discord: %s\n%s", response.text, pprint.pformat(webhook.json))
|
||||
reader.set_entry_read(entry, True)
|
||||
return
|
||||
|
||||
# If we only want to send one entry, we will break the loop. This is used when testing this function.
|
||||
if do_once:
|
||||
logger.info("Sent one entry to Discord.")
|
||||
break
|
||||
|
||||
# Update the search index.
|
||||
@ -196,11 +236,10 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
||||
break
|
||||
|
||||
if not webhook_url:
|
||||
# TODO: Show this error on the page.
|
||||
raise HTTPException(status_code=404, detail="Webhook not found")
|
||||
|
||||
try:
|
||||
# TODO: Check if the feed is valid
|
||||
# TODO(TheLovinator): Check if the feed is valid
|
||||
reader.add_feed(clean_feed_url)
|
||||
except FeedExistsError:
|
||||
# Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new.
|
||||
@ -217,7 +256,7 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
||||
reader.set_entry_read(entry, True)
|
||||
|
||||
if not default_custom_message:
|
||||
# TODO: Show this error on the page.
|
||||
# TODO(TheLovinator): Show this error on the page.
|
||||
raise HTTPException(status_code=404, detail="Default custom message couldn't be found.")
|
||||
|
||||
# This is the webhook that will be used to send the feed to Discord.
|
||||
|
@ -40,7 +40,7 @@ def should_be_skipped(custom_reader: Reader, entry: Entry) -> bool:
|
||||
blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", ""))
|
||||
blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", ""))
|
||||
blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", ""))
|
||||
# TODO: Also add support for entry_text and more.
|
||||
# TODO(TheLovinator): Also add support for entry_text and more.
|
||||
|
||||
if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title):
|
||||
return True
|
||||
|
@ -1,3 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
|
||||
|
@ -9,7 +9,7 @@ def healthcheck() -> None:
|
||||
sys.exit(0): success - the container is healthy and ready for use.
|
||||
sys.exit(1): unhealthy - the container is not working correctly.
|
||||
"""
|
||||
# TODO: We should check more than just that the website is up.
|
||||
# TODO(TheLovinator): We should check more than just that the website is up.
|
||||
try:
|
||||
r: requests.Response = requests.get(url="http://localhost:5000", timeout=5)
|
||||
if r.ok:
|
||||
|
17
discord_rss_bot/is_url_valid.py
Normal file
17
discord_rss_bot/is_url_valid.py
Normal file
@ -0,0 +1,17 @@
|
||||
from urllib.parse import ParseResult, urlparse
|
||||
|
||||
|
||||
def is_url_valid(url: str) -> bool:
|
||||
"""Check if a URL is valid.
|
||||
|
||||
Args:
|
||||
url: The URL to check.
|
||||
|
||||
Returns:
|
||||
bool: True if the URL is valid, False otherwise.
|
||||
"""
|
||||
try:
|
||||
result: ParseResult = urlparse(url)
|
||||
return all([result.scheme, result.netloc])
|
||||
except ValueError:
|
||||
return False
|
@ -1,20 +1,24 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import typing
|
||||
import urllib.parse
|
||||
from collections.abc import Iterable
|
||||
from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from functools import lru_cache
|
||||
from typing import cast
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
import httpx
|
||||
import uvicorn
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from fastapi import FastAPI, Form, HTTPException, Request
|
||||
from fastapi.responses import HTMLResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from httpx import Response
|
||||
from reader import Entry, Feed, FeedNotFoundError, Reader, TagNotFoundError
|
||||
from reader.types import JSONType
|
||||
from starlette.responses import RedirectResponse
|
||||
|
||||
from discord_rss_bot import settings
|
||||
@ -38,11 +42,32 @@ from discord_rss_bot.search import create_html_for_search_results
|
||||
from discord_rss_bot.settings import get_reader
|
||||
from discord_rss_bot.webhook import add_webhook, remove_webhook
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
|
||||
|
||||
reader: Reader = get_reader()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI) -> typing.AsyncGenerator[None, None]:
|
||||
"""This is needed for the ASGI server to run."""
|
||||
add_missing_tags(reader=reader)
|
||||
scheduler: AsyncIOScheduler = AsyncIOScheduler()
|
||||
|
||||
# Update all feeds every 15 minutes.
|
||||
# TODO(TheLovinator): Make this configurable.
|
||||
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now(tz=timezone.utc))
|
||||
scheduler.start()
|
||||
yield
|
||||
reader.close()
|
||||
scheduler.shutdown(wait=True)
|
||||
|
||||
|
||||
app: FastAPI = FastAPI()
|
||||
app.mount("/static", StaticFiles(directory="discord_rss_bot/static"), name="static")
|
||||
templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/templates")
|
||||
|
||||
reader: Reader = get_reader()
|
||||
|
||||
# Add the filters to the Jinja2 environment so they can be used in html templates.
|
||||
templates.env.filters["encode_url"] = encode_url
|
||||
@ -70,7 +95,7 @@ async def post_delete_webhook(webhook_url: str = Form()) -> RedirectResponse:
|
||||
Args:
|
||||
webhook_url: The url of the webhook.
|
||||
"""
|
||||
# TODO: Check if the webhook is in use by any feeds before deleting it.
|
||||
# TODO(TheLovinator): Check if the webhook is in use by any feeds before deleting it.
|
||||
remove_webhook(reader, webhook_url)
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
@ -131,19 +156,19 @@ async def post_set_whitelist(
|
||||
"""
|
||||
clean_feed_url: str = feed_url.strip()
|
||||
if whitelist_title:
|
||||
reader.set_tag(clean_feed_url, "whitelist_title", whitelist_title) # type: ignore
|
||||
reader.set_tag(clean_feed_url, "whitelist_title", whitelist_title) # type: ignore[call-overload]
|
||||
if whitelist_summary:
|
||||
reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # type: ignore
|
||||
reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # type: ignore[call-overload]
|
||||
if whitelist_content:
|
||||
reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # type: ignore
|
||||
reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # type: ignore[call-overload]
|
||||
if whitelist_author:
|
||||
reader.set_tag(clean_feed_url, "whitelist_author", whitelist_author) # type: ignore
|
||||
reader.set_tag(clean_feed_url, "whitelist_author", whitelist_author) # type: ignore[call-overload]
|
||||
|
||||
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
|
||||
|
||||
|
||||
@app.get("/whitelist", response_class=HTMLResponse)
|
||||
async def get_whitelist(feed_url: str, request: Request): # noqa: ANN201
|
||||
async def get_whitelist(feed_url: str, request: Request):
|
||||
"""Get the whitelist.
|
||||
|
||||
Args:
|
||||
@ -167,7 +192,7 @@ async def get_whitelist(feed_url: str, request: Request): # noqa: ANN201
|
||||
"whitelist_content": whitelist_content,
|
||||
"whitelist_author": whitelist_author,
|
||||
}
|
||||
return templates.TemplateResponse("whitelist.html", context)
|
||||
return templates.TemplateResponse(request=request, name="whitelist.html", context=context)
|
||||
|
||||
|
||||
@app.post("/blacklist")
|
||||
@ -192,19 +217,28 @@ async def post_set_blacklist(
|
||||
"""
|
||||
clean_feed_url: str = feed_url.strip()
|
||||
if blacklist_title:
|
||||
reader.set_tag(clean_feed_url, "blacklist_title", blacklist_title) # type: ignore
|
||||
reader.set_tag(clean_feed_url, "blacklist_title", blacklist_title) # type: ignore[call-overload]
|
||||
if blacklist_summary:
|
||||
reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # type: ignore
|
||||
reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # type: ignore[call-overload]
|
||||
if blacklist_content:
|
||||
reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # type: ignore
|
||||
reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # type: ignore[call-overload]
|
||||
if blacklist_author:
|
||||
reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # type: ignore
|
||||
reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # type: ignore[call-overload]
|
||||
|
||||
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
|
||||
|
||||
|
||||
@app.get("/blacklist", response_class=HTMLResponse)
|
||||
async def get_blacklist(feed_url: str, request: Request): # noqa: ANN201
|
||||
async def get_blacklist(feed_url: str, request: Request):
|
||||
"""Get the blacklist.
|
||||
|
||||
Args:
|
||||
feed_url: What feed we should get the blacklist for.
|
||||
request: The request object.
|
||||
|
||||
Returns:
|
||||
HTMLResponse: The blacklist page.
|
||||
"""
|
||||
feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url))
|
||||
|
||||
# Get previous data, this is used when creating the form.
|
||||
@ -221,7 +255,7 @@ async def get_blacklist(feed_url: str, request: Request): # noqa: ANN201
|
||||
"blacklist_content": blacklist_content,
|
||||
"blacklist_author": blacklist_author,
|
||||
}
|
||||
return templates.TemplateResponse("blacklist.html", context)
|
||||
return templates.TemplateResponse(request=request, name="blacklist.html", context=context)
|
||||
|
||||
|
||||
@app.post("/custom")
|
||||
@ -232,17 +266,23 @@ async def post_set_custom(custom_message: str = Form(""), feed_url: str = Form()
|
||||
custom_message: The custom message.
|
||||
feed_url: The feed we should set the custom message for.
|
||||
"""
|
||||
if custom_message:
|
||||
reader.set_tag(feed_url, "custom_message", custom_message.strip()) # type: ignore
|
||||
our_custom_message: JSONType | str = custom_message.strip()
|
||||
our_custom_message = typing.cast(JSONType, our_custom_message)
|
||||
|
||||
default_custom_message: JSONType | str = settings.default_custom_message
|
||||
default_custom_message = typing.cast(JSONType, default_custom_message)
|
||||
|
||||
if our_custom_message:
|
||||
reader.set_tag(feed_url, "custom_message", our_custom_message)
|
||||
else:
|
||||
reader.set_tag(feed_url, "custom_message", settings.default_custom_message) # type: ignore
|
||||
reader.set_tag(feed_url, "custom_message", default_custom_message)
|
||||
|
||||
clean_feed_url: str = feed_url.strip()
|
||||
return RedirectResponse(url=f"/feed/?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
|
||||
|
||||
|
||||
@app.get("/custom", response_class=HTMLResponse)
|
||||
async def get_custom(feed_url: str, request: Request): # noqa: ANN201
|
||||
async def get_custom(feed_url: str, request: Request):
|
||||
"""Get the custom message. This is used when sending the message to Discord.
|
||||
|
||||
Args:
|
||||
@ -261,11 +301,11 @@ async def get_custom(feed_url: str, request: Request): # noqa: ANN201
|
||||
for entry in reader.get_entries(feed=feed, limit=1):
|
||||
context["entry"] = entry
|
||||
|
||||
return templates.TemplateResponse("custom.html", context)
|
||||
return templates.TemplateResponse(request=request, name="custom.html", context=context)
|
||||
|
||||
|
||||
@app.get("/embed", response_class=HTMLResponse)
|
||||
async def get_embed_page(feed_url: str, request: Request): # noqa: ANN201
|
||||
async def get_embed_page(feed_url: str, request: Request):
|
||||
"""Get the custom message. This is used when sending the message to Discord.
|
||||
|
||||
Args:
|
||||
@ -297,11 +337,11 @@ async def get_embed_page(feed_url: str, request: Request): # noqa: ANN201
|
||||
for entry in reader.get_entries(feed=feed, limit=1):
|
||||
# Append to context.
|
||||
context["entry"] = entry
|
||||
return templates.TemplateResponse("embed.html", context)
|
||||
return templates.TemplateResponse(request=request, name="embed.html", context=context)
|
||||
|
||||
|
||||
@app.post("/embed", response_class=HTMLResponse)
|
||||
async def post_embed( # noqa: PLR0913
|
||||
async def post_embed( # noqa: PLR0913, PLR0917
|
||||
feed_url: str = Form(),
|
||||
title: str = Form(""),
|
||||
description: str = Form(""),
|
||||
@ -385,22 +425,23 @@ async def post_use_text(feed_url: str = Form()) -> RedirectResponse:
|
||||
|
||||
|
||||
@app.get("/add", response_class=HTMLResponse)
|
||||
def get_add(request: Request): # noqa: ANN201
|
||||
def get_add(request: Request):
|
||||
"""Page for adding a new feed."""
|
||||
context = {
|
||||
"request": request,
|
||||
"webhooks": reader.get_tag((), "webhooks", []),
|
||||
}
|
||||
return templates.TemplateResponse("add.html", context)
|
||||
return templates.TemplateResponse(request=request, name="add.html", context=context)
|
||||
|
||||
|
||||
@app.get("/feed", response_class=HTMLResponse)
|
||||
async def get_feed(feed_url: str, request: Request): # noqa: ANN201
|
||||
async def get_feed(feed_url: str, request: Request, starting_after: str | None = None):
|
||||
"""Get a feed by URL.
|
||||
|
||||
Args:
|
||||
feed_url: The feed to add.
|
||||
request: The request object.
|
||||
starting_after: The entry to start after. Used for pagination.
|
||||
|
||||
Returns:
|
||||
HTMLResponse: The feed page.
|
||||
@ -410,7 +451,7 @@ async def get_feed(feed_url: str, request: Request): # noqa: ANN201
|
||||
feed: Feed = reader.get_feed(clean_feed_url)
|
||||
|
||||
# Get entries from the feed.
|
||||
entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url)
|
||||
entries: typing.Iterable[Entry] = reader.get_entries(feed=clean_feed_url, limit=10)
|
||||
|
||||
# Create the html for the entries.
|
||||
html: str = create_html_for_feed(entries)
|
||||
@ -428,8 +469,49 @@ async def get_feed(feed_url: str, request: Request): # noqa: ANN201
|
||||
"feed_counts": reader.get_feed_counts(feed=clean_feed_url),
|
||||
"html": html,
|
||||
"should_send_embed": should_send_embed,
|
||||
"show_more_button": True,
|
||||
}
|
||||
return templates.TemplateResponse("feed.html", context)
|
||||
return templates.TemplateResponse(request=request, name="feed.html", context=context)
|
||||
|
||||
|
||||
@app.get("/feed_more", response_class=HTMLResponse)
|
||||
async def get_all_entries(feed_url: str, request: Request):
|
||||
"""Get a feed by URL and show more entries.
|
||||
|
||||
Args:
|
||||
feed_url: The feed to add.
|
||||
request: The request object.
|
||||
starting_after: The entry to start after. Used for pagination.
|
||||
|
||||
Returns:
|
||||
HTMLResponse: The feed page.
|
||||
"""
|
||||
clean_feed_url: str = urllib.parse.unquote(feed_url.strip())
|
||||
|
||||
feed: Feed = reader.get_feed(clean_feed_url)
|
||||
|
||||
# Get entries from the feed.
|
||||
entries: typing.Iterable[Entry] = reader.get_entries(feed=clean_feed_url, limit=200)
|
||||
|
||||
# Create the html for the entries.
|
||||
html: str = create_html_for_feed(entries)
|
||||
|
||||
try:
|
||||
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
|
||||
except TagNotFoundError:
|
||||
add_missing_tags(reader)
|
||||
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
|
||||
|
||||
context = {
|
||||
"request": request,
|
||||
"feed": feed,
|
||||
"entries": entries,
|
||||
"feed_counts": reader.get_feed_counts(feed=clean_feed_url),
|
||||
"html": html,
|
||||
"should_send_embed": should_send_embed,
|
||||
"show_more_button": False,
|
||||
}
|
||||
return templates.TemplateResponse(request=request, name="feed.html", context=context)
|
||||
|
||||
|
||||
def create_html_for_feed(entries: Iterable[Entry]) -> str:
|
||||
@ -468,7 +550,7 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
|
||||
|
||||
html += f"""<div class="p-2 mb-2 border border-dark">
|
||||
{blacklisted}{whitelisted}<a class="text-muted text-decoration-none" href="{entry.link}"><h2>{entry.title}</h2></a>
|
||||
{f"By { entry.author } @" if entry.author else ""}{published} - {to_discord_html}
|
||||
{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html}
|
||||
|
||||
{text}
|
||||
{image_html}
|
||||
@ -478,7 +560,7 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
|
||||
|
||||
|
||||
@app.get("/add_webhook", response_class=HTMLResponse)
|
||||
async def get_add_webhook(request: Request): # noqa: ANN201
|
||||
async def get_add_webhook(request: Request):
|
||||
"""Page for adding a new webhook.
|
||||
|
||||
Args:
|
||||
@ -487,7 +569,7 @@ async def get_add_webhook(request: Request): # noqa: ANN201
|
||||
Returns:
|
||||
HTMLResponse: The add webhook page.
|
||||
"""
|
||||
return templates.TemplateResponse("add_webhook.html", {"request": request})
|
||||
return templates.TemplateResponse(request=request, name="add_webhook.html", context={"request": request})
|
||||
|
||||
|
||||
@dataclass()
|
||||
@ -533,7 +615,7 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo:
|
||||
|
||||
|
||||
@app.get("/webhooks", response_class=HTMLResponse)
|
||||
async def get_webhooks(request: Request): # noqa: ANN201
|
||||
async def get_webhooks(request: Request):
|
||||
"""Page for adding a new webhook.
|
||||
|
||||
Args:
|
||||
@ -549,11 +631,11 @@ async def get_webhooks(request: Request): # noqa: ANN201
|
||||
hooks_with_data.append(our_hook)
|
||||
|
||||
context = {"request": request, "hooks_with_data": hooks_with_data}
|
||||
return templates.TemplateResponse("webhooks.html", context)
|
||||
return templates.TemplateResponse(request=request, name="webhooks.html", context=context)
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
def get_index(request: Request): # noqa: ANN201
|
||||
def get_index(request: Request):
|
||||
"""This is the root of the website.
|
||||
|
||||
Args:
|
||||
@ -562,10 +644,10 @@ def get_index(request: Request): # noqa: ANN201
|
||||
Returns:
|
||||
HTMLResponse: The index page.
|
||||
"""
|
||||
return templates.TemplateResponse("index.html", make_context_index(request))
|
||||
return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request))
|
||||
|
||||
|
||||
def make_context_index(request: Request): # noqa: ANN201
|
||||
def make_context_index(request: Request):
|
||||
"""Create the needed context for the index page.
|
||||
|
||||
Args:
|
||||
@ -605,7 +687,7 @@ def make_context_index(request: Request): # noqa: ANN201
|
||||
|
||||
|
||||
@app.post("/remove", response_class=HTMLResponse)
|
||||
async def remove_feed(feed_url: str = Form()): # noqa: ANN201
|
||||
async def remove_feed(feed_url: str = Form()):
|
||||
"""Get a feed by URL.
|
||||
|
||||
Args:
|
||||
@ -623,7 +705,7 @@ async def remove_feed(feed_url: str = Form()): # noqa: ANN201
|
||||
|
||||
|
||||
@app.get("/search", response_class=HTMLResponse)
|
||||
async def search(request: Request, query: str): # noqa: ANN201
|
||||
async def search(request: Request, query: str):
|
||||
"""Get entries matching a full-text search query.
|
||||
|
||||
Args:
|
||||
@ -641,11 +723,11 @@ async def search(request: Request, query: str): # noqa: ANN201
|
||||
"query": query,
|
||||
"search_amount": reader.search_entry_counts(query),
|
||||
}
|
||||
return templates.TemplateResponse("search.html", context)
|
||||
return templates.TemplateResponse(request=request, name="search.html", context=context)
|
||||
|
||||
|
||||
@app.get("/post_entry", response_class=HTMLResponse)
|
||||
async def post_entry(entry_id: str): # noqa: ANN201
|
||||
async def post_entry(entry_id: str):
|
||||
"""Send single entry to Discord.
|
||||
|
||||
Args:
|
||||
@ -668,7 +750,7 @@ async def post_entry(entry_id: str): # noqa: ANN201
|
||||
|
||||
|
||||
@app.post("/modify_webhook", response_class=HTMLResponse)
|
||||
def modify_webhook(old_hook: str = Form(), new_hook: str = Form()): # noqa: ANN201
|
||||
def modify_webhook(old_hook: str = Form(), new_hook: str = Form()):
|
||||
"""Modify a webhook.
|
||||
|
||||
Args:
|
||||
@ -682,7 +764,7 @@ def modify_webhook(old_hook: str = Form(), new_hook: str = Form()): # noqa: ANN
|
||||
webhooks = list(reader.get_tag((), "webhooks", []))
|
||||
|
||||
# Webhooks are stored as a list of dictionaries.
|
||||
# Example: [{"name": "webhook_name", "url": "webhook_url"}] # noqa: ERA001
|
||||
# Example: [{"name": "webhook_name", "url": "webhook_url"}]
|
||||
webhooks = cast(list[dict[str, str]], webhooks)
|
||||
|
||||
for hook in webhooks:
|
||||
@ -712,24 +794,8 @@ def modify_webhook(old_hook: str = Form(), new_hook: str = Form()): # noqa: ANN
|
||||
return RedirectResponse(url="/webhooks", status_code=303)
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
def startup() -> None:
|
||||
"""This is called when the server starts.
|
||||
|
||||
It adds missing tags and starts the scheduler.
|
||||
"""
|
||||
add_missing_tags(reader=reader)
|
||||
|
||||
scheduler: BackgroundScheduler = BackgroundScheduler()
|
||||
|
||||
# Update all feeds every 15 minutes.
|
||||
# TODO: Make this configurable.
|
||||
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now(tz=timezone.utc))
|
||||
scheduler.start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# TODO: Make this configurable.
|
||||
# TODO(TheLovinator): Make this configurable.
|
||||
uvicorn.run(
|
||||
"main:app",
|
||||
log_level="info",
|
||||
|
@ -35,6 +35,7 @@ def convert_html_to_md(html: str) -> str:
|
||||
link.decompose()
|
||||
else:
|
||||
link_text: str = link.text or link.get("href")
|
||||
link_text = link_text.replace("http://", "").replace("https://", "")
|
||||
link.replace_with(f"[{link_text}]({link.get('href')})")
|
||||
|
||||
for strikethrough in soup.find_all("s") + soup.find_all("del") + soup.find_all("strike"):
|
||||
|
@ -4,60 +4,88 @@ from discord_rss_bot.settings import default_custom_embed, default_custom_messag
|
||||
|
||||
|
||||
def add_custom_message(reader: Reader, feed: Feed) -> None:
|
||||
"""Add the custom message tag to the feed if it doesn't exist.
|
||||
|
||||
Args:
|
||||
reader: What Reader to use.
|
||||
feed: The feed to add the tag to.
|
||||
"""
|
||||
try:
|
||||
reader.get_tag(feed, "custom_message")
|
||||
except TagNotFoundError:
|
||||
print(f"Adding custom_message tag to '{feed.url}'")
|
||||
reader.set_tag(feed.url, "custom_message", default_custom_message) # type: ignore
|
||||
reader.set_tag(feed.url, "has_custom_message", True) # type: ignore
|
||||
|
||||
|
||||
def add_has_custom_message(reader: Reader, feed: Feed) -> None:
|
||||
"""Add the has_custom_message tag to the feed if it doesn't exist.
|
||||
|
||||
Args:
|
||||
reader: What Reader to use.
|
||||
feed: The feed to add the tag to.
|
||||
"""
|
||||
try:
|
||||
reader.get_tag(feed, "has_custom_message")
|
||||
except TagNotFoundError:
|
||||
if reader.get_tag(feed, "custom_message") == default_custom_message:
|
||||
print(f"Setting has_custom_message tag to False for '{feed.url}'")
|
||||
reader.set_tag(feed.url, "has_custom_message", False) # type: ignore
|
||||
else:
|
||||
print(f"Setting has_custom_message tag to True for '{feed.url}'")
|
||||
reader.set_tag(feed.url, "has_custom_message", True) # type: ignore
|
||||
|
||||
|
||||
def add_if_embed(reader: Reader, feed: Feed) -> None:
|
||||
"""Add the if_embed tag to the feed if it doesn't exist.
|
||||
|
||||
Args:
|
||||
reader: What Reader to use.
|
||||
feed: The feed to add the tag to.
|
||||
"""
|
||||
try:
|
||||
reader.get_tag(feed, "if_embed")
|
||||
except TagNotFoundError:
|
||||
print(f"Setting if_embed tag to True for '{feed.url}'")
|
||||
reader.set_tag(feed.url, "if_embed", True) # type: ignore
|
||||
|
||||
|
||||
def add_custom_embed(reader: Reader, feed: Feed) -> None:
|
||||
"""Add the custom embed tag to the feed if it doesn't exist.
|
||||
|
||||
Args:
|
||||
reader: What Reader to use.
|
||||
feed: The feed to add the tag to.
|
||||
"""
|
||||
try:
|
||||
reader.get_tag(feed, "embed")
|
||||
except TagNotFoundError:
|
||||
print(f"Setting embed tag to default for '{feed.url}'")
|
||||
reader.set_tag(feed.url, "embed", default_custom_embed) # type: ignore
|
||||
reader.set_tag(feed.url, "has_custom_embed", True) # type: ignore
|
||||
|
||||
|
||||
def add_has_custom_embed(reader: Reader, feed: Feed) -> None:
|
||||
"""Add the has_custom_embed tag to the feed if it doesn't exist.
|
||||
|
||||
Args:
|
||||
reader: What Reader to use.
|
||||
feed: The feed to add the tag to.
|
||||
"""
|
||||
try:
|
||||
reader.get_tag(feed, "has_custom_embed")
|
||||
except TagNotFoundError:
|
||||
if reader.get_tag(feed, "embed") == default_custom_embed:
|
||||
print(f"Setting has_custom_embed tag to False for '{feed.url}'")
|
||||
reader.set_tag(feed.url, "has_custom_embed", False) # type: ignore
|
||||
else:
|
||||
print(f"Setting has_custom_embed tag to True for '{feed.url}'")
|
||||
reader.set_tag(feed.url, "has_custom_embed", True) # type: ignore
|
||||
|
||||
|
||||
def add_should_send_embed(reader: Reader, feed: Feed) -> None:
|
||||
"""Add the should_send_embed tag to the feed if it doesn't exist.
|
||||
|
||||
Args:
|
||||
reader: What Reader to use.
|
||||
feed: The feed to add the tag to.
|
||||
"""
|
||||
try:
|
||||
reader.get_tag(feed, "should_send_embed")
|
||||
except TagNotFoundError:
|
||||
print(f"Setting should_send_embed tag to True for '{feed.url}'")
|
||||
reader.set_tag(feed.url, "should_send_embed", True) # type: ignore
|
||||
|
||||
|
||||
|
@ -1,13 +1,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import urllib.parse
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from reader import EntrySearchResult, Feed, HighlightedString, Reader
|
||||
|
||||
from discord_rss_bot.settings import get_reader
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
|
||||
from reader import EntrySearchResult, Feed, HighlightedString, Reader
|
||||
|
||||
|
||||
def create_html_for_search_results(query: str, custom_reader: Reader | None = None) -> str:
|
||||
"""Create HTML for the search results.
|
||||
@ -19,8 +21,8 @@ def create_html_for_search_results(query: str, custom_reader: Reader | None = No
|
||||
Returns:
|
||||
str: The HTML.
|
||||
"""
|
||||
# TODO: There is a .content that also contains text, we should use that if .summary is not available.
|
||||
# TODO: We should also add <span> tags to the title.
|
||||
# TODO(TheLovinator): There is a .content that also contains text, we should use that if .summary is not available.
|
||||
# TODO(TheLovinator): We should also add <span> tags to the title.
|
||||
|
||||
# Get the default reader if we didn't get a custom one.
|
||||
reader: Reader = get_reader() if custom_reader is None else custom_reader
|
||||
@ -55,12 +57,12 @@ def add_span_with_slice(highlighted_string: HighlightedString) -> str:
|
||||
Returns:
|
||||
str: The string with added <span> tags.
|
||||
"""
|
||||
# TODO: We are looping through the highlights and only using the last one. We should use all of them.
|
||||
# TODO(TheLovinator): We are looping through the highlights and only using the last one. We should use all of them.
|
||||
before_span, span_part, after_span = "", "", ""
|
||||
|
||||
for txt_slice in highlighted_string.highlights:
|
||||
before_span: str = f"{highlighted_string.value[: txt_slice.start]}"
|
||||
span_part: str = f"<span class='bg-warning'>{highlighted_string.value[txt_slice.start: txt_slice.stop]}</span>"
|
||||
after_span: str = f"{highlighted_string.value[txt_slice.stop:]}"
|
||||
span_part: str = f"<span class='bg-warning'>{highlighted_string.value[txt_slice.start : txt_slice.stop]}</span>"
|
||||
after_span: str = f"{highlighted_string.value[txt_slice.stop :]}"
|
||||
|
||||
return f"{before_span}{span_part}{after_span}"
|
||||
|
@ -1,15 +1,32 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import typing
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
|
||||
from platformdirs import user_data_dir
|
||||
from reader import Reader, make_reader
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from reader.types import JSONType
|
||||
|
||||
data_dir: str = user_data_dir(appname="discord_rss_bot", appauthor="TheLovinator", roaming=True, ensure_exists=True)
|
||||
print(f"Data is stored in '{data_dir}'.")
|
||||
|
||||
|
||||
# TODO: Add default things to the database and make the edible.
|
||||
default_custom_message: str = "{{entry_title}}\n{{entry_link}}"
|
||||
logger: logging.Logger = logging.getLogger("discord_rss_bot")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
stream_handler = logging.StreamHandler(sys.stdout)
|
||||
log_formatter = logging.Formatter(
|
||||
"%(asctime)s [%(processName)s: %(process)d] [%(threadName)s: %(thread)d] [%(levelname)s] %(name)s: %(message)s",
|
||||
)
|
||||
stream_handler.setFormatter(log_formatter)
|
||||
logger.addHandler(stream_handler)
|
||||
|
||||
|
||||
# TODO(TheLovinator): Add default things to the database and make the edible.
|
||||
default_custom_message: JSONType | str = "{{entry_title}}\n{{entry_link}}"
|
||||
default_custom_embed: dict[str, str] = {
|
||||
"title": "{{entry_title}}",
|
||||
"description": "{{entry_text}}",
|
||||
|
6
discord_rss_bot/static/bootstrap.min.css
vendored
Normal file
6
discord_rss_bot/static/bootstrap.min.css
vendored
Normal file
File diff suppressed because one or more lines are too long
7
discord_rss_bot/static/bootstrap.min.js
vendored
Normal file
7
discord_rss_bot/static/bootstrap.min.js
vendored
Normal file
File diff suppressed because one or more lines are too long
@ -4,4 +4,12 @@ body {
|
||||
|
||||
.border {
|
||||
background: #161616;
|
||||
}
|
||||
}
|
||||
|
||||
.text-muted {
|
||||
color: #888888 !important;
|
||||
}
|
||||
|
||||
.form-text {
|
||||
color: #888888;
|
||||
}
|
||||
|
@ -7,10 +7,7 @@
|
||||
content="Stay updated with the latest news and events with our easy-to-use RSS bot. Never miss a message or announcement again with real-time notifications directly to your Discord server." />
|
||||
<meta name="keywords"
|
||||
content="discord, rss, bot, notifications, announcements, updates, real-time, server, messages, news, events, feed." />
|
||||
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css"
|
||||
rel="stylesheet"
|
||||
integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65"
|
||||
crossorigin="anonymous" />
|
||||
<link href="/static/bootstrap.min.css" rel="stylesheet" />
|
||||
<link href="/static/styles.css" rel="stylesheet" />
|
||||
<link rel="icon" href="/static/favicon.ico" type="image/x-icon" />
|
||||
<title>discord-rss-bot
|
||||
@ -46,7 +43,6 @@
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4" crossorigin="anonymous">
|
||||
</script>
|
||||
<script src="/static/bootstrap.min.js" defer></script>
|
||||
</body>
|
||||
</html>
|
||||
|
@ -61,4 +61,8 @@
|
||||
<pre>
|
||||
{{ html|safe }}
|
||||
</pre>
|
||||
{% if show_more_button %}
|
||||
<a class="btn btn-dark"
|
||||
href="/feed_more?feed_url={{ feed.url|encode_url }}">Show more (Note: This view is not optimized at all, so be ready to wait a while)</a>
|
||||
{% endif %}
|
||||
{% endblock content %}
|
||||
|
@ -10,19 +10,6 @@
|
||||
<br />
|
||||
{% for hook in hooks_with_data %}
|
||||
<div class="p-2 border border-dark text-muted">
|
||||
{% if hook.avatar is not none %}
|
||||
<img src="https://cdn.discordapp.com/avatars/{{ hook.id }}/{{ hook.avatar }}.webp"
|
||||
class="img-thumbnail"
|
||||
height="128"
|
||||
width="128"
|
||||
alt="Webhook avatar" />
|
||||
{% else %}
|
||||
<img src="https://cdn.discordapp.com/embed/avatars/{{ hook.avatar_mod }}.png"
|
||||
class="img-thumbnail"
|
||||
height="128"
|
||||
width="128"
|
||||
alt="Default Discord avatar" />
|
||||
{% endif %}
|
||||
<h3>{{ hook.custom_name }}</h3>
|
||||
<li>
|
||||
<strong>Name</strong>: {{ hook.name }}
|
||||
|
@ -21,7 +21,7 @@ def add_webhook(reader: Reader, webhook_name: str, webhook_url: str) -> None:
|
||||
webhooks = list(reader.get_tag((), "webhooks", []))
|
||||
|
||||
# Webhooks are stored as a list of dictionaries.
|
||||
# Example: [{"name": "webhook_name", "url": "webhook_url"}] # noqa: ERA001
|
||||
# Example: [{"name": "webhook_name", "url": "webhook_url"}]
|
||||
webhooks = cast(list[dict[str, str]], webhooks)
|
||||
|
||||
# Only add the webhook if it doesn't already exist.
|
||||
@ -35,8 +35,8 @@ def add_webhook(reader: Reader, webhook_name: str, webhook_url: str) -> None:
|
||||
add_missing_tags(reader)
|
||||
return
|
||||
|
||||
# TODO: Show this error on the page.
|
||||
# TODO: Replace HTTPException with a custom exception.
|
||||
# TODO(TheLovinator): Show this error on the page.
|
||||
# TODO(TheLovinator): Replace HTTPException with a custom exception.
|
||||
raise HTTPException(status_code=409, detail="Webhook already exists")
|
||||
|
||||
|
||||
@ -51,12 +51,12 @@ def remove_webhook(reader: Reader, webhook_url: str) -> None:
|
||||
HTTPException: If webhook could not be deleted
|
||||
HTTPException: Webhook not found
|
||||
"""
|
||||
# TODO: Replace HTTPException with a custom exception for both of these.
|
||||
# TODO(TheLovinator): Replace HTTPException with a custom exception for both of these.
|
||||
# Get current webhooks from the database if they exist otherwise use an empty list.
|
||||
webhooks = list(reader.get_tag((), "webhooks", []))
|
||||
|
||||
# Webhooks are stored as a list of dictionaries.
|
||||
# Example: [{"name": "webhook_name", "url": "webhook_url"}] # noqa: ERA001
|
||||
# Example: [{"name": "webhook_name", "url": "webhook_url"}]
|
||||
webhooks = cast(list[dict[str, str]], webhooks)
|
||||
|
||||
# Only add the webhook if it doesn't already exist.
|
||||
@ -72,5 +72,5 @@ def remove_webhook(reader: Reader, webhook_url: str) -> None:
|
||||
reader.set_tag((), "webhooks", webhooks) # type: ignore
|
||||
return
|
||||
|
||||
# TODO: Show this error on the page.
|
||||
# TODO(TheLovinator): Show this error on the page.
|
||||
raise HTTPException(status_code=404, detail="Webhook not found")
|
||||
|
Reference in New Issue
Block a user