Add type hints and other small stuff
This commit is contained in:
@ -30,7 +30,7 @@ from requests import Response
|
|||||||
from discord_rss_bot.settings import get_reader
|
from discord_rss_bot.settings import get_reader
|
||||||
|
|
||||||
|
|
||||||
def send_to_discord(reader: Reader = None, feed=None, do_once=False) -> None:
|
def send_to_discord(custom_reader: Reader | None = None, feed=None, do_once=False) -> None:
|
||||||
"""
|
"""
|
||||||
Send entries to Discord.
|
Send entries to Discord.
|
||||||
|
|
||||||
@ -45,8 +45,7 @@ def send_to_discord(reader: Reader = None, feed=None, do_once=False) -> None:
|
|||||||
Response: The response from the webhook.
|
Response: The response from the webhook.
|
||||||
"""
|
"""
|
||||||
# Get the default reader if we didn't get a custom one.
|
# Get the default reader if we didn't get a custom one.
|
||||||
if reader is None:
|
reader: Reader = get_reader() if custom_reader is None else custom_reader
|
||||||
reader = get_reader()
|
|
||||||
|
|
||||||
# If we should get all entries, or just the entries from a specific feed.
|
# If we should get all entries, or just the entries from a specific feed.
|
||||||
if feed is None:
|
if feed is None:
|
||||||
|
@ -44,6 +44,7 @@ from reader import (
|
|||||||
EntrySearchResult,
|
EntrySearchResult,
|
||||||
Feed,
|
Feed,
|
||||||
FeedCounts,
|
FeedCounts,
|
||||||
|
Reader,
|
||||||
)
|
)
|
||||||
from starlette.templating import _TemplateResponse
|
from starlette.templating import _TemplateResponse
|
||||||
from tomlkit.toml_document import TOMLDocument
|
from tomlkit.toml_document import TOMLDocument
|
||||||
@ -56,7 +57,7 @@ app: FastAPI = FastAPI()
|
|||||||
app.mount("/static", StaticFiles(directory="discord_rss_bot/static"), name="static")
|
app.mount("/static", StaticFiles(directory="discord_rss_bot/static"), name="static")
|
||||||
templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/templates")
|
templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/templates")
|
||||||
|
|
||||||
reader = get_reader()
|
reader: Reader = get_reader()
|
||||||
|
|
||||||
|
|
||||||
def encode_url(url_to_quote: str) -> str:
|
def encode_url(url_to_quote: str) -> str:
|
||||||
@ -99,8 +100,8 @@ async def create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()) ->
|
|||||||
reader.set_entry_read(entry, True)
|
reader.set_entry_read(entry, True)
|
||||||
|
|
||||||
settings: TOMLDocument = read_settings_file()
|
settings: TOMLDocument = read_settings_file()
|
||||||
webhook_url: str = str(settings["webhooks"][webhook_dropdown])
|
webhook_url: str = str(settings["webhooks"][webhook_dropdown]) # type: ignore
|
||||||
reader.set_tag(clean_feed_url, "webhook", webhook_url)
|
reader.set_tag(clean_feed_url, "webhook", webhook_url) # type: ignore
|
||||||
reader.get_tag(clean_feed_url, "webhook")
|
reader.get_tag(clean_feed_url, "webhook")
|
||||||
|
|
||||||
reader.update_search()
|
reader.update_search()
|
||||||
@ -112,8 +113,9 @@ def create_list_of_webhooks() -> list[dict[str, str]]:
|
|||||||
"""List with webhooks."""
|
"""List with webhooks."""
|
||||||
settings: TOMLDocument = read_settings_file()
|
settings: TOMLDocument = read_settings_file()
|
||||||
list_of_webhooks: list[dict[str, str]] = []
|
list_of_webhooks: list[dict[str, str]] = []
|
||||||
for hook in settings["webhooks"]:
|
|
||||||
list_of_webhooks.append({"name": hook, "url": settings["webhooks"][hook]})
|
for hook in settings["webhooks"]: # type: ignore
|
||||||
|
list_of_webhooks.append({"name": hook, "url": settings["webhooks"][hook]}) # type: ignore
|
||||||
|
|
||||||
return list_of_webhooks
|
return list_of_webhooks
|
||||||
|
|
||||||
@ -230,7 +232,7 @@ async def remove_feed(feed_url: str = Form()) -> RedirectResponse:
|
|||||||
reader.delete_feed(feed_url)
|
reader.delete_feed(feed_url)
|
||||||
reader.update_search()
|
reader.update_search()
|
||||||
|
|
||||||
return RedirectResponse(url=f"/", status_code=303)
|
return RedirectResponse(url="/", status_code=303)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/search", response_class=HTMLResponse)
|
@app.get("/search", response_class=HTMLResponse)
|
||||||
|
@ -6,7 +6,9 @@ from reader import EntrySearchResult, Feed, HighlightedString, Reader
|
|||||||
from discord_rss_bot.settings import get_reader
|
from discord_rss_bot.settings import get_reader
|
||||||
|
|
||||||
|
|
||||||
def create_html_for_search_results(search_results: Iterable[EntrySearchResult], reader: Reader = None) -> str:
|
def create_html_for_search_results(
|
||||||
|
search_results: Iterable[EntrySearchResult], custom_reader: Reader | None = None
|
||||||
|
) -> str:
|
||||||
"""Create HTML for the search results.
|
"""Create HTML for the search results.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -20,8 +22,7 @@ def create_html_for_search_results(search_results: Iterable[EntrySearchResult],
|
|||||||
# TODO: We should also add <span> tags to the title.
|
# TODO: We should also add <span> tags to the title.
|
||||||
|
|
||||||
# Get the default reader if we didn't get a custom one.
|
# Get the default reader if we didn't get a custom one.
|
||||||
if reader is None:
|
reader: Reader = get_reader() if custom_reader is None else custom_reader
|
||||||
reader = get_reader()
|
|
||||||
|
|
||||||
html: str = ""
|
html: str = ""
|
||||||
for result in search_results:
|
for result in search_results:
|
||||||
|
@ -47,7 +47,7 @@ def create_settings_file(settings_file_location) -> None:
|
|||||||
doc.add("database", database)
|
doc.add("database", database)
|
||||||
|
|
||||||
# Write the settings file
|
# Write the settings file
|
||||||
with open(settings_file_location, "w") as f:
|
with open(settings_file_location, "w", encoding="utf-8") as f:
|
||||||
f.write(doc.as_string())
|
f.write(doc.as_string())
|
||||||
|
|
||||||
|
|
||||||
@ -61,10 +61,7 @@ def get_db_location(custom_location: str = "") -> str:
|
|||||||
The database location.
|
The database location.
|
||||||
"""
|
"""
|
||||||
# Use the custom location if it is provided.
|
# Use the custom location if it is provided.
|
||||||
if custom_location:
|
return custom_location or os.path.join(data_dir, "db.sqlite")
|
||||||
return custom_location
|
|
||||||
else:
|
|
||||||
return os.path.join(data_dir, "db.sqlite")
|
|
||||||
|
|
||||||
|
|
||||||
def read_settings_file(custom_location: str = "") -> TOMLDocument:
|
def read_settings_file(custom_location: str = "") -> TOMLDocument:
|
||||||
@ -77,10 +74,7 @@ def read_settings_file(custom_location: str = "") -> TOMLDocument:
|
|||||||
dict: The settings file as a dict.
|
dict: The settings file as a dict.
|
||||||
"""
|
"""
|
||||||
# Use the custom location if it is provided.
|
# Use the custom location if it is provided.
|
||||||
if custom_location:
|
settings_location: str = custom_location or os.path.join(data_dir, "settings.toml")
|
||||||
settings_location = custom_location
|
|
||||||
else:
|
|
||||||
settings_location = os.path.join(data_dir, "settings.toml")
|
|
||||||
|
|
||||||
# Create the settings file if it doesn't exist.
|
# Create the settings file if it doesn't exist.
|
||||||
if not os.path.exists(settings_location):
|
if not os.path.exists(settings_location):
|
||||||
|
@ -2,7 +2,7 @@ import os
|
|||||||
import tempfile
|
import tempfile
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from reader import make_reader
|
from reader import Feed, Reader, make_reader
|
||||||
|
|
||||||
from discord_rss_bot.feeds import send_to_discord
|
from discord_rss_bot.feeds import send_to_discord
|
||||||
|
|
||||||
@ -15,7 +15,7 @@ def test_send_to_discord() -> None:
|
|||||||
assert os.path.exists(temp_dir)
|
assert os.path.exists(temp_dir)
|
||||||
|
|
||||||
# Create a temporary reader.
|
# Create a temporary reader.
|
||||||
reader = make_reader(url=str(Path(temp_dir, "test_db.sqlite")))
|
reader: Reader = make_reader(url=str(Path(temp_dir, "test_db.sqlite")))
|
||||||
assert reader is not None
|
assert reader is not None
|
||||||
|
|
||||||
# Add a feed to the reader.
|
# Add a feed to the reader.
|
||||||
@ -25,19 +25,19 @@ def test_send_to_discord() -> None:
|
|||||||
reader.update_feeds()
|
reader.update_feeds()
|
||||||
|
|
||||||
# Get the feed.
|
# Get the feed.
|
||||||
feed = reader.get_feed("https://www.reddit.com/r/Python/.rss")
|
feed: Feed = reader.get_feed("https://www.reddit.com/r/Python/.rss")
|
||||||
assert feed is not None
|
assert feed is not None
|
||||||
|
|
||||||
# Get the webhook.
|
# Get the webhook.
|
||||||
webhook_url = os.environ.get("TEST_WEBHOOK_URL")
|
webhook_url: str | None = os.environ.get("TEST_WEBHOOK_URL")
|
||||||
assert webhook_url is not None
|
assert webhook_url is not None
|
||||||
|
|
||||||
# Add tag to the feed and check if it's there.
|
# Add tag to the feed and check if it's there.
|
||||||
reader.set_tag(feed, "webhook", webhook_url)
|
reader.set_tag(feed, "webhook", webhook_url) # type: ignore
|
||||||
assert reader.get_tag(feed, "webhook") == webhook_url
|
assert reader.get_tag(feed, "webhook") == webhook_url
|
||||||
|
|
||||||
# Send the feed to Discord.
|
# Send the feed to Discord.
|
||||||
send_to_discord(reader=reader, feed=feed, do_once=True)
|
send_to_discord(custom_reader=reader, feed=feed, do_once=True)
|
||||||
|
|
||||||
# Close the reader, so we can delete the directory.
|
# Close the reader, so we can delete the directory.
|
||||||
reader.close()
|
reader.close()
|
||||||
|
@ -1,26 +1,33 @@
|
|||||||
|
from typing import Literal
|
||||||
|
|
||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
|
from httpx import Response
|
||||||
|
|
||||||
from discord_rss_bot.main import app, create_list_of_webhooks, encode_url
|
from discord_rss_bot.main import app, encode_url
|
||||||
|
|
||||||
client = TestClient(app)
|
client: TestClient = TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
def test_read_main():
|
def test_read_main() -> None:
|
||||||
response = client.get("/")
|
"""Test the main page."""
|
||||||
|
response: Response = client.get("/")
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
def test_add():
|
def test_add() -> None:
|
||||||
response = client.get("/add")
|
"""Test the /add page."""
|
||||||
|
response: Response = client.get("/add")
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
def test_search():
|
def test_search() -> None:
|
||||||
response = client.get("/search/?query=a")
|
"""Test the /search page."""
|
||||||
|
response: Response = client.get("/search/?query=a")
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
def test_encode_url():
|
def test_encode_url() -> None:
|
||||||
before = "https://www.google.com/"
|
"""Test the encode_url function."""
|
||||||
after = "https%3A//www.google.com/"
|
before: Literal["https://www.google.com/"] = "https://www.google.com/"
|
||||||
|
after: Literal["https%3A//www.google.com/"] = "https%3A//www.google.com/"
|
||||||
assert encode_url(url_to_quote=before) == after
|
assert encode_url(url_to_quote=before) == after
|
||||||
|
@ -1,10 +1,11 @@
|
|||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
from reader import make_reader
|
from reader import EntrySearchResult, Feed, Reader, make_reader
|
||||||
|
|
||||||
from discord_rss_bot.search import add_span_with_slice, create_html_for_search_results
|
from discord_rss_bot.search import create_html_for_search_results
|
||||||
|
|
||||||
|
|
||||||
def test_create_html_for_search_results() -> None:
|
def test_create_html_for_search_results() -> None:
|
||||||
@ -16,7 +17,7 @@ def test_create_html_for_search_results() -> None:
|
|||||||
assert os.path.exists(temp_dir)
|
assert os.path.exists(temp_dir)
|
||||||
|
|
||||||
# Create a temporary reader.
|
# Create a temporary reader.
|
||||||
reader = make_reader(url=str(Path(temp_dir, "test_db.sqlite")))
|
reader: Reader = make_reader(url=str(Path(temp_dir, "test_db.sqlite")))
|
||||||
assert reader is not None
|
assert reader is not None
|
||||||
|
|
||||||
# Add a feed to the reader.
|
# Add a feed to the reader.
|
||||||
@ -26,7 +27,7 @@ def test_create_html_for_search_results() -> None:
|
|||||||
reader.update_feeds()
|
reader.update_feeds()
|
||||||
|
|
||||||
# Get the feed.
|
# Get the feed.
|
||||||
feed = reader.get_feed("https://www.reddit.com/r/Python/.rss")
|
feed: Feed = reader.get_feed("https://www.reddit.com/r/Python/.rss")
|
||||||
assert feed is not None
|
assert feed is not None
|
||||||
|
|
||||||
# Update the search index.
|
# Update the search index.
|
||||||
@ -34,7 +35,7 @@ def test_create_html_for_search_results() -> None:
|
|||||||
reader.update_search()
|
reader.update_search()
|
||||||
|
|
||||||
# Get the HTML for the search results.
|
# Get the HTML for the search results.
|
||||||
search_results = reader.search_entries("a", feed=feed)
|
search_results: Iterable[EntrySearchResult] = reader.search_entries("a", feed=feed)
|
||||||
|
|
||||||
# Create the HTML and check if it is not empty.
|
# Create the HTML and check if it is not empty.
|
||||||
search_html: str = create_html_for_search_results(search_results, reader)
|
search_html: str = create_html_for_search_results(search_results, reader)
|
||||||
|
@ -6,13 +6,19 @@ from platformdirs import user_data_dir
|
|||||||
from reader import Reader
|
from reader import Reader
|
||||||
from tomlkit import TOMLDocument
|
from tomlkit import TOMLDocument
|
||||||
|
|
||||||
from discord_rss_bot.settings import create_settings_file, data_dir, get_db_location, get_reader, read_settings_file
|
from discord_rss_bot.settings import (
|
||||||
|
create_settings_file,
|
||||||
|
data_dir,
|
||||||
|
get_db_location,
|
||||||
|
get_reader,
|
||||||
|
read_settings_file,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_read_settings_file():
|
def test_read_settings_file() -> None:
|
||||||
"""Test reading the settings file."""
|
"""Test reading the settings file."""
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
custom_loc = os.path.join(temp_dir, "test_settings.toml")
|
custom_loc: str = os.path.join(temp_dir, "test_settings.toml")
|
||||||
|
|
||||||
# File should not exist yet should this should fail.
|
# File should not exist yet should this should fail.
|
||||||
assert not os.path.exists(custom_loc)
|
assert not os.path.exists(custom_loc)
|
||||||
@ -31,10 +37,10 @@ def test_read_settings_file():
|
|||||||
assert settings["database"] == {}
|
assert settings["database"] == {}
|
||||||
|
|
||||||
|
|
||||||
def test_get_db_location():
|
def test_get_db_location() -> None:
|
||||||
"""Test getting the database location."""
|
"""Test getting the database location."""
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
custom_loc = os.path.join(temp_dir, "test_db.sqlite")
|
custom_loc: str = os.path.join(temp_dir, "test_db.sqlite")
|
||||||
|
|
||||||
# File should not exist yet.
|
# File should not exist yet.
|
||||||
assert not os.path.exists(custom_loc)
|
assert not os.path.exists(custom_loc)
|
||||||
@ -43,13 +49,13 @@ def test_get_db_location():
|
|||||||
assert get_db_location(custom_location=custom_loc) == os.path.join(temp_dir, "test_db.sqlite")
|
assert get_db_location(custom_location=custom_loc) == os.path.join(temp_dir, "test_db.sqlite")
|
||||||
|
|
||||||
# Test with the default location
|
# Test with the default location
|
||||||
loc = user_data_dir(appname="discord_rss_bot", appauthor="TheLovinator", roaming=True)
|
loc: str = user_data_dir(appname="discord_rss_bot", appauthor="TheLovinator", roaming=True)
|
||||||
assert get_db_location() == os.path.join(loc, "db.sqlite")
|
assert get_db_location() == os.path.join(loc, "db.sqlite")
|
||||||
|
|
||||||
|
|
||||||
def test_reader():
|
def test_reader() -> None:
|
||||||
"""Test the reader."""
|
"""Test the reader."""
|
||||||
reader = get_reader()
|
reader: Reader = get_reader()
|
||||||
assert isinstance(reader, Reader)
|
assert isinstance(reader, Reader)
|
||||||
|
|
||||||
# Test the reader with a custom location.
|
# Test the reader with a custom location.
|
||||||
@ -57,15 +63,15 @@ def test_reader():
|
|||||||
# Create the temp directory
|
# Create the temp directory
|
||||||
os.makedirs(temp_dir, exist_ok=True)
|
os.makedirs(temp_dir, exist_ok=True)
|
||||||
|
|
||||||
custom_loc = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
|
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
|
||||||
custom_reader = get_reader(custom_location=str(custom_loc))
|
custom_reader: Reader = get_reader(custom_location=str(custom_loc))
|
||||||
assert isinstance(custom_reader, Reader)
|
assert isinstance(custom_reader, Reader)
|
||||||
|
|
||||||
# Close the reader, so we can delete the directory.
|
# Close the reader, so we can delete the directory.
|
||||||
custom_reader.close()
|
custom_reader.close()
|
||||||
|
|
||||||
|
|
||||||
def test_create_settings_file():
|
def test_create_settings_file() -> None:
|
||||||
"""Test creating the settings file."""
|
"""Test creating the settings file."""
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
settings_file_location: str = os.path.join(temp_dir, "settings.toml")
|
settings_file_location: str = os.path.join(temp_dir, "settings.toml")
|
||||||
@ -78,6 +84,6 @@ def test_create_settings_file():
|
|||||||
assert os.path.exists(settings_file_location)
|
assert os.path.exists(settings_file_location)
|
||||||
|
|
||||||
|
|
||||||
def test_data_dir():
|
def test_data_dir() -> None:
|
||||||
"""Test the data directory."""
|
"""Test the data directory."""
|
||||||
assert os.path.exists(data_dir)
|
assert os.path.exists(data_dir)
|
||||||
|
Reference in New Issue
Block a user