from __future__ import annotations
import contextlib
import re
import urllib.parse
from dataclasses import dataclass
from dataclasses import field
from datetime import UTC
from datetime import datetime
from typing import TYPE_CHECKING
from typing import cast
from unittest.mock import MagicMock
from unittest.mock import patch
from fastapi.testclient import TestClient
import discord_rss_bot.main as main_module
from discord_rss_bot import feeds
from discord_rss_bot.main import app
from discord_rss_bot.main import create_html_for_feed
from discord_rss_bot.main import get_reader_dependency
if TYPE_CHECKING:
from pathlib import Path
import pytest
from httpx import Response
from reader import Entry
from reader import Reader
# Shared FastAPI test client used by every test in this module.
client: TestClient = TestClient(app)
# Webhook fixture values that the tests register and delete through the HTTP endpoints.
webhook_name: str = "Hello, I am a webhook!"
webhook_url: str = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
# Feed URL that the tests add, remove and query.
feed_url: str = "https://lovinator.space/rss_test.xml"
# Value types accepted as the ``default`` of the stub readers' ``get_tag`` methods.
type TestTagValue = str | bool | int | list[dict[str, str]] | feeds.JsonValue | None
# Value types accepted as keyword arguments by the stub readers' ``get_entries`` methods.
type TestKwargValue = str | int | None
def encoded_feed_url(url: str) -> str:
    """Percent-encode *url* with ``urllib.parse.quote``.

    Args:
        url: The feed URL to encode.

    Returns:
        The quoted URL, or an empty string when *url* is empty.
    """
    # Quote the argument itself rather than the module-level ``feed_url`` so the
    # helper gives the right answer for any URL it is handed.
    return urllib.parse.quote(url) if url else ""
def ensure_preview_feed_exists() -> Reader:
    """Return the reader after a best-effort add and update of the test feed.

    The reader comes from ``get_reader_dependency()``. ``add_feed`` and
    ``update_feed`` are each attempted on ``feed_url`` with every exception
    suppressed, so a failure in one step does not stop the other.

    Returns:
        The reader obtained from ``get_reader_dependency()``.
    """
    preview_reader: Reader = get_reader_dependency()
    for step in (preview_reader.add_feed, preview_reader.update_feed):
        # Each step is suppressed separately so the second still runs if the first fails.
        with contextlib.suppress(Exception):
            step(feed_url)
    return preview_reader
def assert_social_preview_metadata(
    response: Response,
    *,
    title: str,
    description: str,
) -> None:
    """Assert that both *title* and *description* occur in the response body.

    Args:
        response: The HTTP response whose ``text`` is inspected.
        title: Text that must appear in the body.
        description: Text that must appear in the body.

    Raises:
        AssertionError: If either value is missing from ``response.text``.
    """
    body: str = response.text
    # Title is checked first, then description.
    for expected in (title, description):
        assert expected in body
def test_search() -> None:
    """Exercise the /search page once the test webhook and feed are registered."""
    # Clean slate: drop the feed when the index page already lists it.
    index_page: Response = client.get("/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Clean slate: remove any webhook left over from an earlier run.
    deleted: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    assert deleted.status_code == 200, f"Failed to delete webhook: {deleted.text}"

    # Register the webhook, then attach the feed to it.
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    created: Response = client.post(url="/add_webhook", data=webhook_form)
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    feed_form: dict[str, str] = {"feed_url": feed_url, "webhook_dropdown": webhook_name}
    added: Response = client.post(url="/add", data=feed_form)
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # The index page must now list the percent-encoded feed URL.
    listing: Response = client.get(url="/")
    assert listing.status_code == 200, f"Failed to get /: {listing.text}"
    assert encoded_feed_url(feed_url) in listing.text, f"Feed not found in /: {listing.text}"

    # Finally, run a search.
    found: Response = client.get(url="/search/?query=a")
    assert found.status_code == 200, f"Failed to search for entry: {found.text}"
def test_add_webhook() -> None:
    """Verify that a webhook posted to /add_webhook shows up on /webhooks."""
    # Clear out any copy of the webhook left behind by an earlier run.
    cleanup: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    assert cleanup.status_code == 200, f"Failed to delete webhook: {cleanup.text}"

    # Register the webhook.
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    created: Response = client.post(url="/add_webhook", data=webhook_form)
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # The webhook name must be listed on the overview page.
    listing: Response = client.get(url="/webhooks")
    assert listing.status_code == 200, f"Failed to get /webhooks: {listing.text}"
    assert webhook_name in listing.text, f"Webhook not found in /webhooks: {listing.text}"
def test_add_webhook_rejects_invalid_url() -> None:
    """A webhook whose URL is not a URL must be refused with HTTP 400."""
    bad_form: dict[str, str] = {"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"}
    rejected: Response = client.post(url="/add_webhook", data=bad_form)

    assert rejected.status_code == 400, f"Expected invalid webhook URL to be rejected: {rejected.text}"
    assert "Invalid webhook URL" in rejected.text
def test_add_webhook_allows_valid_url_after_invalid_attempt() -> None:
    """A rejected invalid webhook URL must not block a later, valid registration."""
    delete_form: dict[str, str] = {"webhook_url": webhook_url}

    # Start without the webhook.
    cleared: Response = client.post(url="/delete_webhook", data=delete_form)
    assert cleared.status_code == 200, f"Failed to delete webhook: {cleared.text}"

    # First attempt: an invalid URL, which must be refused.
    bad_form: dict[str, str] = {"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"}
    rejected: Response = client.post(url="/add_webhook", data=bad_form)
    assert rejected.status_code == 400, f"Expected invalid webhook URL to be rejected: {rejected.text}"
    assert "Invalid webhook URL" in rejected.text

    # Second attempt: the valid webhook, which must go through.
    good_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    accepted: Response = client.post(url="/add_webhook", data=good_form)
    assert accepted.status_code == 200, f"Failed to add webhook after invalid attempt: {accepted.text}"

    listing: Response = client.get(url="/webhooks")
    assert listing.status_code == 200, f"Failed to get /webhooks: {listing.text}"
    assert webhook_name in listing.text, f"Webhook not found in /webhooks: {listing.text}"

    # Remove the webhook again.
    removed: Response = client.post(url="/delete_webhook", data=delete_form)
    assert removed.status_code == 200, f"Failed to delete webhook: {removed.text}"
def test_webhooks_page_handles_invalid_stored_webhook_url() -> None:
    """/webhooks should render even if a malformed webhook URL is present in storage.

    The malformed entry is written straight into the global ``webhooks`` tag
    through the reader rather than through ``/add_webhook``. The previous tag
    value is put back afterwards so the malformed entry does not leak into
    other tests.
    """
    reader: Reader = get_reader_dependency()
    malformed_webhook_name = "Malformed hook"
    malformed_webhook_url = "definitely-not-a-url"

    # Remember what was stored so it can be restored; None means the tag was absent.
    previous_webhooks = reader.get_tag((), "webhooks", None)
    reader.set_tag((), "webhooks", [{"name": malformed_webhook_name, "url": malformed_webhook_url}])  # pyright: ignore[reportArgumentType]
    try:
        response: Response = client.get(url="/webhooks")
        assert response.status_code == 200, f"/webhooks should not crash for malformed URLs: {response.text}"
        assert malformed_webhook_name in response.text
    finally:
        # Best-effort cleanup, in the same style as the preview tests in this module.
        with contextlib.suppress(Exception):
            if previous_webhooks is None:
                reader.delete_tag((), "webhooks")
            else:
                reader.set_tag((), "webhooks", previous_webhooks)  # pyright: ignore[reportArgumentType]
def test_create_feed() -> None:
    """Verify that a feed posted to /add is listed on the index page."""
    # Make sure the webhook exists no matter which tests ran before this one.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    created: Response = client.post(url="/add_webhook", data=webhook_form)
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Drop the feed first when the index page already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Attach the feed to the webhook.
    feed_form: dict[str, str] = {"feed_url": feed_url, "webhook_dropdown": webhook_name}
    added: Response = client.post(url="/add", data=feed_form)
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # The index page must now list the percent-encoded feed URL.
    listing: Response = client.get(url="/")
    assert listing.status_code == 200, f"Failed to get /: {listing.text}"
    assert encoded_feed_url(feed_url) in listing.text, f"Feed not found in /: {listing.text}"
def test_get() -> None:
    """Smoke-test the GET pages once the test webhook and feed are registered."""
    # Make sure the webhook exists no matter which tests ran before this one.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    created: Response = client.post(url="/add_webhook", data=webhook_form)
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Drop the feed first when the index page already lists it.
    index_page: Response = client.get("/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Attach the feed to the webhook and confirm the index page lists it.
    feed_form: dict[str, str] = {"feed_url": feed_url, "webhook_dropdown": webhook_name}
    added: Response = client.post(url="/add", data=feed_form)
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    listing: Response = client.get("/")
    assert listing.status_code == 200, f"Failed to get /: {listing.text}"
    assert encoded_feed_url(feed_url) in listing.text, f"Feed not found in /: {listing.text}"

    # Query parameters shared by every per-feed page below.
    feed_params: dict[str, str] = {"feed_url": encoded_feed_url(feed_url)}

    page: Response = client.get(url="/add")
    assert page.status_code == 200, f"/add failed: {page.text}"

    page = client.get(url="/add_webhook")
    assert page.status_code == 200, f"/add_webhook failed: {page.text}"

    page = client.get(url="/blacklist", params=feed_params)
    assert page.status_code == 200, f"/blacklist failed: {page.text}"

    page = client.get(url="/custom", params=feed_params)
    assert page.status_code == 200, f"/custom failed: {page.text}"

    page = client.get(url="/embed", params=feed_params)
    assert page.status_code == 200, f"/embed failed: {page.text}"

    # The feed page is additionally checked for a handful of expected fragments.
    page = client.get(url="/feed", params=feed_params)
    assert page.status_code == 200, f"/feed failed: {page.text}"
    for fragment in (
        "Feed Summary",
        "This feed",
        "Screenshot Delivery",
        "Image Delivery",
        'type="range"',
        'max="10"',
    ):
        assert fragment in page.text

    page = client.get(url="/")
    assert page.status_code == 200, f"/ failed: {page.text}"

    page = client.get(url="/webhooks")
    assert page.status_code == 200, f"/webhooks failed: {page.text}"

    page = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
    assert page.status_code == 200, f"/webhook_entries failed: {page.text}"

    page = client.get(url="/whitelist", params=feed_params)
    assert page.status_code == 200, f"/whitelist failed: {page.text}"
def test_blacklist_page_uses_live_preview_layout() -> None:
    """The /blacklist page should contain the live-preview hooks and its rules heading."""
    ensure_preview_feed_exists()

    page: Response = client.get(url="/blacklist", params={"feed_url": encoded_feed_url(feed_url)})
    assert page.status_code == 200, f"/blacklist failed: {page.text}"

    for fragment in ('hx-get="/blacklist_preview"', 'id="filter-preview"', "Blacklist Rules"):
        assert fragment in page.text
def test_whitelist_page_uses_live_preview_layout() -> None:
    """The /whitelist page should contain the live-preview hooks and its rules heading."""
    ensure_preview_feed_exists()

    page: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)})
    assert page.status_code == 200, f"/whitelist failed: {page.text}"

    for fragment in ('hx-get="/whitelist_preview"', 'id="filter-preview"', "Whitelist Rules"):
        assert fragment in page.text
def test_blacklist_preview_does_not_persist_unsaved_rules() -> None:
    """Previewing an unsaved blacklist rule must leave the stored rule untouched."""
    saved_rule = "saved-blacklist"
    preview_reader: Reader = ensure_preview_feed_exists()
    preview_reader.set_tag(feed_url, "blacklist_title", saved_rule)  # pyright: ignore[reportArgumentType]
    try:
        # Preview with a different rule than the one that is stored.
        preview_params: dict[str, str] = {"feed_url": feed_url, "blacklist_title": "fvnnnfnfdnfdnfd"}
        preview: Response = client.get(url="/blacklist_preview", params=preview_params)

        assert preview.status_code == 200, f"/blacklist_preview failed: {preview.text}"
        assert "Live preview" in preview.text
        # The stored tag must still hold the saved value.
        assert preview_reader.get_tag(feed_url, "blacklist_title", "") == saved_rule
    finally:
        # Best-effort removal of the tag set above.
        with contextlib.suppress(Exception):
            preview_reader.delete_tag(feed_url, "blacklist_title")
def test_whitelist_preview_shows_blacklist_precedence() -> None:
    """With the same rule stored as blacklist and previewed as whitelist, the page reports the override."""
    shared_rule = "fvnnnfnfdnfdnfd"
    preview_reader: Reader = ensure_preview_feed_exists()
    preview_reader.set_tag(feed_url, "blacklist_title", shared_rule)  # pyright: ignore[reportArgumentType]
    try:
        preview_params: dict[str, str] = {"feed_url": feed_url, "whitelist_title": shared_rule}
        preview: Response = client.get(url="/whitelist_preview", params=preview_params)

        assert preview.status_code == 200, f"/whitelist_preview failed: {preview.text}"
        for fragment in ("blacklist overrides whitelist", "Skipped"):
            assert fragment in preview.text
    finally:
        # Best-effort removal of the tag set above.
        with contextlib.suppress(Exception):
            preview_reader.delete_tag(feed_url, "blacklist_title")
def test_blacklist_preview_uses_50_entry_limit() -> None:
    """/blacklist_preview should ask the reader for 50 entries and report "50 checked".

    A stub reader holding 60 dummy entries is injected through
    ``app.dependency_overrides`` and records the ``limit`` keyword passed to
    ``get_entries``. ``create_html_for_feed`` is patched to return a fixed string.
    """

    @dataclass(slots=True)
    class DummyContent:
        value: str

    @dataclass(slots=True)
    class DummyFeed:
        url: str
        title: str

    @dataclass(slots=True)
    class DummyEntry:
        id: str
        feed: DummyFeed
        title: str
        summary: str
        author: str
        link: str
        published: datetime | None
        content: list[DummyContent] = field(default_factory=lambda: [DummyContent("content")])

    class StubReader:
        """Stand-in reader exposing only the methods this test provides."""

        def __init__(self) -> None:
            self.feed = DummyFeed(url="https://example.com/filter-preview.xml", title="Preview Feed")
            # Set by get_entries(); stays None until it is called with an int limit.
            self.recorded_limit: int | None = None
            # 60 entries, i.e. more than the 50 the preview is expected to request.
            self.entries: list[Entry] = [
                cast(
                    "Entry",
                    DummyEntry(
                        id=f"entry-{index}",
                        feed=self.feed,
                        title=f"Entry {index}",
                        summary=f"Summary {index}",
                        author="Author",
                        link=f"https://example.com/entry-{index}",
                        published=datetime(2024, 1, 1, tzinfo=UTC),
                    ),
                )
                for index in range(60)
            ]

        def get_feed(self, _feed_url: str) -> DummyFeed:
            return self.feed

        def get_entries(self, **kwargs: TestKwargValue) -> list[Entry]:
            # Record the requested limit and honour it when it is an int.
            limit = kwargs.get("limit")
            self.recorded_limit = limit if isinstance(limit, int) else None
            if isinstance(limit, int):
                return self.entries[:limit]
            return self.entries

        def get_tag(self, _resource: str | DummyFeed, _key: str, default: TestTagValue = None) -> TestTagValue:
            # No tags are stored; always hand back the caller's default.
            return default

    stub_reader = StubReader()
    app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
    try:
        # NOTE(review): this literal had raw line breaks inside the quotes (a
        # SyntaxError); they are written as "\n" escapes here. Markup may have
        # been lost from the original value; confirm against the upstream file.
        with patch("discord_rss_bot.main.create_html_for_feed", return_value="\nRendered\n"):
            response: Response = client.get(
                url="/blacklist_preview",
                params={"feed_url": stub_reader.feed.url},
            )
        assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
        assert stub_reader.recorded_limit == 50, (
            f"Expected preview to request 50 entries, got {stub_reader.recorded_limit}"
        )
        assert "50 checked" in response.text
    finally:
        # Clear all dependency overrides so the stub does not reach other tests.
        app.dependency_overrides = {}
def test_blacklist_preview_shows_labeled_field_values_for_substring_match() -> None:
    """/blacklist_preview should show labelled field values for an entry matched by substring.

    A stub reader with one dummy entry titled "World of Warcraft" is injected
    through ``app.dependency_overrides`` and the preview is requested with the
    blacklist title rule "orld". ``create_html_for_feed`` is patched to return
    a fixed string.
    """

    @dataclass(slots=True)
    class DummyContent:
        value: str

    @dataclass(slots=True)
    class DummyFeed:
        url: str
        title: str

    @dataclass(slots=True)
    class DummyEntry:
        id: str
        feed: DummyFeed
        title: str
        summary: str
        author: str
        link: str
        published: datetime | None
        content: list[DummyContent] = field(default_factory=list)

    class StubReader:
        """Stand-in reader exposing only the methods this test provides."""

        def __init__(self) -> None:
            self.feed = DummyFeed(url="https://example.com/wow.xml", title="Warcraft Feed")
            # NOTE(review): the summary and content literals had raw line breaks
            # inside the quotes (a SyntaxError); they are written as "\n" escapes
            # here. Markup may have been lost; confirm against the upstream file.
            self.entries: list[Entry] = [
                cast(
                    "Entry",
                    DummyEntry(
                        id="wow-1",
                        feed=self.feed,
                        title="World of Warcraft",
                        summary="Massive MMO news update\n",
                        author="Blizzard",
                        link="https://example.com/wow-1",
                        published=datetime(2024, 1, 1, tzinfo=UTC),
                        content=[DummyContent("The expansion launches soon.\n")],
                    ),
                ),
            ]

        def get_feed(self, _feed_url: str) -> DummyFeed:
            return self.feed

        def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]:
            return self.entries

        def get_tag(self, _resource: str | DummyFeed, _key: str, default: TestTagValue = None) -> TestTagValue:
            # No tags are stored; always hand back the caller's default.
            return default

    stub_reader = StubReader()
    app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
    try:
        # NOTE(review): same repair as above; the literal had a raw line break
        # before its closing quote.
        with patch("discord_rss_bot.main.create_html_for_feed", return_value="Rendered\n"):
            response: Response = client.get(
                url="/blacklist_preview",
                params={
                    "feed_url": stub_reader.feed.url,
                    "blacklist_title": "orld",
                },
            )
        assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
        assert "Skipped" in response.text
        assert "World of Warcraft" in response.text
        # Field labels expected in the preview.
        assert "Title" in response.text
        assert "Author" in response.text
        assert "Description" in response.text
        assert "Content" in response.text
        assert "filter-preview__field-row" in response.text
        assert "filter-preview__match" in response.text
        assert 'orld' in response.text
        # Summary and content text of the dummy entry.
        assert "Massive MMO news update" in response.text
        assert "The expansion launches soon." in response.text
    finally:
        # Clear all dependency overrides so the stub does not reach other tests.
        app.dependency_overrides = {}
def test_settings_page_shows_screenshot_layout_setting() -> None:
    """The /settings page should show the delivery-mode and screenshot-layout defaults and the Playwright install command."""
    page: Response = client.get(url="/settings")
    assert page.status_code == 200, f"/settings failed: {page.text}"

    for fragment in (
        "Default delivery mode for new feeds",
        "Default screenshot layout for new feeds",
        "uv run playwright install chromium",
    ):
        assert fragment in page.text
def test_set_global_delivery_mode() -> None:
response: Response = client.post(url="/set_global_delivery_mode", data={"delivery_mode": "text"})
assert response.status_code == 200, f"Failed to set global delivery mode: {response.text}"
response = client.get(url="/settings")
assert response.status_code == 200, f"/settings failed after setting delivery mode: {response.text}"
assert re.search(r"