from __future__ import annotations
import contextlib
import re
import urllib.parse
from dataclasses import dataclass
from dataclasses import field
from datetime import UTC
from datetime import datetime
from types import SimpleNamespace
from typing import TYPE_CHECKING
from typing import cast
from unittest.mock import MagicMock
from unittest.mock import patch
from fastapi.testclient import TestClient
import discord_rss_bot.main as main_module
from discord_rss_bot import feeds
from discord_rss_bot.main import app
from discord_rss_bot.main import create_html_for_feed
from discord_rss_bot.main import get_reader_dependency
if TYPE_CHECKING:
from pathlib import Path
import pytest
from httpx2 import Response
from reader import Entry
from reader import Reader
client: TestClient = TestClient(app)
webhook_name: str = "Hello, I am a webhook!"
webhook_url: str = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
feed_url: str = "https://lovinator.space/rss_test.xml"
type TestTagValue = str | bool | int | list[dict[str, str]] | feeds.JsonValue | None
type TestKwargValue = str | int | None
def encoded_feed_url(url: str) -> str:
return urllib.parse.quote(feed_url) if url else ""
def ensure_preview_feed_exists() -> Reader:
reader: Reader = get_reader_dependency()
with contextlib.suppress(Exception):
reader.add_feed(feed_url)
with contextlib.suppress(Exception):
reader.update_feed(feed_url)
return reader
def assert_social_preview_metadata(
response: Response,
*,
title: str,
description: str,
) -> None:
assert title in response.text
assert description in response.text
def test_search() -> None:
"""Test the /search page."""
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get("/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": encoded_feed_url(feed_url)})
# Delete the webhook if it already exists before we run the test.
response: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
assert response.status_code == 200, f"Failed to delete webhook: {response.text}"
# Add the webhook.
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Add the feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Check that the feed was added.
response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
# Search for an entry.
response: Response = client.get(url="/search/?query=a")
assert response.status_code == 200, f"Failed to search for entry: {response.text}"
def test_add_webhook() -> None:
"""Test the /add_webhook page."""
# Delete the webhook if it already exists before we run the test.
response: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
assert response.status_code == 200, f"Failed to delete webhook: {response.text}"
# Add the webhook.
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Check that the webhook was added.
response = client.get(url="/webhooks")
assert response.status_code == 200, f"Failed to get /webhooks: {response.text}"
assert webhook_name in response.text, f"Webhook not found in /webhooks: {response.text}"
def test_add_webhook_rejects_invalid_url() -> None:
"""Adding a webhook with a non-URL value should fail validation."""
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"},
)
assert response.status_code == 400, f"Expected invalid webhook URL to be rejected: {response.text}"
assert "Invalid webhook URL" in response.text
def test_add_webhook_allows_valid_url_after_invalid_attempt() -> None:
"""A rejected invalid webhook URL should not prevent a later valid add."""
response: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
assert response.status_code == 200, f"Failed to delete webhook: {response.text}"
response = client.post(
url="/add_webhook",
data={"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"},
)
assert response.status_code == 400, f"Expected invalid webhook URL to be rejected: {response.text}"
assert "Invalid webhook URL" in response.text
response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook after invalid attempt: {response.text}"
response = client.get(url="/webhooks")
assert response.status_code == 200, f"Failed to get /webhooks: {response.text}"
assert webhook_name in response.text, f"Webhook not found in /webhooks: {response.text}"
response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
assert response.status_code == 200, f"Failed to delete webhook: {response.text}"
def test_webhooks_page_handles_invalid_stored_webhook_url() -> None:
"""/webhooks should render even if a malformed webhook URL is present in storage."""
reader: Reader = get_reader_dependency()
malformed_webhook_name = "Malformed hook"
malformed_webhook_url = "definitely-not-a-url"
reader.set_tag((), "webhooks", [{"name": malformed_webhook_name, "url": malformed_webhook_url}]) # pyright: ignore[reportArgumentType]
response: Response = client.get(url="/webhooks")
assert response.status_code == 200, f"/webhooks should not crash for malformed URLs: {response.text}"
assert malformed_webhook_name in response.text
def test_create_feed() -> None:
"""Test the /create_feed page."""
# Ensure webhook exists for this test regardless of test order.
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get(url="/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": encoded_feed_url(feed_url)})
# Add the feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Check that the feed was added.
response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
def test_create_feed_suggests_autodiscovered_links() -> None:
"""A page URL that fails to parse should render its advertised feed links."""
submitted_url = "https://example.com/blog"
discovered_url = "https://example.com/rss.xml"
stub_reader = MagicMock()
def get_tag(resource: str | tuple[()], key: str, default: TestTagValue = None) -> TestTagValue:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if resource == () and key == "delivery_mode":
return "embed"
if resource == submitted_url and key == ".reader.autodiscover":
return [{"href": discovered_url, "title": "Example feed", "type": "application/rss+xml"}]
return default
stub_reader.get_tag.side_effect = get_tag
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try:
with patch.object(
main_module,
"create_feed",
side_effect=feeds.FeedUpdateError(status_code=404, detail="Error updating feed"),
):
response: Response = client.post(
url="/add",
data={"feed_url": submitted_url, "webhook_dropdown": webhook_name},
)
finally:
app.dependency_overrides = {}
assert response.status_code == 404
assert "Discovered feed links" in response.text
assert "Example feed" in response.text
assert discovered_url in response.text
assert "application/rss+xml" in response.text
assert f'value="{submitted_url}"' in response.text
assert f'value="{webhook_name}"' in response.text
def test_get() -> None:
"""Test the /create_feed page."""
# Ensure webhook exists for this test regardless of test order.
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get("/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": encoded_feed_url(feed_url)})
# Add the feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Check that the feed was added.
response = client.get("/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
response: Response = client.get(url="/add")
assert response.status_code == 200, f"/add failed: {response.text}"
response: Response = client.get(url="/add_webhook")
assert response.status_code == 200, f"/add_webhook failed: {response.text}"
response: Response = client.get(url="/blacklist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/blacklist failed: {response.text}"
response: Response = client.get(url="/custom", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/custom failed: {response.text}"
response: Response = client.get(url="/embed", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/embed failed: {response.text}"
response: Response = client.get(url="/feed", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/feed failed: {response.text}"
assert "Feed Summary" in response.text
assert "This feed" in response.text
assert "Screenshot Delivery" in response.text
assert "Image Delivery" in response.text
assert 'type="range"' in response.text
assert 'max="10"' in response.text
response: Response = client.get(url="/")
assert response.status_code == 200, f"/ failed: {response.text}"
response: Response = client.get(url="/webhooks")
assert response.status_code == 200, f"/webhooks failed: {response.text}"
response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
assert response.status_code == 200, f"/webhook_entries failed: {response.text}"
response: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/whitelist failed: {response.text}"
def test_blacklist_page_uses_live_preview_layout() -> None:
ensure_preview_feed_exists()
response: Response = client.get(url="/blacklist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/blacklist failed: {response.text}"
assert 'hx-get="/blacklist_preview"' in response.text
assert 'id="filter-preview"' in response.text
assert "Blacklist Rules" in response.text
def test_whitelist_page_uses_live_preview_layout() -> None:
ensure_preview_feed_exists()
response: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/whitelist failed: {response.text}"
assert 'hx-get="/whitelist_preview"' in response.text
assert 'id="filter-preview"' in response.text
assert "Whitelist Rules" in response.text
def test_blacklist_preview_does_not_persist_unsaved_rules() -> None:
reader: Reader = ensure_preview_feed_exists()
reader.set_tag(feed_url, "blacklist_title", "saved-blacklist") # pyright: ignore[reportArgumentType]
try:
response: Response = client.get(
url="/blacklist_preview",
params={
"feed_url": feed_url,
"blacklist_title": "fvnnnfnfdnfdnfd",
},
)
assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
assert "Live preview" in response.text
assert reader.get_tag(feed_url, "blacklist_title", "") == "saved-blacklist"
finally:
with contextlib.suppress(Exception):
reader.delete_tag(feed_url, "blacklist_title")
def test_whitelist_preview_shows_blacklist_precedence() -> None:
reader: Reader = ensure_preview_feed_exists()
reader.set_tag(feed_url, "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
try:
response: Response = client.get(
url="/whitelist_preview",
params={
"feed_url": feed_url,
"whitelist_title": "fvnnnfnfdnfdnfd",
},
)
assert response.status_code == 200, f"/whitelist_preview failed: {response.text}"
assert "blacklist overrides whitelist" in response.text
assert "Skipped" in response.text
finally:
with contextlib.suppress(Exception):
reader.delete_tag(feed_url, "blacklist_title")
def test_blacklist_preview_uses_50_entry_limit() -> None:
@dataclass(slots=True)
class DummyContent:
value: str
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
title: str
summary: str
author: str
authors_str: str
link: str
published: datetime | None
content: list[DummyContent] = field(default_factory=lambda: [DummyContent("content")])
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/filter-preview.xml", title="Preview Feed")
self.recorded_limit: int | None = None
self.entries: list[Entry] = [
cast(
"Entry",
DummyEntry(
id=f"entry-{index}",
feed=self.feed,
title=f"Entry {index}",
summary=f"Summary {index}",
author="Author",
authors_str="Author",
link=f"https://example.com/entry-{index}",
published=datetime(2024, 1, 1, tzinfo=UTC),
),
)
for index in range(60)
]
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **kwargs: TestKwargValue) -> list[Entry]:
limit = kwargs.get("limit")
self.recorded_limit = limit if isinstance(limit, int) else None
if isinstance(limit, int):
return self.entries[:limit]
return self.entries
def get_tag(self, _resource: str | DummyFeed, _key: str, default: TestTagValue = None) -> TestTagValue:
return default
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try:
with patch("discord_rss_bot.main.create_html_for_feed", return_value="
Rendered
"):
response: Response = client.get(
url="/blacklist_preview",
params={"feed_url": stub_reader.feed.url},
)
assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
assert stub_reader.recorded_limit == 50, (
f"Expected preview to request 50 entries, got {stub_reader.recorded_limit}"
)
assert "50 checked" in response.text
finally:
app.dependency_overrides = {}
def test_blacklist_preview_shows_labeled_field_values_for_substring_match() -> None:
@dataclass(slots=True)
class DummyContent:
value: str
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
title: str
summary: str
author: str
authors_str: str
link: str
published: datetime | None
content: list[DummyContent] = field(default_factory=list)
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/wow.xml", title="Warcraft Feed")
self.entries: list[Entry] = [
cast(
"Entry",
DummyEntry(
id="wow-1",
feed=self.feed,
title="World of Warcraft",
summary="Massive MMO news update
",
author="Legacy Blizzard Author",
authors_str="Blizzard Author One, Blizzard Author Two",
link="https://example.com/wow-1",
published=datetime(2024, 1, 1, tzinfo=UTC),
content=[DummyContent("The expansion launches soon.
")],
),
),
]
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]:
return self.entries
def get_tag(self, _resource: str | DummyFeed, _key: str, default: TestTagValue = None) -> TestTagValue:
return default
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try: # noqa: PLW0717
with patch("discord_rss_bot.main.create_html_for_feed", return_value="Rendered
"):
response: Response = client.get(
url="/blacklist_preview",
params={
"feed_url": stub_reader.feed.url,
"blacklist_title": "orld",
},
)
assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
assert "Skipped" in response.text
assert "World of Warcraft" in response.text
assert "Title" in response.text
assert "Author" in response.text
assert "Description" in response.text
assert "Content" in response.text
assert "filter-preview__field-row" in response.text
assert "filter-preview__match" in response.text
assert 'orld' in response.text
assert "Massive MMO news update" in response.text
assert "The expansion launches soon." in response.text
assert "By Blizzard Author One, Blizzard Author Two |" in response.text
assert "Legacy Blizzard Author" not in response.text
finally:
app.dependency_overrides = {}
def test_author_templates_render_authors_str() -> None:
request = SimpleNamespace(url="https://example.com/page", base_url="https://example.com/")
feed = SimpleNamespace(
title="Example Feed",
url="https://example.com/feed.xml",
author="Legacy Feed Author",
authors_str="Feed Author One, Feed Author Two",
added=None,
last_exception=None,
last_updated=None,
link="https://example.com/feed",
subtitle="",
updated=None,
updates_enabled=True,
user_title="",
version="atom10",
)
entry = SimpleNamespace(
id="entry-1",
title="Entry Title",
link="https://example.com/entry-1",
author="Legacy Entry Author",
authors_str="Entry Author One, Entry Author Two",
added=None,
content=[],
important=False,
published=None,
read=False,
read_modified=None,
summary="Summary",
updated=None,
)
filter_row = SimpleNamespace(
entry=entry,
published_label="Never",
status_class="success",
status_label="Sent",
decision=SimpleNamespace(reason="Sent", blacklist_match=None, whitelist_match=None),
field_rows=[],
first_image="",
)
preview_summary = SimpleNamespace(
total=1,
sent=1,
skipped=0,
blacklist_matches=0,
whitelist_matches=0,
)
custom_html: str = main_module.templates.get_template("custom.html").render(
request=request,
feed=feed,
entry=entry,
custom_message="",
)
embed_html: str = main_module.templates.get_template("embed.html").render(
request=request,
feed=feed,
entry=entry,
)
filter_preview_html: str = main_module.templates.get_template("_filter_preview.html").render(
feed=feed,
preview_limit=50,
preview_summary=preview_summary,
preview_helper_text="",
preview_rows=[filter_row],
)
for html in (custom_html, embed_html):
assert "Feed Author One, Feed Author Two" in html
assert "Entry Author One, Entry Author Two" in html
assert "Legacy Feed Author" not in html
assert "Legacy Entry Author" not in html
assert "By Entry Author One, Entry Author Two |" in filter_preview_html
assert "Legacy Entry Author" not in filter_preview_html
def test_settings_page_shows_screenshot_layout_setting() -> None:
response: Response = client.get(url="/settings")
assert response.status_code == 200, f"/settings failed: {response.text}"
assert "Default delivery mode for new feeds" in response.text
assert "Default screenshot layout for new feeds" in response.text
assert "uv run playwright install chromium" in response.text
def test_set_global_delivery_mode() -> None:
response: Response = client.post(url="/set_global_delivery_mode", data={"delivery_mode": "text"})
assert response.status_code == 200, f"Failed to set global delivery mode: {response.text}"
response = client.get(url="/settings")
assert response.status_code == 200, f"/settings failed after setting delivery mode: {response.text}"
assert re.search(r"