import json
import urllib.parse
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from typing import Dict, Iterable

import httpx
import uvicorn
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from httpx import Response
from reader import (
    Entry,
    EntryCounts,
    EntrySearchCounts,
    EntrySearchResult,
    Feed,
    FeedCounts,
    FeedNotFoundError,
    Reader,
    TagNotFoundError,
)
from starlette.responses import RedirectResponse

from discord_rss_bot import settings
from discord_rss_bot.custom_filters import encode_url, entry_is_blacklisted, entry_is_whitelisted
from discord_rss_bot.custom_message import (
    CustomEmbed,
    get_custom_message,
    get_embed,
    get_images_from_entry,
    replace_tags_in_text_message,
    save_embed,
)
from discord_rss_bot.feeds import get_entry_from_id, send_entry_to_discord, send_to_discord
from discord_rss_bot.filter.blacklist import get_blacklist_content, get_blacklist_summary, get_blacklist_title
from discord_rss_bot.filter.whitelist import get_whitelist_content, get_whitelist_summary, get_whitelist_title
from discord_rss_bot.markdown import convert_html_to_md
from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.search import create_html_for_search_results
from discord_rss_bot.settings import default_custom_message, get_reader
from discord_rss_bot.webhook import add_webhook, remove_webhook

# The FastAPI application. Static assets are served under /static from the package's static directory.
app: FastAPI = FastAPI()
app.mount("/static", StaticFiles(directory="discord_rss_bot/static"), name="static")

# Jinja2 templates are loaded from the package's templates directory.
templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/templates")

# Module-level Reader instance obtained from the project's settings module.
reader: Reader = get_reader()

# Add the filters to the Jinja2 environment so they can be used in html templates.
templates.env.filters["encode_url"] = encode_url templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted templates.env.filters["discord_markdown"] = convert_html_to_md @app.post("/add_webhook") async def post_add_webhook(webhook_name=Form(), webhook_url=Form()): """ Add a feed to the database. Args: webhook_name: The name of the webhook. webhook_url: The url of the webhook. """ if add_webhook(reader, webhook_name, webhook_url): return RedirectResponse(url="/", status_code=303) @app.post("/delete_webhook") async def post_delete_webhook(webhook_url=Form()): """ Delete a webhook from the database. Args: webhook_url: The url of the webhook. """ if remove_webhook(reader, webhook_url): return RedirectResponse(url="/", status_code=303) @app.post("/add") async def post_create_feed(feed_url=Form(), webhook_dropdown=Form()): """ Add a feed to the database. Args: feed_url: The feed to add. webhook_dropdown: The webhook to use. Returns: dict: The feed that was added. """ clean_feed_url: str = feed_url.strip() # TODO: Check if the feed is valid, if not return an error or fix it. # For example, if the feed is missing the protocol, add it. reader.add_feed(clean_feed_url) reader.update_feed(clean_feed_url) # Mark every entry as read, so we don't send all the old entries to Discord. entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url, read=False) for entry in entries: reader.set_entry_read(entry, True) hooks = reader.get_tag((), "webhooks", []) webhook_url: str = "" if hooks: # Get the webhook URL from the dropdown. for hook in hooks: if hook["name"] == webhook_dropdown: # type: ignore webhook_url = hook["url"] # type: ignore break if not webhook_url: # TODO: Show this error on the page. raise HTTPException(status_code=404, detail="Webhook not found") # This is the webhook that will be used to send the feed to Discord. 
reader.set_tag(clean_feed_url, "webhook", webhook_url) # type: ignore reader.get_tag(clean_feed_url, "webhook") # This is the default message that will be sent to Discord. reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # type: ignore reader.get_tag(clean_feed_url, "custom_message") # Update the full-text search index so our new feed is searchable. reader.update_search() return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) @app.post("/pause") async def post_pause_feed(feed_url=Form()): """Pause a feed. Args: feed_url: The feed to pause. """ reader.disable_feed_updates(feed_url) clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @app.post("/unpause") async def post_unpause_feed(feed_url=Form()): """Unpause a feed. Args: feed_url: The Feed to unpause. """ reader.enable_feed_updates(feed_url) clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @app.post("/whitelist") async def post_set_whitelist( whitelist_title=Form(None), whitelist_summary=Form(None), whitelist_content=Form(None), feed_url=Form(), ): """Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent. Args: whitelist_title: Whitelisted words for when checking the title. whitelist_summary: Whitelisted words for when checking the title. whitelist_content: Whitelisted words for when checking the title. feed_url: The feed we should set the whitelist for. 
""" if whitelist_title: reader.set_tag(feed_url, "whitelist_title", whitelist_title) if whitelist_summary: reader.set_tag(feed_url, "whitelist_summary", whitelist_summary) if whitelist_content: reader.set_tag(feed_url, "whitelist_content", whitelist_content) clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @app.get("/whitelist", response_class=HTMLResponse) async def get_whitelist(feed_url, request: Request): """Get the whitelist. Args: feed_url: What feed we should get the whitelist for. request: The HTTP request. """ # Make feed_url a valid URL. url: str = urllib.parse.unquote(feed_url) feed: Feed = reader.get_feed(url) # Get previous data, this is used when creating the form. whitelist_title: str = get_whitelist_title(reader, feed) whitelist_summary: str = get_whitelist_summary(reader, feed) whitelist_content: str = get_whitelist_content(reader, feed) context = { "request": request, "feed": feed, "whitelist_title": whitelist_title, "whitelist_summary": whitelist_summary, "whitelist_content": whitelist_content, } return templates.TemplateResponse("whitelist.html", context) @app.post("/blacklist") async def post_set_blacklist( blacklist_title=Form(None), blacklist_summary=Form(None), blacklist_content=Form(None), feed_url=Form(), ): """Set the blacklist, if this is set we will check if words are in the title, summary or content and then don't send that entry. Args: blacklist_title: Blacklisted words for when checking the title. blacklist_summary: Blacklisted words for when checking the summary. blacklist_content: Blacklisted words for when checking the content. feed_url: What feed we should set the blacklist for. """ # Add the blacklist to the feed. 
if blacklist_title: reader.set_tag(feed_url, "blacklist_title", blacklist_title) if blacklist_summary: reader.set_tag(feed_url, "blacklist_summary", blacklist_summary) if blacklist_content: reader.set_tag(feed_url, "blacklist_content", blacklist_content) clean_url = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @app.get("/blacklist", response_class=HTMLResponse) async def get_blacklist(feed_url, request: Request): # Make feed_url a valid URL. url: str = urllib.parse.unquote(feed_url) feed: Feed = reader.get_feed(url) # Get previous data, this is used when creating the form. blacklist_title: str = get_blacklist_title(reader, feed) blacklist_summary: str = get_blacklist_summary(reader, feed) blacklist_content: str = get_blacklist_content(reader, feed) context = { "request": request, "feed": feed, "blacklist_title": blacklist_title, "blacklist_summary": blacklist_summary, "blacklist_content": blacklist_content, } return templates.TemplateResponse("blacklist.html", context) @app.post("/custom") async def post_set_custom(custom_message=Form(""), feed_url=Form()): """ Set the custom message, this is used when sending the message. Args: custom_message: The custom message. feed_url: The feed we should set the custom message for. """ if custom_message := custom_message.strip(): reader.set_tag(feed_url, "custom_message", custom_message) # type: ignore else: reader.set_tag(feed_url, "custom_message", settings.default_custom_message) # type: ignore clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @app.get("/custom", response_class=HTMLResponse) async def get_custom(feed_url, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The HTTP request. Returns: custom.html """ # Make feed_url a valid URL. 
url: str = urllib.parse.unquote(feed_url) feed: Feed = reader.get_feed(url) # Get previous data, this is used when creating the form. custom_message: str = get_custom_message(reader, feed) context = {"request": request, "feed": feed, "custom_message": custom_message} # Get the first entry, this is used to show the user what the custom message will look like. entries: Iterable[Entry] = reader.get_entries(feed=feed, limit=1) for entry in entries: # Append to context. context["entry"] = entry return templates.TemplateResponse("custom.html", context) @app.get("/embed", response_class=HTMLResponse) async def get_embed_page(feed_url, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The HTTP request. Returns: custom.html """ # Make feed_url a valid URL. url: str = urllib.parse.unquote(feed_url) feed: Feed = reader.get_feed(url) # Get previous data, this is used when creating the form. embed: CustomEmbed = get_embed(reader, feed) context = { "request": request, "feed": feed, "title": embed.title, "description": embed.description, "color": embed.color, "image_url": embed.image_url, "thumbnail_url": embed.thumbnail_url, "author_name": embed.author_name, "author_url": embed.author_url, "author_icon_url": embed.author_icon_url, "footer_text": embed.footer_text, "footer_icon_url": embed.footer_icon_url, } # Get the first entry, this is used to show the user what the custom message will look like. entries: Iterable[Entry] = reader.get_entries(feed=feed, limit=1) if custom_embed := get_embed(reader, feed_url): context["custom_embed"] = custom_embed for entry in entries: # Append to context. 
context["entry"] = entry return templates.TemplateResponse("embed.html", context) @app.post("/embed", response_class=HTMLResponse) async def post_embed( feed_url=Form(), title=Form(""), description=Form(""), color=Form(""), image_url=Form(""), thumbnail_url=Form(""), author_name=Form(""), author_url=Form(""), author_icon_url=Form(""), footer_text=Form(""), footer_icon_url=Form(""), ): """Set the embed settings. Args: feed_url: What feed we should get the custom message for. request: The HTTP request. Returns: custom.html """ # Make feed_url a valid URL. url: str = urllib.parse.unquote(feed_url) feed: Feed = reader.get_feed(url) custom_embed: CustomEmbed = get_embed(reader, feed) # Get the data from the form. custom_embed.title = title custom_embed.description = description custom_embed.color = color custom_embed.image_url = image_url custom_embed.thumbnail_url = thumbnail_url custom_embed.author_name = author_name custom_embed.author_url = author_url custom_embed.author_icon_url = author_icon_url custom_embed.footer_text = footer_text custom_embed.footer_icon_url = footer_icon_url # Save the data. save_embed(reader, feed_url, custom_embed) clean_url: str = urllib.parse.quote(feed_url) return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303) @app.post("/use_embed") async def post_use_embed(feed_url=Form()): url: str = urllib.parse.unquote(feed_url) feed: Feed = reader.get_feed(url) reader.set_tag(feed, "should_send_embed", True) # type: ignore return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) @app.post("/use_text") async def post_use_text(feed_url=Form()): url: str = urllib.parse.unquote(feed_url) feed: Feed = reader.get_feed(url) reader.set_tag(feed, "should_send_embed", False) # type: ignore return RedirectResponse(url=f"/feed/?feed_url={feed_url}", status_code=303) @app.get("/add", response_class=HTMLResponse) def get_add(request: Request): """ Page for adding a new feed. Args: request: The request. 
""" context = make_context_index(request) return templates.TemplateResponse("add.html", context) @app.get("/feed", response_class=HTMLResponse) async def get_feed(feed_url, request: Request): """ Get a feed by URL. Args: request: The request. feed_url: The feed to add. """ # Make feed_url a valid URL. url: str = urllib.parse.unquote(feed_url) feed: Feed = reader.get_feed(url) # Get entries from the feed. entries: Iterable[Entry] = reader.get_entries(feed=url) # Get the entries in the feed. feed_counts: FeedCounts = reader.get_feed_counts(feed=url) # Create the html for the entries. html: str = create_html_for_feed(entries) try: should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) except TagNotFoundError: add_missing_tags(reader) should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) context = { "request": request, "feed": feed, "entries": entries, "feed_counts": feed_counts, "html": html, "should_send_embed": should_send_embed, } return templates.TemplateResponse("feed.html", context) def create_html_for_feed(entries: Iterable[Entry]) -> str: """Create HTML for the search results. Args: search_results: The search results. custom_reader: The reader. If None, we will get the reader from the settings. """ html: str = "" for entry in entries: # Get first image. first_image = "" first_image_text = "" if images := get_images_from_entry(entry=entry): first_image: str = images[0][0] first_image_text: str = images[0][1] # Get the text from the entry. text = replace_tags_in_text_message(entry.feed, entry) if not text: text = "