Add tests
This commit is contained in:
@ -24,27 +24,31 @@ Exceptions:
|
||||
from typing import Iterable
|
||||
|
||||
from discord_webhook import DiscordWebhook
|
||||
from reader import Entry
|
||||
from reader import Entry, Reader
|
||||
from requests import Response
|
||||
|
||||
from discord_rss_bot.settings import reader
|
||||
from discord_rss_bot.settings import get_reader
|
||||
|
||||
|
||||
def send_to_discord(feed=None) -> None:
|
||||
def send_to_discord(reader: Reader = None, feed=None, do_once=False) -> None:
|
||||
"""
|
||||
Send entries to Discord.
|
||||
|
||||
If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time.
|
||||
|
||||
Args:
|
||||
reader: If we should use a custom reader instead of the default one.
|
||||
feed: The entry to send.
|
||||
|
||||
Raises:
|
||||
NoWebhookFoundError: If no webhook is found.
|
||||
do_once: If we should only send one entry. This is used in the test.
|
||||
|
||||
Returns:
|
||||
Response: The response from the webhook.
|
||||
"""
|
||||
# Get the default reader if we didn't get a custom one.
|
||||
if reader is None:
|
||||
reader = get_reader()
|
||||
|
||||
# If we should get all entries, or just the entries from a specific feed.
|
||||
if feed is None:
|
||||
reader.update_feeds()
|
||||
entries: Iterable[Entry] = reader.get_entries(read=False)
|
||||
@ -53,13 +57,21 @@ def send_to_discord(feed=None) -> None:
|
||||
entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False)
|
||||
|
||||
for entry in entries:
|
||||
# Set the webhook to read, so we don't send it again.
|
||||
reader.set_entry_read(entry, True)
|
||||
|
||||
# Get the webhook from the feed.
|
||||
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook"))
|
||||
webhook_message: str = f":robot: :mega: {entry.title}\n{entry.link}"
|
||||
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
|
||||
|
||||
# Send the webhook.
|
||||
response: Response = webhook.execute()
|
||||
if not response.ok:
|
||||
reader.set_entry_read(entry, False)
|
||||
|
||||
# If we only want to send one entry, we will break the loop. This is used when testing this function.
|
||||
if do_once:
|
||||
break
|
||||
|
||||
reader.update_search()
|
||||
|
@ -49,11 +49,13 @@ from tomlkit.toml_document import TOMLDocument
|
||||
|
||||
from discord_rss_bot.feeds import send_to_discord
|
||||
from discord_rss_bot.search import create_html_for_search_results
|
||||
from discord_rss_bot.settings import read_settings_file, reader
|
||||
from discord_rss_bot.settings import get_reader, read_settings_file
|
||||
|
||||
app: FastAPI = FastAPI()
|
||||
app.mount("/static", StaticFiles(directory="static"), name="static")
|
||||
templates: Jinja2Templates = Jinja2Templates(directory="templates")
|
||||
app.mount("/static", StaticFiles(directory="discord_rss_bot/static"), name="static")
|
||||
templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/templates")
|
||||
|
||||
reader = get_reader()
|
||||
|
||||
|
||||
def encode_url(url_to_quote: str) -> str:
|
||||
|
@ -1,22 +1,28 @@
|
||||
import urllib.parse
|
||||
from typing import Iterable
|
||||
|
||||
from reader import EntrySearchResult, Feed, HighlightedString
|
||||
from reader import EntrySearchResult, Feed, HighlightedString, Reader
|
||||
|
||||
from discord_rss_bot.settings import reader
|
||||
from discord_rss_bot.settings import get_reader
|
||||
|
||||
|
||||
def create_html_for_search_results(search_results: Iterable[EntrySearchResult]) -> str:
|
||||
def create_html_for_search_results(search_results: Iterable[EntrySearchResult], reader: Reader = None) -> str:
|
||||
"""Create HTML for the search results.
|
||||
|
||||
Args:
|
||||
search_results: The search results.
|
||||
reader: The reader. If None, we will get the reader from the settings.
|
||||
|
||||
Returns:
|
||||
str: The HTML.
|
||||
"""
|
||||
# TODO: There is a .content that also contains text, we should use that if .summary is not available.
|
||||
# TODO: We should also add <span> tags to the title.
|
||||
|
||||
# Get the default reader if we didn't get a custom one.
|
||||
if reader is None:
|
||||
reader = get_reader()
|
||||
|
||||
html: str = ""
|
||||
for result in search_results:
|
||||
if ".summary" in result.content:
|
||||
|
@ -51,31 +51,52 @@ def create_settings_file(settings_file_location) -> None:
|
||||
f.write(doc.as_string())
|
||||
|
||||
|
||||
def get_db_location(custom_name: str = "db.sqlite") -> str:
|
||||
def get_db_location(custom_location: str = "") -> str:
|
||||
"""Where we store the database file.
|
||||
|
||||
Args:
|
||||
custom_name: The name of the database file, defaults to db.sqlite.
|
||||
custom_location: Where the database file should be stored. This should be with the file name.
|
||||
|
||||
Returns:
|
||||
The database location.
|
||||
"""
|
||||
return os.path.join(data_dir, custom_name)
|
||||
# Use the custom location if it is provided.
|
||||
if custom_location:
|
||||
return custom_location
|
||||
else:
|
||||
return os.path.join(data_dir, "db.sqlite")
|
||||
|
||||
|
||||
def read_settings_file(custom_name: str = "settings.toml") -> TOMLDocument:
|
||||
def read_settings_file(custom_location: str = "") -> TOMLDocument:
|
||||
"""Read the settings file and return the settings as a dict.
|
||||
|
||||
Args:
|
||||
custom_name: The name of the settings file, defaults to settings.toml.
|
||||
custom_location: The name of the settings file, defaults to settings.toml.
|
||||
|
||||
Returns:
|
||||
dict: The settings file as a dict.
|
||||
"""
|
||||
settings_file: str = os.path.join(data_dir, custom_name)
|
||||
with open(settings_file, encoding="utf-8") as f:
|
||||
# Use the custom location if it is provided.
|
||||
if custom_location:
|
||||
settings_location = custom_location
|
||||
else:
|
||||
settings_location = os.path.join(data_dir, "settings.toml")
|
||||
|
||||
# Create the settings file if it doesn't exist.
|
||||
if not os.path.exists(settings_location):
|
||||
create_settings_file(settings_location)
|
||||
|
||||
# Read the settings file and return it as a dict.
|
||||
with open(settings_location, encoding="utf-8") as f:
|
||||
return parse(f.read())
|
||||
|
||||
|
||||
db_location: str = get_db_location()
|
||||
reader: Reader = make_reader(db_location)
|
||||
def get_reader(custom_location: str = "") -> Reader:
|
||||
"""Get the reader.
|
||||
|
||||
Args:
|
||||
custom_location: The location of the database file.
|
||||
|
||||
"""
|
||||
db_location: str = get_db_location(custom_location)
|
||||
return make_reader(url=db_location)
|
||||
|
Reference in New Issue
Block a user