from __future__ import annotations
import contextlib
import re
import urllib.parse
from dataclasses import dataclass
from dataclasses import field
from datetime import UTC
from datetime import datetime
from typing import TYPE_CHECKING
from typing import cast
from unittest.mock import MagicMock
from unittest.mock import patch
from fastapi.testclient import TestClient
import discord_rss_bot.main as main_module
from discord_rss_bot.main import app
from discord_rss_bot.main import create_html_for_feed
from discord_rss_bot.main import get_reader_dependency
if TYPE_CHECKING:
from pathlib import Path
import pytest
from httpx import Response
from reader import Entry
from reader import Reader
client: TestClient = TestClient(app)
webhook_name: str = "Hello, I am a webhook!"
webhook_url: str = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
feed_url: str = "https://lovinator.space/rss_test.xml"
def encoded_feed_url(url: str) -> str:
return urllib.parse.quote(feed_url) if url else ""
def ensure_preview_feed_exists() -> Reader:
reader: Reader = get_reader_dependency()
with contextlib.suppress(Exception):
reader.add_feed(feed_url)
with contextlib.suppress(Exception):
reader.update_feed(feed_url)
return reader
def assert_social_preview_metadata(
response: Response,
*,
title: str,
description: str,
) -> None:
assert response.status_code == 200, f"Expected page to render successfully: {response.text}"
assert f"
{title}" in response.text
assert re.search(
rf'',
response.text,
)
assert f'' in response.text
assert f'' in response.text
assert '' in response.text
assert '' in response.text
assert f'' in response.text
assert f'' in response.text
def test_search() -> None:
"""Test the /search page."""
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get("/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": encoded_feed_url(feed_url)})
# Delete the webhook if it already exists before we run the test.
response: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
assert response.status_code == 200, f"Failed to delete webhook: {response.text}"
# Add the webhook.
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Add the feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Check that the feed was added.
response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
# Search for an entry.
response: Response = client.get(url="/search/?query=a")
assert response.status_code == 200, f"Failed to search for entry: {response.text}"
def test_add_webhook() -> None:
"""Test the /add_webhook page."""
# Delete the webhook if it already exists before we run the test.
response: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
assert response.status_code == 200, f"Failed to delete webhook: {response.text}"
# Add the webhook.
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Check that the webhook was added.
response = client.get(url="/webhooks")
assert response.status_code == 200, f"Failed to get /webhooks: {response.text}"
assert webhook_name in response.text, f"Webhook not found in /webhooks: {response.text}"
def test_add_webhook_rejects_invalid_url() -> None:
"""Adding a webhook with a non-URL value should fail validation."""
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"},
)
assert response.status_code == 400, f"Expected invalid webhook URL to be rejected: {response.text}"
assert "Invalid webhook URL" in response.text
def test_add_webhook_allows_valid_url_after_invalid_attempt() -> None:
"""A rejected invalid webhook URL should not prevent a later valid add."""
response: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
assert response.status_code == 200, f"Failed to delete webhook: {response.text}"
response = client.post(
url="/add_webhook",
data={"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"},
)
assert response.status_code == 400, f"Expected invalid webhook URL to be rejected: {response.text}"
assert "Invalid webhook URL" in response.text
response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook after invalid attempt: {response.text}"
response = client.get(url="/webhooks")
assert response.status_code == 200, f"Failed to get /webhooks: {response.text}"
assert webhook_name in response.text, f"Webhook not found in /webhooks: {response.text}"
response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
assert response.status_code == 200, f"Failed to delete webhook: {response.text}"
def test_webhooks_page_handles_invalid_stored_webhook_url() -> None:
"""/webhooks should render even if a malformed webhook URL is present in storage."""
reader: Reader = get_reader_dependency()
malformed_webhook_name = "Malformed hook"
malformed_webhook_url = "definitely-not-a-url"
reader.set_tag((), "webhooks", [{"name": malformed_webhook_name, "url": malformed_webhook_url}]) # pyright: ignore[reportArgumentType]
response: Response = client.get(url="/webhooks")
assert response.status_code == 200, f"/webhooks should not crash for malformed URLs: {response.text}"
assert malformed_webhook_name in response.text
def test_create_feed() -> None:
"""Test the /create_feed page."""
# Ensure webhook exists for this test regardless of test order.
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get(url="/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": encoded_feed_url(feed_url)})
# Add the feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Check that the feed was added.
response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
def test_get() -> None:
"""Test the /create_feed page."""
# Ensure webhook exists for this test regardless of test order.
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get("/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": encoded_feed_url(feed_url)})
# Add the feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Check that the feed was added.
response = client.get("/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
response: Response = client.get(url="/add")
assert response.status_code == 200, f"/add failed: {response.text}"
response: Response = client.get(url="/add_webhook")
assert response.status_code == 200, f"/add_webhook failed: {response.text}"
response: Response = client.get(url="/blacklist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/blacklist failed: {response.text}"
response: Response = client.get(url="/custom", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/custom failed: {response.text}"
response: Response = client.get(url="/embed", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/embed failed: {response.text}"
response: Response = client.get(url="/feed", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/feed failed: {response.text}"
response: Response = client.get(url="/")
assert response.status_code == 200, f"/ failed: {response.text}"
response: Response = client.get(url="/webhooks")
assert response.status_code == 200, f"/webhooks failed: {response.text}"
response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
assert response.status_code == 200, f"/webhook_entries failed: {response.text}"
response: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/whitelist failed: {response.text}"
def test_views_render_social_preview_metadata() -> None:
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
feeds: Response = client.get("/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": encoded_feed_url(feed_url)})
response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
assert_social_preview_metadata(
client.get(url="/"),
title="Feeds Dashboard | discord-rss-bot",
description="View configured feeds, broken sources, webhook groups, and delivery status across your Discord RSS bot dashboard.", # noqa: E501
)
assert_social_preview_metadata(
client.get(url="/add"),
title="Add Feed | discord-rss-bot",
description="Add a new RSS or Atom feed and attach it to a Discord webhook for delivery.",
)
assert_social_preview_metadata(
client.get(url="/add_webhook"),
title="Add Webhook | discord-rss-bot",
description="Register a Discord webhook so feeds can post updates into your server or thread.",
)
assert_social_preview_metadata(
client.get(url="/settings"),
title="Settings | discord-rss-bot",
description="Adjust default update intervals, delivery modes, and screenshot layout for feeds managed by your bot.", # noqa: E501
)
assert_social_preview_metadata(
client.get(url="/webhooks"),
title="Webhooks | discord-rss-bot",
description="Manage stored Discord webhooks and inspect the delivery endpoints connected to your feeds.",
)
assert_social_preview_metadata(
client.get(url="/search", params={"query": "a"}),
title="Search: a | discord-rss-bot",
description="Browse search results for a across tracked feed entries and feeds.",
)
feed_response = client.get(url="/feed", params={"feed_url": encoded_feed_url(feed_url)})
assert feed_response.status_code == 200, f"/feed failed: {feed_response.text}"
assert '' in webhook_entries_response.text
assert (
f"Review webhook settings, attached feeds, and latest entries delivered to {webhook_name}."
in webhook_entries_response.text
)
def test_blacklist_page_uses_live_preview_layout() -> None:
ensure_preview_feed_exists()
response: Response = client.get(url="/blacklist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/blacklist failed: {response.text}"
assert 'hx-get="/blacklist_preview"' in response.text
assert 'id="filter-preview"' in response.text
assert "Blacklist Rules" in response.text
def test_whitelist_page_uses_live_preview_layout() -> None:
ensure_preview_feed_exists()
response: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/whitelist failed: {response.text}"
assert 'hx-get="/whitelist_preview"' in response.text
assert 'id="filter-preview"' in response.text
assert "Whitelist Rules" in response.text
def test_blacklist_page_initial_preview_uses_saved_blacklist_rules() -> None:
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
title: str
summary: str
author: str
link: str
published: datetime | None
content: list[object] = field(default_factory=list)
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/preview.xml", title="Preview Feed")
self.entries: list[Entry] = [
cast(
"Entry",
DummyEntry(
id="blocked-only",
feed=self.feed,
title="Blocked update",
summary="Summary",
author="Author",
link="https://example.com/blocked-only",
published=datetime(2024, 1, 1, tzinfo=UTC),
),
),
cast(
"Entry",
DummyEntry(
id="allowed-only",
feed=self.feed,
title="Allowed note",
summary="Summary",
author="Author",
link="https://example.com/allowed-only",
published=datetime(2024, 1, 2, tzinfo=UTC),
),
),
]
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **_kwargs: object) -> list[Entry]:
return self.entries
def get_tag(self, _resource: object, key: str, default: object = None) -> object:
if key == "blacklist_title":
return "blocked"
return default
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
rendered_titles: list[str] = []
def fake_create_html_for_feed(*, entries: list[Entry], **_kwargs: object) -> str:
entry_titles: list[str] = [entry.title or entry.id for entry in entries]
rendered_titles.extend(entry_titles)
return " | ".join(entry_titles)
try:
with patch("discord_rss_bot.main.create_html_for_feed", side_effect=fake_create_html_for_feed):
response: Response = client.get(url="/blacklist", params={"feed_url": stub_reader.feed.url})
assert response.status_code == 200, f"/blacklist failed: {response.text}"
assert rendered_titles == ["Allowed note"]
finally:
app.dependency_overrides = {}
def test_whitelist_page_initial_preview_uses_saved_whitelist_rules() -> None:
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
title: str
summary: str
author: str
link: str
published: datetime | None
content: list[object] = field(default_factory=list)
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/preview.xml", title="Preview Feed")
self.entries: list[Entry] = [
cast(
"Entry",
DummyEntry(
id="blocked-only",
feed=self.feed,
title="Blocked update",
summary="Summary",
author="Author",
link="https://example.com/blocked-only",
published=datetime(2024, 1, 1, tzinfo=UTC),
),
),
cast(
"Entry",
DummyEntry(
id="allowed-only",
feed=self.feed,
title="Allowed note",
summary="Summary",
author="Author",
link="https://example.com/allowed-only",
published=datetime(2024, 1, 2, tzinfo=UTC),
),
),
cast(
"Entry",
DummyEntry(
id="blocked-and-allowed",
feed=self.feed,
title="Blocked allowed",
summary="Summary",
author="Author",
link="https://example.com/blocked-and-allowed",
published=datetime(2024, 1, 3, tzinfo=UTC),
),
),
]
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **_kwargs: object) -> list[Entry]:
return self.entries
def get_tag(self, _resource: object, key: str, default: object = None) -> object:
if key == "whitelist_title":
return "allowed"
return default
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
rendered_titles: list[str] = []
def fake_create_html_for_feed(*, entries: list[Entry], **_kwargs: object) -> str:
entry_titles: list[str] = [entry.title or entry.id for entry in entries]
rendered_titles.extend(entry_titles)
return " | ".join(entry_titles)
try:
with patch("discord_rss_bot.main.create_html_for_feed", side_effect=fake_create_html_for_feed):
response: Response = client.get(url="/whitelist", params={"feed_url": stub_reader.feed.url})
assert response.status_code == 200, f"/whitelist failed: {response.text}"
assert rendered_titles == ["Allowed note", "Blocked allowed"]
finally:
app.dependency_overrides = {}
def test_blacklist_preview_does_not_persist_unsaved_rules() -> None:
reader: Reader = ensure_preview_feed_exists()
reader.set_tag(feed_url, "blacklist_title", "saved-blacklist") # pyright: ignore[reportArgumentType]
try:
response: Response = client.get(
url="/blacklist_preview",
params={
"feed_url": feed_url,
"blacklist_title": "fvnnnfnfdnfdnfd",
},
)
assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
assert "Live preview" in response.text
assert reader.get_tag(feed_url, "blacklist_title", "") == "saved-blacklist"
finally:
with contextlib.suppress(Exception):
reader.delete_tag(feed_url, "blacklist_title")
def test_whitelist_preview_shows_precedence_over_blacklist() -> None:
reader: Reader = ensure_preview_feed_exists()
reader.set_tag(feed_url, "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
try:
response: Response = client.get(
url="/whitelist_preview",
params={
"feed_url": feed_url,
"whitelist_title": "fvnnnfnfdnfdnfd",
},
)
assert response.status_code == 200, f"/whitelist_preview failed: {response.text}"
assert "whitelist overrides blacklist" in response.text
assert "Sent" in response.text
finally:
with contextlib.suppress(Exception):
reader.delete_tag(feed_url, "blacklist_title")
def test_whitelist_preview_rendered_entries_respect_saved_blacklist_rules() -> None:
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
title: str
summary: str
author: str
link: str
published: datetime | None
content: list[object] = field(default_factory=list)
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/preview.xml", title="Preview Feed")
self.entries: list[Entry] = [
cast(
"Entry",
DummyEntry(
id="blocked-only",
feed=self.feed,
title="Blocked update",
summary="Summary",
author="Author",
link="https://example.com/blocked-only",
published=datetime(2024, 1, 1, tzinfo=UTC),
),
),
cast(
"Entry",
DummyEntry(
id="allowed-only",
feed=self.feed,
title="Allowed note",
summary="Summary",
author="Author",
link="https://example.com/allowed-only",
published=datetime(2024, 1, 2, tzinfo=UTC),
),
),
cast(
"Entry",
DummyEntry(
id="blocked-and-allowed",
feed=self.feed,
title="Blocked allowed",
summary="Summary",
author="Author",
link="https://example.com/blocked-and-allowed",
published=datetime(2024, 1, 3, tzinfo=UTC),
),
),
]
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **_kwargs: object) -> list[Entry]:
return self.entries
def get_tag(self, _resource: object, key: str, default: object = None) -> object:
if key == "blacklist_title":
return "blocked"
return default
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
rendered_titles: list[str] = []
def fake_create_html_for_feed(*, entries: list[Entry], **_kwargs: object) -> str:
entry_titles: list[str] = [entry.title or entry.id for entry in entries]
rendered_titles.extend(entry_titles)
return " | ".join(entry_titles)
try:
with patch("discord_rss_bot.main.create_html_for_feed", side_effect=fake_create_html_for_feed):
response: Response = client.get(
url="/whitelist_preview",
params={"feed_url": stub_reader.feed.url, "whitelist_title": "allowed"},
)
assert response.status_code == 200, f"/whitelist_preview failed: {response.text}"
assert rendered_titles == ["Allowed note", "Blocked allowed"]
finally:
app.dependency_overrides = {}
def test_blacklist_preview_rendered_entries_respect_saved_whitelist_rules() -> None:
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
title: str
summary: str
author: str
link: str
published: datetime | None
content: list[object] = field(default_factory=list)
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/preview.xml", title="Preview Feed")
self.entries: list[Entry] = [
cast(
"Entry",
DummyEntry(
id="blocked-only",
feed=self.feed,
title="Blocked update",
summary="Summary",
author="Author",
link="https://example.com/blocked-only",
published=datetime(2024, 1, 1, tzinfo=UTC),
),
),
cast(
"Entry",
DummyEntry(
id="allowed-only",
feed=self.feed,
title="Allowed note",
summary="Summary",
author="Author",
link="https://example.com/allowed-only",
published=datetime(2024, 1, 2, tzinfo=UTC),
),
),
cast(
"Entry",
DummyEntry(
id="blocked-and-allowed",
feed=self.feed,
title="Blocked allowed",
summary="Summary",
author="Author",
link="https://example.com/blocked-and-allowed",
published=datetime(2024, 1, 3, tzinfo=UTC),
),
),
]
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **_kwargs: object) -> list[Entry]:
return self.entries
def get_tag(self, _resource: object, key: str, default: object = None) -> object:
if key == "whitelist_title":
return "allowed"
return default
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
rendered_titles: list[str] = []
def fake_create_html_for_feed(*, entries: list[Entry], **_kwargs: object) -> str:
entry_titles: list[str] = [entry.title or entry.id for entry in entries]
rendered_titles.extend(entry_titles)
return " | ".join(entry_titles)
try:
with patch("discord_rss_bot.main.create_html_for_feed", side_effect=fake_create_html_for_feed):
response: Response = client.get(
url="/blacklist_preview",
params={"feed_url": stub_reader.feed.url, "blacklist_title": "blocked"},
)
assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
assert rendered_titles == ["Allowed note", "Blocked allowed"]
finally:
app.dependency_overrides = {}
def test_blacklist_preview_shows_no_rendered_entries_message_when_all_entries_are_skipped() -> None:
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
title: str
summary: str
author: str
link: str
published: datetime | None
content: list[object] = field(default_factory=list)
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/preview.xml", title="Preview Feed")
self.entries: list[Entry] = [
cast(
"Entry",
DummyEntry(
id="blocked-only",
feed=self.feed,
title="Blocked update",
summary="Summary",
author="Author",
link="https://example.com/blocked-only",
published=datetime(2024, 1, 1, tzinfo=UTC),
),
),
cast(
"Entry",
DummyEntry(
id="allowed-only",
feed=self.feed,
title="Allowed note",
summary="Summary",
author="Author",
link="https://example.com/allowed-only",
published=datetime(2024, 1, 2, tzinfo=UTC),
),
),
cast(
"Entry",
DummyEntry(
id="blocked-and-allowed",
feed=self.feed,
title="Blocked allowed",
summary="Summary",
author="Author",
link="https://example.com/blocked-and-allowed",
published=datetime(2024, 1, 3, tzinfo=UTC),
),
),
]
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **_kwargs: object) -> list[Entry]:
return self.entries
def get_tag(self, _resource: object, _key: str, default: object = None) -> object:
return default
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try:
with patch("discord_rss_bot.main.create_html_for_feed") as create_html_mock:
response: Response = client.get(
url="/blacklist_preview",
params={"feed_url": stub_reader.feed.url, "blacklist_title": "blocked,allowed,note"},
)
assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
create_html_mock.assert_not_called()
assert "No entries would be sent with the current rules." in response.text
finally:
app.dependency_overrides = {}
def test_blacklist_preview_uses_50_entry_limit() -> None:
@dataclass(slots=True)
class DummyContent:
value: str
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
title: str
summary: str
author: str
link: str
published: datetime | None
content: list[DummyContent] = field(default_factory=lambda: [DummyContent("content")])
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/filter-preview.xml", title="Preview Feed")
self.recorded_limit: int | None = None
self.entries: list[Entry] = [
cast(
"Entry",
DummyEntry(
id=f"entry-{index}",
feed=self.feed,
title=f"Entry {index}",
summary=f"Summary {index}",
author="Author",
link=f"https://example.com/entry-{index}",
published=datetime(2024, 1, 1, tzinfo=UTC),
),
)
for index in range(60)
]
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **kwargs: object) -> list[Entry]:
limit = kwargs.get("limit")
self.recorded_limit = limit if isinstance(limit, int) else None
if isinstance(limit, int):
return self.entries[:limit]
return self.entries
def get_tag(self, _resource: object, _key: str, default: object = None) -> object:
return default
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try:
with patch("discord_rss_bot.main.create_html_for_feed", return_value="Rendered
"):
response: Response = client.get(
url="/blacklist_preview",
params={"feed_url": stub_reader.feed.url},
)
assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
assert stub_reader.recorded_limit == 50, (
f"Expected preview to request 50 entries, got {stub_reader.recorded_limit}"
)
assert "50 checked" in response.text
finally:
app.dependency_overrides = {}
def test_blacklist_preview_shows_labeled_field_values_for_substring_match() -> None:
@dataclass(slots=True)
class DummyContent:
value: str
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
title: str
summary: str
author: str
link: str
published: datetime | None
content: list[DummyContent] = field(default_factory=list)
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/wow.xml", title="Warcraft Feed")
self.entries: list[Entry] = [
cast(
"Entry",
DummyEntry(
id="wow-1",
feed=self.feed,
title="World of Warcraft",
summary="Massive MMO news update
",
author="Blizzard",
link="https://example.com/wow-1",
published=datetime(2024, 1, 1, tzinfo=UTC),
content=[DummyContent("The expansion launches soon.
")],
),
),
]
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **_kwargs: object) -> list[Entry]:
return self.entries
def get_tag(self, _resource: object, _key: str, default: object = None) -> object:
return default
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try:
with patch("discord_rss_bot.main.create_html_for_feed", return_value="Rendered
"):
response: Response = client.get(
url="/blacklist_preview",
params={
"feed_url": stub_reader.feed.url,
"blacklist_title": "orld",
},
)
assert response.status_code == 200, f"/blacklist_preview failed: {response.text}"
assert "Skipped" in response.text
assert "World of Warcraft" in response.text
assert "Title" in response.text
assert "Author" in response.text
assert "Description" in response.text
assert "Content" in response.text
assert "filter-preview__field-row" in response.text
assert "filter-preview__match" in response.text
assert 'orld' in response.text
assert "Massive MMO news update" in response.text
assert "The expansion launches soon." in response.text
finally:
app.dependency_overrides = {}
def test_settings_page_shows_screenshot_layout_setting() -> None:
response: Response = client.get(url="/settings")
assert response.status_code == 200, f"/settings failed: {response.text}"
assert "Default delivery mode for new feeds" in response.text
assert "Default screenshot layout for new feeds" in response.text
assert "uv run playwright install chromium" in response.text
def test_set_global_delivery_mode() -> None:
response: Response = client.post(url="/set_global_delivery_mode", data={"delivery_mode": "text"})
assert response.status_code == 200, f"Failed to set global delivery mode: {response.text}"
response = client.get(url="/settings")
assert response.status_code == 200, f"/settings failed after setting delivery mode: {response.text}"
assert re.search(r"