discord-rss-bot/tests/test_main.py

2124 lines
90 KiB
Python

from __future__ import annotations
import contextlib
import re
import urllib.parse
from dataclasses import dataclass
from dataclasses import field
from datetime import UTC
from datetime import datetime
from typing import TYPE_CHECKING
from typing import cast
from unittest.mock import MagicMock
from unittest.mock import patch
from fastapi.testclient import TestClient
import discord_rss_bot.main as main_module
from discord_rss_bot.main import app
from discord_rss_bot.main import create_html_for_feed
from discord_rss_bot.main import get_reader_dependency
if TYPE_CHECKING:
from pathlib import Path
import pytest
from httpx import Response
from reader import Entry
from reader import Reader
# Shared FastAPI test client; the tests in this module send their requests through it.
client: TestClient = TestClient(app)
# Display name under which the tests register their webhook.
webhook_name: str = "Hello, I am a webhook!"
# Webhook URL the tests add and delete.
webhook_url: str = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
# Feed URL the tests add, modify and remove.
feed_url: str = "https://lovinator.space/rss_test.xml"
def encoded_feed_url(url: str) -> str:
    """Return *url* percent-encoded with ``urllib.parse.quote``.

    Args:
        url: The feed URL to encode.

    Returns:
        The quoted URL (``/`` is left untouched by ``quote``'s default ``safe``),
        or an empty string when *url* is empty.
    """
    # Quote the argument itself, not the module-level ``feed_url``, so the
    # helper gives the right answer for any URL a test passes in.
    return urllib.parse.quote(url) if url else ""
def test_search() -> None:
    """Register a webhook and a feed, then make sure /search answers a query."""
    # Start clean: drop the feed (raw and percent-encoded form) if the index already lists it.
    index_page: Response = client.get("/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Recreate the webhook from scratch.
    deleted: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    assert deleted.status_code == 200, f"Failed to delete webhook: {deleted.text}"
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    created: Response = client.post(url="/add_webhook", data=webhook_form)
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Attach the feed to that webhook.
    feed_form: dict[str, str] = {"feed_url": feed_url, "webhook_dropdown": webhook_name}
    added: Response = client.post(url="/add", data=feed_form)
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # The index page must now list the feed.
    index_page = client.get(url="/")
    assert index_page.status_code == 200, f"Failed to get /: {index_page.text}"
    assert encoded_feed_url(feed_url) in index_page.text, f"Feed not found in /: {index_page.text}"

    # Finally run a search.
    found: Response = client.get(url="/search/?query=a")
    assert found.status_code == 200, f"Failed to search for entry: {found.text}"
def test_add_webhook() -> None:
    """Adding a webhook through /add_webhook should make it show up on /webhooks."""
    # Drop any webhook left behind by a previous run.
    deleted: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    assert deleted.status_code == 200, f"Failed to delete webhook: {deleted.text}"

    # Register it again.
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    created: Response = client.post(url="/add_webhook", data=webhook_form)
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # The listing page has to mention it by name.
    listing: Response = client.get(url="/webhooks")
    assert listing.status_code == 200, f"Failed to get /webhooks: {listing.text}"
    assert webhook_name in listing.text, f"Webhook not found in /webhooks: {listing.text}"
def test_add_webhook_rejects_invalid_url() -> None:
    """A webhook whose URL is not a URL at all must be refused with HTTP 400."""
    bad_form: dict[str, str] = {"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"}
    rejected: Response = client.post(url="/add_webhook", data=bad_form)

    assert rejected.status_code == 400, f"Expected invalid webhook URL to be rejected: {rejected.text}"
    assert "Invalid webhook URL" in rejected.text
def test_add_webhook_allows_valid_url_after_invalid_attempt() -> None:
    """After an invalid webhook URL is rejected, a valid webhook can still be added."""
    delete_form: dict[str, str] = {"webhook_url": webhook_url}

    # Make sure the valid webhook is not already registered.
    deleted: Response = client.post(url="/delete_webhook", data=delete_form)
    assert deleted.status_code == 200, f"Failed to delete webhook: {deleted.text}"

    # First attempt: garbage URL, expected to be refused.
    rejected: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"},
    )
    assert rejected.status_code == 400, f"Expected invalid webhook URL to be rejected: {rejected.text}"
    assert "Invalid webhook URL" in rejected.text

    # Second attempt: the real webhook, expected to go through.
    accepted: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert accepted.status_code == 200, f"Failed to add webhook after invalid attempt: {accepted.text}"

    listing: Response = client.get(url="/webhooks")
    assert listing.status_code == 200, f"Failed to get /webhooks: {listing.text}"
    assert webhook_name in listing.text, f"Webhook not found in /webhooks: {listing.text}"

    # Remove the webhook again.
    cleaned: Response = client.post(url="/delete_webhook", data=delete_form)
    assert cleaned.status_code == 200, f"Failed to delete webhook: {cleaned.text}"
def test_webhooks_page_handles_invalid_stored_webhook_url() -> None:
    """/webhooks should render even if a malformed webhook URL is present in storage.

    The malformed entry is written straight into the global ``webhooks`` tag and
    the previous tag value is put back afterwards, so the broken entry does not
    leak into tests that run later.
    """
    reader: Reader = get_reader_dependency()
    malformed_webhook_name = "Malformed hook"
    malformed_webhook_url = "definitely-not-a-url"

    # Remember what was stored before we overwrite it.
    previous_webhooks = reader.get_tag((), "webhooks", [])
    reader.set_tag((), "webhooks", [{"name": malformed_webhook_name, "url": malformed_webhook_url}])  # pyright: ignore[reportArgumentType]
    try:
        response: Response = client.get(url="/webhooks")
        assert response.status_code == 200, f"/webhooks should not crash for malformed URLs: {response.text}"
        assert malformed_webhook_name in response.text
    finally:
        # Restore the earlier value even when an assertion above fails.
        reader.set_tag((), "webhooks", previous_webhooks)  # pyright: ignore[reportArgumentType]
def test_create_feed() -> None:
    """Adding a feed through /add should make it appear on the index page."""
    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    created: Response = client.post(url="/add_webhook", data=webhook_form)
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Drop the feed (raw and percent-encoded form) if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Add the feed.
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # The index page must now list it.
    index_page = client.get(url="/")
    assert index_page.status_code == 200, f"Failed to get /: {index_page.text}"
    assert encoded_feed_url(feed_url) in index_page.text, f"Feed not found in /: {index_page.text}"
def test_get() -> None:
    """Smoke-test the GET pages once a webhook and a feed exist."""
    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Drop the feed (raw and percent-encoded form) if the index already lists it.
    index_page: Response = client.get("/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Add the feed and confirm the index shows it.
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"
    index_page = client.get("/")
    assert index_page.status_code == 200, f"Failed to get /: {index_page.text}"
    assert encoded_feed_url(feed_url) in index_page.text, f"Feed not found in /: {index_page.text}"

    # Every page below must answer 200; pages tied to a feed or webhook get it as a query parameter.
    feed_query: dict[str, str] = {"feed_url": encoded_feed_url(feed_url)}
    pages: list[tuple[str, dict[str, str] | None]] = [
        ("/add", None),
        ("/add_webhook", None),
        ("/blacklist", feed_query),
        ("/custom", feed_query),
        ("/embed", feed_query),
        ("/feed", feed_query),
        ("/", None),
        ("/webhooks", None),
        ("/webhook_entries", {"webhook_url": webhook_url}),
        ("/whitelist", feed_query),
    ]
    for path, query in pages:
        page: Response = client.get(url=path, params=query)
        assert page.status_code == 200, f"{path} failed: {page.text}"
def test_post_blacklist_apply_to_domain_updates_global_domain_blacklist() -> None:
    """POST /blacklist with apply_to_domain stores the values in the global domain_blacklist tag."""
    reader: Reader = get_reader_dependency()

    # Fresh webhook and feed for this test.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    client.post(url="/remove", data={"feed_url": feed_url})
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Submit the blacklist form with the domain-wide switch turned on.
    blacklist_form: dict[str, str] = {
        "feed_url": feed_url,
        "blacklist_author": "TheLovinator",
        "apply_to_domain": "true",
    }
    posted: Response = client.post(url="/blacklist", data=blacklist_form)
    assert posted.status_code == 200, f"Failed to post blacklist: {posted.text}"

    # The value must have landed under the feed's domain.
    stored = reader.get_tag((), "domain_blacklist", {})
    assert isinstance(stored, dict), "domain_blacklist should be a dict"
    assert "lovinator.space" in stored, "Expected domain key in domain_blacklist"
    assert stored["lovinator.space"]["blacklist_author"] == "TheLovinator"
def test_post_whitelist_apply_to_domain_updates_global_domain_whitelist() -> None:
    """POST /whitelist with apply_to_domain stores the values in the global domain_whitelist tag."""
    reader: Reader = get_reader_dependency()

    # Fresh webhook and feed for this test.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    client.post(url="/remove", data={"feed_url": feed_url})
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Submit the whitelist form with the domain-wide switch turned on.
    whitelist_form: dict[str, str] = {
        "feed_url": feed_url,
        "whitelist_author": "TheLovinator",
        "apply_to_domain": "true",
    }
    posted: Response = client.post(url="/whitelist", data=whitelist_form)
    assert posted.status_code == 200, f"Failed to post whitelist: {posted.text}"

    # The value must have landed under the feed's domain.
    stored = reader.get_tag((), "domain_whitelist", {})
    assert isinstance(stored, dict), "domain_whitelist should be a dict"
    assert "lovinator.space" in stored, "Expected domain key in domain_whitelist"
    assert stored["lovinator.space"]["whitelist_author"] == "TheLovinator"
def test_domain_filter_pages_show_domain_enabled_notice() -> None:
    """With domain-wide filters stored, /blacklist and /whitelist each show their enabled notice."""
    reader: Reader = get_reader_dependency()

    # Fresh webhook and feed for this test.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    client.post(url="/remove", data={"feed_url": feed_url})
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Store one domain-wide rule of each kind directly in the global tags.
    reader.set_tag((), "domain_blacklist", {"lovinator.space": {"blacklist_title": "spoiler"}})  # pyright: ignore[reportArgumentType]
    reader.set_tag((), "domain_whitelist", {"lovinator.space": {"whitelist_title": "release"}})  # pyright: ignore[reportArgumentType]

    blacklist_page: Response = client.get(url="/blacklist", params={"feed_url": encoded_feed_url(feed_url)})
    assert blacklist_page.status_code == 200, f"/blacklist failed: {blacklist_page.text}"
    assert "Domain-wide blacklist is enabled for lovinator.space." in blacklist_page.text

    whitelist_page: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)})
    assert whitelist_page.status_code == 200, f"/whitelist failed: {whitelist_page.text}"
    assert "Domain-wide whitelist is enabled for lovinator.space." in whitelist_page.text
def test_domain_blacklist_isolation_between_domains() -> None:
    """Applying domain blacklist should not overwrite other domains."""
    reader: Reader = get_reader_dependency()

    # Create the webhook and the feed here, as the whitelist counterpart of this
    # test does, so the test does not rely on an earlier test having left the
    # feed behind.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"
    client.post(url="/remove", data={"feed_url": feed_url})
    response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Seed an entry for an unrelated domain that must survive the update below.
    reader.set_tag((), "domain_blacklist", {"example.com": {"blacklist_title": "existing"}})  # pyright: ignore[reportArgumentType]

    response = client.post(
        url="/blacklist",
        data={
            "feed_url": feed_url,
            "blacklist_author": "TheLovinator",
            "apply_to_domain": "true",
        },
    )
    assert response.status_code == 200, f"Failed to post blacklist: {response.text}"

    # Both the seeded entry and the newly posted one must be present.
    domain_blacklist = reader.get_tag((), "domain_blacklist", {})
    assert isinstance(domain_blacklist, dict)
    assert domain_blacklist["example.com"]["blacklist_title"] == "existing"
    assert domain_blacklist["lovinator.space"]["blacklist_author"] == "TheLovinator"
def test_domain_whitelist_isolation_between_domains() -> None:
    """A domain-wide whitelist update for one domain leaves other domains' entries alone."""
    reader: Reader = get_reader_dependency()

    # Fresh webhook and feed for this test.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    client.post(url="/remove", data={"feed_url": feed_url})
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Seed an entry for an unrelated domain that must survive the update below.
    reader.set_tag((), "domain_whitelist", {"example.com": {"whitelist_title": "existing"}})  # pyright: ignore[reportArgumentType]

    whitelist_form: dict[str, str] = {
        "feed_url": feed_url,
        "whitelist_author": "TheLovinator",
        "apply_to_domain": "true",
    }
    posted: Response = client.post(url="/whitelist", data=whitelist_form)
    assert posted.status_code == 200, f"Failed to post whitelist: {posted.text}"

    # Both the seeded entry and the newly posted one must be present.
    stored = reader.get_tag((), "domain_whitelist", {})
    assert isinstance(stored, dict)
    assert stored["example.com"]["whitelist_title"] == "existing"
    assert stored["lovinator.space"]["whitelist_author"] == "TheLovinator"
def test_domain_blacklist_removed_when_apply_to_domain_and_empty_values() -> None:
    """Posting apply_to_domain with no blacklist values drops the domain's stored entry."""
    reader: Reader = get_reader_dependency()

    # Fresh webhook and feed for this test.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    client.post(url="/remove", data={"feed_url": feed_url})
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Seed an entry for the feed's own domain.
    reader.set_tag((), "domain_blacklist", {"lovinator.space": {"blacklist_title": "existing"}})  # pyright: ignore[reportArgumentType]

    # Post the form with the domain switch on but without any filter value.
    posted: Response = client.post(url="/blacklist", data={"feed_url": feed_url, "apply_to_domain": "true"})
    assert posted.status_code == 200, f"Failed to post blacklist: {posted.text}"

    stored = reader.get_tag((), "domain_blacklist", {})
    assert isinstance(stored, dict)
    assert "lovinator.space" not in stored
def test_domain_whitelist_removed_when_apply_to_domain_and_empty_values() -> None:
    """Submitting empty domain whitelist values should remove existing domain entry."""
    reader: Reader = get_reader_dependency()

    # Create the webhook and the feed here, as the blacklist counterpart of this
    # test does, so the test does not rely on an earlier test having left the
    # feed behind.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"
    client.post(url="/remove", data={"feed_url": feed_url})
    response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Seed an entry for the feed's own domain.
    reader.set_tag((), "domain_whitelist", {"lovinator.space": {"whitelist_title": "existing"}})  # pyright: ignore[reportArgumentType]

    # Post the form with the domain switch on but without any filter value.
    response = client.post(
        url="/whitelist",
        data={"feed_url": feed_url, "apply_to_domain": "true"},
    )
    assert response.status_code == 200, f"Failed to post whitelist: {response.text}"

    domain_whitelist = reader.get_tag((), "domain_whitelist", {})
    assert isinstance(domain_whitelist, dict)
    assert "lovinator.space" not in domain_whitelist
def test_apply_to_domain_missing_does_not_update_domain_tags() -> None:
    """Without apply_to_domain in the form, the global domain tags stay as they were."""
    reader: Reader = get_reader_dependency()

    # Fresh webhook and feed for this test.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    client.post(url="/remove", data={"feed_url": feed_url})
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Both domain tags start out empty.
    reader.set_tag((), "domain_blacklist", {})  # pyright: ignore[reportArgumentType]
    reader.set_tag((), "domain_whitelist", {})  # pyright: ignore[reportArgumentType]

    # Post per-feed filters only; the domain switch is left out of both forms.
    blacklisted: Response = client.post(
        url="/blacklist",
        data={"feed_url": feed_url, "blacklist_author": "TheLovinator"},
    )
    assert blacklisted.status_code == 200, f"Failed to post blacklist: {blacklisted.text}"

    whitelisted: Response = client.post(
        url="/whitelist",
        data={"feed_url": feed_url, "whitelist_author": "TheLovinator"},
    )
    assert whitelisted.status_code == 200, f"Failed to post whitelist: {whitelisted.text}"

    # Neither global tag may have gained an entry.
    assert reader.get_tag((), "domain_blacklist", {}) == {}
    assert reader.get_tag((), "domain_whitelist", {}) == {}
def test_apply_to_domain_invalid_value_rejected() -> None:
    """A non-boolean apply_to_domain value must be answered with HTTP 422."""
    bad_form: dict[str, str] = {
        "feed_url": feed_url,
        "blacklist_author": "TheLovinator",
        "apply_to_domain": "invalid-bool",
    }
    rejected = client.post(url="/blacklist", data=bad_form)
    assert rejected.status_code == 422, f"Expected 422 for invalid boolean: {rejected.text}"
def test_index_shows_domain_filter_shortcuts() -> None:
    """The index page offers domain whitelist/blacklist shortcuts that link to the feed's filter pages."""
    # Fresh webhook and feed for this test.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    client.post(url="/remove", data={"feed_url": feed_url})
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    index_page: Response = client.get(url="/")
    assert index_page.status_code == 200, f"Failed to get /: {index_page.text}"

    html: str = index_page.text
    quoted_feed: str = encoded_feed_url(feed_url)
    # Button labels.
    assert "Domain whitelist" in html
    assert "Domain blacklist" in html
    # Link targets.
    assert f"/whitelist?feed_url={quoted_feed}" in html
    assert f"/blacklist?feed_url={quoted_feed}" in html
def test_settings_page_shows_screenshot_layout_setting() -> None:
    """/settings renders the delivery-mode and screenshot-layout defaults plus the Playwright install hint."""
    settings_page: Response = client.get(url="/settings")
    assert settings_page.status_code == 200, f"/settings failed: {settings_page.text}"

    html: str = settings_page.text
    assert "Default delivery mode for new feeds" in html
    assert "Default screenshot layout for new feeds" in html
    assert "uv run playwright install chromium" in html
def test_set_global_delivery_mode() -> None:
    """After posting delivery_mode=text, /settings shows the "text" option as selected."""
    saved: Response = client.post(url="/set_global_delivery_mode", data={"delivery_mode": "text"})
    assert saved.status_code == 200, f"Failed to set global delivery mode: {saved.text}"

    settings_page: Response = client.get(url="/settings")
    assert settings_page.status_code == 200, f"/settings failed after setting delivery mode: {settings_page.text}"

    # Look for an <option value="text" ...> tag carrying the selected attribute.
    selected_option = re.search(r"<option\s+value=\"text\"[^>]*\bselected\b", settings_page.text)
    assert selected_option
def test_add_page_shows_global_default_delivery_mode_hint() -> None:
    """/add mentions which delivery mode new feeds currently default to."""
    saved: Response = client.post(url="/set_global_delivery_mode", data={"delivery_mode": "text"})
    assert saved.status_code == 200, f"Failed to set global delivery mode: {saved.text}"

    add_page: Response = client.get(url="/add")
    assert add_page.status_code == 200, f"/add failed: {add_page.text}"

    html: str = add_page.text
    assert "New feeds currently default to" in html
    assert "text" in html
def test_navbar_add_feed_visible_only_when_webhooks_exist() -> None:
    """The "Add feed" navbar link is absent with no webhooks stored and present once one is added."""
    add_feed_link: str = '<a class="nav-link" href="/add">Add feed</a>'
    reader: Reader = get_reader_dependency()

    # With an empty webhook list the link must not be rendered.
    reader.set_tag((), "webhooks", [])  # pyright: ignore[reportArgumentType]
    without_webhooks: Response = client.get(url="/")
    assert without_webhooks.status_code == 200, f"/ failed: {without_webhooks.text}"
    assert add_feed_link not in without_webhooks.text

    # Register a webhook; now the link must show up.
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    with_webhook: Response = client.get(url="/")
    assert with_webhook.status_code == 200, f"/ failed: {with_webhook.text}"
    assert add_feed_link in with_webhook.text

    # Remove the webhook again.
    cleanup_response: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    assert cleanup_response.status_code == 200, f"Failed to clean up webhook: {cleanup_response.text}"
def test_c3kay_feed_delivery_mode_toggle_routes_update_stored_tags() -> None:
    """/use_text, /use_screenshot_mobile and /use_embed each rewrite the feed's stored delivery tags."""
    reader = get_reader_dependency()
    c3kay_feed_url = "https://feeds.c3kay.de/hoyolab-ui-toggle-test.xml"
    target: dict[str, str] = {"feed_url": c3kay_feed_url}

    # Best effort: any exception raised while adding the feed is ignored.
    with contextlib.suppress(Exception):
        reader.add_feed(c3kay_feed_url)

    # Text mode.
    switched: Response = client.post(url="/use_text", data=target)
    assert switched.status_code == 200, f"Failed to set text mode: {switched.text}"
    assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "text"
    assert reader.get_tag(c3kay_feed_url, "should_send_embed") is False

    # Screenshot mode with the mobile layout.
    switched = client.post(url="/use_screenshot_mobile", data=target)
    assert switched.status_code == 200, f"Failed to set screenshot mobile mode: {switched.text}"
    assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "screenshot"
    assert reader.get_tag(c3kay_feed_url, "screenshot_layout") == "mobile"
    assert reader.get_tag(c3kay_feed_url, "should_send_embed") is False

    # Embed mode.
    switched = client.post(url="/use_embed", data=target)
    assert switched.status_code == 200, f"Failed to set embed mode: {switched.text}"
    assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "embed"
    assert reader.get_tag(c3kay_feed_url, "should_send_embed") is True
def test_set_global_screenshot_layout() -> None:
    """After posting screenshot_layout=mobile, /settings shows the "mobile" option as selected."""
    saved: Response = client.post(url="/set_global_screenshot_layout", data={"screenshot_layout": "mobile"})
    assert saved.status_code == 200, f"Failed to set global screenshot layout: {saved.text}"

    settings_page: Response = client.get(url="/settings")
    assert settings_page.status_code == 200, f"/settings failed after setting layout: {settings_page.text}"

    # Look for an <option value="mobile" ...> tag carrying the selected attribute.
    selected_option = re.search(r"<option\s+value=\"mobile\"[^>]*\bselected\b", settings_page.text)
    assert selected_option
def test_pause_feed() -> None:
    """Pausing a feed through /pause succeeds and the feed stays listed on the index."""
    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Drop the feed (raw and percent-encoded form) if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Add the feed.
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # If the index reports something as paused, unpause our feed first.
    index_page = client.get(url="/")
    if "Paused" in index_page.text:
        resumed: Response = client.post(url="/unpause", data={"feed_url": feed_url})
        assert resumed.status_code == 200, f"Failed to unpause feed: {resumed.text}"

    # Pause the feed.
    paused: Response = client.post(url="/pause", data={"feed_url": feed_url})
    assert paused.status_code == 200, f"Failed to pause feed: {paused.text}"

    # The feed must still be listed on the index.
    index_page = client.get(url="/")
    assert index_page.status_code == 200, f"Failed to get /: {index_page.text}"
    assert encoded_feed_url(feed_url) in index_page.text, f"Feed not found in /: {index_page.text}"
def test_unpause_feed() -> None:
    """Unpausing a feed through /unpause succeeds and the feed stays listed on the index."""
    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Drop the feed (raw and percent-encoded form) if the index already lists it.
    index_page: Response = client.get("/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Add the feed.
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # If the index does not report anything as paused, pause our feed first.
    index_page = client.get(url="/")
    if "Paused" not in index_page.text:
        paused: Response = client.post(url="/pause", data={"feed_url": feed_url})
        assert paused.status_code == 200, f"Failed to pause feed: {paused.text}"

    # Unpause the feed.
    resumed: Response = client.post(url="/unpause", data={"feed_url": feed_url})
    assert resumed.status_code == 200, f"Failed to unpause feed: {resumed.text}"

    # The feed must still be listed on the index.
    index_page = client.get(url="/")
    assert index_page.status_code == 200, f"Failed to get /: {index_page.text}"
    assert encoded_feed_url(feed_url) in index_page.text, f"Feed not found in /: {index_page.text}"
def test_remove_feed() -> None:
    """Removing a feed through /remove makes it disappear from the index page."""
    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Drop the feed (raw and percent-encoded form) if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Add the feed so there is something to remove.
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Remove it.
    removed: Response = client.post(url="/remove", data={"feed_url": feed_url})
    assert removed.status_code == 200, f"Failed to remove feed: {removed.text}"

    # The raw feed URL must no longer appear on the index.
    index_page = client.get(url="/")
    assert index_page.status_code == 200, f"Failed to get /: {index_page.text}"
    assert feed_url not in index_page.text, f"Feed found in /: {index_page.text}"
def test_change_feed_url() -> None:
    """After /change_feed_url the new URL serves the feed page and the old URL returns 404."""
    new_feed_url = "https://lovinator.space/rss_test_small.xml"

    # Neither feed may exist when the test starts.
    for stale_url in (feed_url, new_feed_url):
        client.post(url="/remove", data={"feed_url": stale_url})

    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Add the feed under its original URL.
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Move it to the new URL.
    change_form: dict[str, str] = {"old_feed_url": feed_url, "new_feed_url": new_feed_url}
    changed: Response = client.post(url="/change_feed_url", data=change_form)
    assert changed.status_code == 200, f"Failed to change feed URL: {changed.text}"

    # The new URL is served ...
    new_page: Response = client.get(url="/feed", params={"feed_url": new_feed_url})
    assert new_page.status_code == 200, f"New feed URL is not accessible: {new_page.text}"

    # ... and the old one is gone.
    old_page: Response = client.get(url="/feed", params={"feed_url": feed_url})
    assert old_page.status_code == 404, "Old feed URL should no longer exist"

    # Remove the renamed feed again.
    client.post(url="/remove", data={"feed_url": new_feed_url})
def test_change_feed_url_marks_entries_as_read() -> None:
    """Changing a feed URL updates the new feed and marks each of its unread entries as read."""
    new_feed_url = "https://lovinator.space/rss_test_small.xml"

    # Neither feed may exist when the test starts.
    for stale_url in (feed_url, new_feed_url):
        client.post(url="/remove", data={"feed_url": stale_url})

    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})

    # Add the feed under its original URL.
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Two stand-in entries that the patched get_entries will hand back.
    first_entry = MagicMock()
    first_entry.id = "entry-a"
    second_entry = MagicMock()
    second_entry.id = "entry-b"
    fake_entries = [first_entry, second_entry]

    real_reader = main_module.get_reader_dependency()

    # The POST response is inspected directly, without following the redirect:
    # with change_feed_url mocked out, the redirect target (/feed?feed_url=…) would 404.
    no_redirect_client = TestClient(app, follow_redirects=False)
    change_form: dict[str, str] = {"old_feed_url": feed_url, "new_feed_url": new_feed_url}

    # Patch the reader's methods so the calls made by the endpoint can be observed.
    with (
        patch.object(real_reader, "get_entries", return_value=fake_entries) as mock_get_entries,
        patch.object(real_reader, "set_entry_read") as mock_set_read,
        patch.object(real_reader, "update_feed") as mock_update_feed,
        patch.object(real_reader, "change_feed_url"),
    ):
        redirect: Response = no_redirect_client.post(url="/change_feed_url", data=change_form)

    assert redirect.status_code == 303, f"Expected 303 redirect, got {redirect.status_code}: {redirect.text}"

    # The new URL was refreshed exactly once.
    mock_update_feed.assert_called_once_with(new_feed_url)

    # Unread entries were looked up on the new URL.
    mock_get_entries.assert_called_once_with(feed=new_feed_url, read=False)

    # Each returned entry was marked as read.
    assert mock_set_read.call_count == 2, f"Expected 2 set_entry_read calls, got {mock_set_read.call_count}"
    mock_set_read.assert_any_call(first_entry, True)
    mock_set_read.assert_any_call(second_entry, True)

    # Remove whichever feed is left.
    for leftover_url in (feed_url, new_feed_url):
        client.post(url="/remove", data={"feed_url": leftover_url})
def test_change_feed_url_empty_old_url_returns_400() -> None:
    """A whitespace-only old_feed_url must be answered with HTTP 400."""
    blank_old_form: dict[str, str] = {"old_feed_url": " ", "new_feed_url": "https://example.com/feed.xml"}
    rejected: Response = client.post(url="/change_feed_url", data=blank_old_form)
    assert rejected.status_code == 400, f"Expected 400 for empty old URL, got {rejected.status_code}"
def test_change_feed_url_empty_new_url_returns_400() -> None:
    """A whitespace-only new_feed_url must be rejected with HTTP 400."""
    form_data: dict[str, str] = {"old_feed_url": feed_url, "new_feed_url": " "}

    response: Response = client.post(url="/change_feed_url", data=form_data)

    assert response.status_code == 400, f"Expected 400 for blank new URL, got {response.status_code}"
def test_change_feed_url_nonexistent_old_url_returns_404() -> None:
    """Changing the URL of a feed that is not present should answer HTTP 404."""
    missing_feed_url = "https://does-not-exist.example.com/rss.xml"

    # Issue a removal first so the URL is definitely absent before the rename attempt.
    client.post(url="/remove", data={"feed_url": missing_feed_url})

    rename_form: dict[str, str] = {
        "old_feed_url": missing_feed_url,
        "new_feed_url": "https://example.com/new.xml",
    }
    response: Response = client.post(url="/change_feed_url", data=rename_form)

    assert response.status_code == 404, f"Expected 404 for non-existent feed, got {response.status_code}"
def test_change_feed_url_new_url_already_exists_returns_409() -> None:
    """Changing to a URL that is already tracked should return HTTP 409.

    Both feeds are added first and every setup request is asserted, so a
    broken precondition is reported as such rather than as a confusing
    "expected 409" failure. Cleanup runs in ``finally`` so the feeds are
    removed even when an assertion fails.
    """
    second_feed_url = "https://lovinator.space/rss_test_small.xml"
    both_feed_urls: tuple[str, str] = (feed_url, second_feed_url)

    # Start from a known state: neither feed present, webhook freshly created.
    for url in both_feed_urls:
        client.post(url="/remove", data={"feed_url": url})
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    try:
        # Add both feeds; the conflict can only occur when both really exist.
        for url in both_feed_urls:
            response = client.post(url="/add", data={"feed_url": url, "webhook_dropdown": webhook_name})
            assert response.status_code == 200, f"Failed to add feed: {response.text}"

        # Try to rename one to the other.
        response = client.post(
            url="/change_feed_url",
            data={"old_feed_url": feed_url, "new_feed_url": second_feed_url},
        )
        assert response.status_code == 409, f"Expected 409 when new URL already exists, got {response.status_code}"
    finally:
        # Cleanup, regardless of the outcome above.
        for url in both_feed_urls:
            client.post(url="/remove", data={"feed_url": url})
def test_change_feed_url_same_url_redirects_without_error() -> None:
    """A no-op change (old URL equals new URL) should succeed and leave the feed reachable."""
    feed_form: dict[str, str] = {"feed_url": feed_url}

    # Recreate the webhook the feed will be attached to.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})

    # Re-add the feed from scratch.
    client.post(url="/remove", data=feed_form)
    response: Response = client.post(url="/add", data={**feed_form, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Old and new URL are identical.
    same_url_form: dict[str, str] = {"old_feed_url": feed_url, "new_feed_url": feed_url}
    response = client.post(url="/change_feed_url", data=same_url_form)
    assert response.status_code == 200, f"Expected 200 redirect for same URL, got {response.status_code}"

    # The feed page must still load afterwards.
    response = client.get(url="/feed", params=feed_form)
    assert response.status_code == 200, f"Feed should still exist after no-op URL change: {response.text}"

    # Cleanup.
    client.post(url="/remove", data=feed_form)
def test_delete_webhook() -> None:
    """Add a webhook, delete it through /delete_webhook, and check /webhooks no longer lists it."""
    delete_form: dict[str, str] = {"webhook_url": webhook_url}

    # Drop a leftover webhook from an earlier run, if one is listed.
    webhooks_page: Response = client.get(url="/webhooks")
    if webhook_url in webhooks_page.text:
        client.post(url="/delete_webhook", data=delete_form)

    # Register the webhook.
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Delete it again.
    response2: Response = client.post(url="/delete_webhook", data=delete_form)
    assert response2.status_code == 200, f"Failed to delete webhook: {response2.text}"

    # The webhook name must no longer appear in the listing.
    response3 = client.get(url="/webhooks")
    assert response3.status_code == 200, f"Failed to get /webhooks: {response3.text}"
    assert webhook_name not in response3.text, f"Webhook found in /webhooks: {response3.text}"
def test_attach_feed_webhook_from_index() -> None:
    """Feeds without attached webhook should be attachable from the index page.

    The feed is attached to a webhook that is then deleted; the index page is
    expected to offer /attach_feed_webhook, which is used to attach the feed
    to a replacement webhook.
    """
    original_webhook_name = "original-webhook"
    original_webhook_url = "https://discord.com/api/webhooks/111/original"
    replacement_webhook_name = "replacement-webhook"
    replacement_webhook_url = "https://discord.com/api/webhooks/222/replacement"

    remove_feed_form: dict[str, str] = {"feed_url": feed_url}
    delete_original_form: dict[str, str] = {"webhook_url": original_webhook_url}
    delete_replacement_form: dict[str, str] = {"webhook_url": replacement_webhook_url}

    # Reset: no feed, neither webhook.
    client.post(url="/remove", data=remove_feed_form)
    client.post(url="/delete_webhook", data=delete_original_form)
    client.post(url="/delete_webhook", data=delete_replacement_form)

    # Create the original webhook and attach the feed to it.
    response = client.post(
        url="/add_webhook",
        data={"webhook_name": original_webhook_name, "webhook_url": original_webhook_url},
    )
    assert response.status_code == 200, f"Failed to add original webhook: {response.text}"
    response = client.post(url="/add", data={**remove_feed_form, "webhook_dropdown": original_webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Deleting the original webhook leaves the feed "without attached webhook".
    response = client.post(url="/delete_webhook", data=delete_original_form)
    assert response.status_code == 200, f"Failed to delete original webhook: {response.text}"

    # Provide a replacement webhook to attach to.
    response = client.post(
        url="/add_webhook",
        data={"webhook_name": replacement_webhook_name, "webhook_url": replacement_webhook_url},
    )
    assert response.status_code == 200, f"Failed to add replacement webhook: {response.text}"

    # The index page should advertise the orphaned feed and the attach endpoint.
    response = client.get(url="/")
    assert response.status_code == 200, f"Failed to get /: {response.text}"
    assert "Feeds without attached webhook:" in response.text
    assert "/attach_feed_webhook" in response.text

    # Attach the feed to the replacement webhook.
    attach_form: dict[str, str] = {
        "feed_url": feed_url,
        "webhook_dropdown": replacement_webhook_name,
        "redirect_to": "/",
    }
    response = client.post(url="/attach_feed_webhook", data=attach_form)
    assert response.status_code == 200, f"Failed to attach feed to webhook: {response.text}"

    # The feed's "webhook" tag should now hold the replacement URL.
    reader = get_reader_dependency()
    assert reader.get_tag(feed_url, "webhook", "") == replacement_webhook_url

    # Cleanup.
    client.post(url="/remove", data=remove_feed_form)
    client.post(url="/delete_webhook", data=delete_replacement_form)
def test_attach_feed_webhook_from_feed_page() -> None:
    """Feed detail page should allow attaching/replacing webhook directly.

    The feed starts attached to one webhook and is re-attached to a second one
    through /attach_feed_webhook, with a redirect target pointing at the feed page.
    """
    original_webhook_name = "feed-page-original-webhook"
    original_webhook_url = "https://discord.com/api/webhooks/333/original"
    replacement_webhook_name = "feed-page-replacement-webhook"
    replacement_webhook_url = "https://discord.com/api/webhooks/444/replacement"

    remove_feed_form: dict[str, str] = {"feed_url": feed_url}
    delete_original_form: dict[str, str] = {"webhook_url": original_webhook_url}
    delete_replacement_form: dict[str, str] = {"webhook_url": replacement_webhook_url}

    # Reset: no feed, neither webhook.
    client.post(url="/remove", data=remove_feed_form)
    client.post(url="/delete_webhook", data=delete_original_form)
    client.post(url="/delete_webhook", data=delete_replacement_form)

    # Create both webhooks up front.
    response = client.post(
        url="/add_webhook",
        data={"webhook_name": original_webhook_name, "webhook_url": original_webhook_url},
    )
    assert response.status_code == 200, f"Failed to add original webhook: {response.text}"
    response = client.post(
        url="/add_webhook",
        data={"webhook_name": replacement_webhook_name, "webhook_url": replacement_webhook_url},
    )
    assert response.status_code == 200, f"Failed to add replacement webhook: {response.text}"

    # Attach the feed to the original webhook.
    response = client.post(url="/add", data={**remove_feed_form, "webhook_dropdown": original_webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # The feed page should render the current-webhook label and the attach form.
    response = client.get(url="/feed", params=remove_feed_form)
    assert response.status_code == 200, f"Failed to get /feed: {response.text}"
    assert "Current webhook:" in response.text
    assert "/attach_feed_webhook" in response.text

    # Switch to the replacement webhook via the same endpoint the form posts to.
    reattach_form: dict[str, str] = {
        "feed_url": feed_url,
        "webhook_dropdown": replacement_webhook_name,
        "redirect_to": f"/feed?feed_url={urllib.parse.quote(feed_url)}",
    }
    response = client.post(url="/attach_feed_webhook", data=reattach_form)
    assert response.status_code == 200, f"Failed to reattach feed webhook: {response.text}"

    # The feed's "webhook" tag should now hold the replacement URL.
    reader = get_reader_dependency()
    assert reader.get_tag(feed_url, "webhook", "") == replacement_webhook_url

    # Cleanup.
    client.post(url="/remove", data=remove_feed_form)
    client.post(url="/delete_webhook", data=delete_original_form)
    client.post(url="/delete_webhook", data=delete_replacement_form)
def test_update_feed_not_found() -> None:
    """GET /update for a feed URL that was never added should answer 404 with "Feed not found"."""
    nonexistent_feed_url = "https://nonexistent-feed.example.com/rss.xml"
    # The URL is percent-quoted before being handed to the client as a query parameter.
    query: dict[str, str] = {"feed_url": urllib.parse.quote(nonexistent_feed_url)}

    response: Response = client.get(url="/update", params=query)

    assert response.status_code == 404, f"Expected 404 for non-existent feed, got: {response.status_code}"
    assert "Feed not found" in response.text
def test_post_entry_send_to_discord() -> None:
    """Check that /post_entry delivers an entry to Discord and then redirects to the feed page.

    Regression test for the bug where the injected reader was not passed to
    send_entry_to_discord, meaning the dependency-injected reader was silently ignored.
    """
    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Recreate the feed.
    client.post(url="/remove", data={"feed_url": feed_url})
    response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Pick one real entry from the feed so the request carries a valid entry ID.
    feed_reader: main_module.Reader = main_module.get_reader_dependency()
    entries: list[Entry] = list(feed_reader.get_entries(feed=feed_url, limit=1))
    assert entries, "Feed should have at least one entry to send"
    chosen_entry: main_module.Entry = entries[0]

    query: dict[str, str] = {
        "entry_id": urllib.parse.quote(chosen_entry.id),
        "feed_url": urllib.parse.quote(feed_url),
    }

    # Redirects are not followed, so the 303 itself can be inspected.
    no_redirect_client = TestClient(app, follow_redirects=False)

    # execute_webhook is patched so nothing is actually sent to Discord.
    with patch("discord_rss_bot.feeds.execute_webhook") as mock_execute:
        response = no_redirect_client.get(url="/post_entry", params=query)

    assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}: {response.text}"
    location: str = response.headers.get("location", "")
    assert "feed?feed_url=" in location, f"Should redirect to feed page, got: {location}"
    assert mock_execute.called, "execute_webhook should have been called to deliver the entry to Discord"

    # Cleanup.
    client.post(url="/remove", data={"feed_url": feed_url})
def test_post_entry_unknown_id_returns_404() -> None:
    """GET /post_entry with an entry ID that matches nothing should answer 404."""
    unknown_entry_query: dict[str, str] = {
        "entry_id": "https://nonexistent.example.com/entry-that-does-not-exist",
    }

    response: Response = client.get(url="/post_entry", params=unknown_entry_query)

    assert response.status_code == 404, f"Expected 404 for unknown entry, got {response.status_code}"
def test_post_entry_uses_feed_url_to_disambiguate_duplicate_ids() -> None:
    """When IDs collide across feeds, /post_entry should pick the entry from provided feed_url.

    A stub reader serves two entries that share one ID but belong to different
    feeds. The reader dependency is overridden with ``patch.dict`` so that
    ``app.dependency_overrides`` is restored to its previous contents on exit,
    instead of being reset to an empty dict (which would also drop overrides
    installed elsewhere).
    """

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    @dataclass(slots=True)
    class DummyEntry:
        id: str
        feed: DummyFeed
        feed_url: str

    feed_a = "https://example.com/feed-a.xml"
    feed_b = "https://example.com/feed-b.xml"
    shared_id = "https://example.com/shared-entry-id"
    entry_a: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_a), feed_url=feed_a))
    entry_b: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_b), feed_url=feed_b))

    class StubReader:
        def get_entries(self, feed: str | None = None) -> list[Entry]:
            # Filter by feed when one is given; otherwise return both colliding entries.
            if feed == feed_a:
                return [entry_a]
            if feed == feed_b:
                return [entry_b]
            return [entry_a, entry_b]

    # Feed URLs of the entries handed to send_entry_to_discord, in call order.
    selected_feed_urls: list[str] = []

    def fake_send_entry_to_discord(entry: Entry, reader: object) -> None:
        selected_feed_urls.append(entry.feed.url)

    # Redirects are not followed, so the 303 and its Location header can be inspected.
    no_redirect_client = TestClient(app, follow_redirects=False)
    with (
        patch.dict(app.dependency_overrides, {get_reader_dependency: StubReader}),
        patch("discord_rss_bot.main.send_entry_to_discord", side_effect=fake_send_entry_to_discord),
    ):
        response: Response = no_redirect_client.get(
            url="/post_entry",
            params={"entry_id": urllib.parse.quote(shared_id), "feed_url": urllib.parse.quote(feed_b)},
        )

    assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}"
    assert selected_feed_urls == [feed_b], f"Expected feed-b entry, got: {selected_feed_urls}"
    location = response.headers.get("location", "")
    assert urllib.parse.quote(feed_b) in location, f"Expected redirect to feed-b page, got: {location}"
def test_navbar_backup_link_hidden_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
    """Test that the backup link is not shown in the navbar when GIT_BACKUP_PATH is not set.

    Only the form action is checked: the earlier disjunction (text absent OR
    form absent) still passed when the form was rendered without the word
    "Backup" appearing on the page.
    """
    # Ensure GIT_BACKUP_PATH is not set.
    monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)

    # Get the index page.
    response: Response = client.get(url="/")
    assert response.status_code == 200, f"Failed to get /: {response.text}"

    # The backup form must not be rendered at all.
    assert 'action="/backup"' not in response.text, (
        "Backup button should not be visible when GIT_BACKUP_PATH is not configured"
    )
def test_navbar_backup_link_visible_when_configured(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """With GIT_BACKUP_PATH set, the index page should contain the backup text and form."""
    # Point GIT_BACKUP_PATH at pytest's temporary directory for this test only.
    monkeypatch.setenv("GIT_BACKUP_PATH", str(tmp_path))

    response: Response = client.get(url="/")
    assert response.status_code == 200, f"Failed to get /: {response.text}"

    index_html: str = response.text
    assert "Backup" in index_html, "Backup button text should be visible when GIT_BACKUP_PATH is configured"
    assert 'action="/backup"' in index_html, "Backup form should be visible when GIT_BACKUP_PATH is configured"
def test_backup_endpoint_returns_error_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
    """POST /backup without GIT_BACKUP_PATH should end on a page that mentions the missing configuration."""
    # Make sure the variable is unset for the duration of the test.
    monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)

    response: Response = client.post(url="/backup")

    # The final response is expected to be a 200 page carrying the error text.
    assert response.status_code == 200, f"Failed to post /backup: {response.text}"
    page_text: str = response.text
    error_shown: bool = "Git backup is not configured" in page_text or "GIT_BACKUP_PATH" in page_text
    assert error_shown, (
        "Error message about backup not being configured should be shown"
    )
def test_show_more_entries_button_visible_when_many_entries() -> None:
    """When the feed page shows a 'Show more entries' button, it must be a pagination link to /feed.

    The structural checks only run if the button text is present in the page.
    """
    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Drop the feed first if the index page already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        client.post(url="/remove", data={"feed_url": feed_url})

    # Add the feed and load its page.
    response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"
    response = client.get(url="/feed", params={"feed_url": feed_url})
    assert response.status_code == 200, f"Failed to get /feed: {response.text}"

    # No button rendered: nothing further to verify.
    if "Show more entries" not in response.text:
        return

    assert "starting_after=" in response.text, "Show more entries button should contain starting_after parameter"

    # Either spelling of the encoded feed URL is accepted in the link.
    accepted_links: tuple[str, str] = (
        f'href="/feed?feed_url={urllib.parse.quote(feed_url)}',
        f'href="/feed?feed_url={encoded_feed_url(feed_url)}',
    )
    assert any(link in response.text for link in accepted_links), (
        "Show more entries button should link back to the feed page"
    )
def test_show_more_entries_button_not_visible_when_few_entries() -> None:
    """Test that the 'Show more entries' button is not visible when there are 20 or fewer entries.

    The check is conditional: it only applies if the small feed can be added
    and the feed page reports an entry count of the form "(N entries)".
    The feed is removed in ``finally`` so it does not linger when an assertion fails.
    """
    # Ensure webhook exists for this test regardless of test order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Use a feed with very few entries, starting from a clean state.
    small_feed_url = "https://lovinator.space/rss_test_small.xml"
    client.post(url="/remove", data={"feed_url": small_feed_url})

    try:
        # Add the small feed (this may not exist, so this test is conditional).
        response = client.post(url="/add", data={"feed_url": small_feed_url, "webhook_dropdown": webhook_name})
        if response.status_code != 200:
            return

        # Get the feed page.
        response = client.get(url="/feed", params={"feed_url": small_feed_url})
        assert response.status_code == 200, f"Failed to get /feed: {response.text}"

        # The regex alone decides whether a count is present; a match already
        # implies the " entries)" text, so no separate substring check is needed.
        match: re.Match[str] | None = re.search(r"\((\d+) entries\)", response.text)
        if match is None:
            return

        entry_count = int(match.group(1))
        if entry_count <= 20:
            assert "Show more entries" not in response.text, (
                f"Show more entries button should not be visible when there are {entry_count} entries"
            )
    finally:
        # Clean up.
        client.post(url="/remove", data={"feed_url": small_feed_url})
def test_show_more_entries_pagination_works() -> None:
    """Following the starting_after value from the feed page should yield a valid second page.

    The second request is only made when the first page exposes a
    'Show more entries' link carrying a starting_after parameter.
    """
    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Drop the feed first if the index page already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        client.post(url="/remove", data={"feed_url": feed_url})

    # Add the feed and load the first page.
    response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"
    response = client.get(url="/feed", params={"feed_url": feed_url})
    assert response.status_code == 200, f"Failed to get /feed: {response.text}"

    # Without a pagination link there is no second page to request.
    first_page: str = response.text
    if "Show more entries" not in first_page or "starting_after=" not in first_page:
        return

    # Pull the cursor value out of the link (everything up to a quote or ampersand).
    match: re.Match[str] | None = re.search(r'starting_after=([^"&]+)', first_page)
    if match is None:
        return
    starting_after_id: str = match.group(1)

    # Request the second page with that cursor.
    response = client.get(
        url="/feed",
        params={"feed_url": feed_url, "starting_after": starting_after_id},
    )
    assert response.status_code == 200, f"Failed to get paginated feed: {response.text}"
    assert "entries)" in response.text, "Paginated page should show entry count"
def test_show_more_entries_button_context_variable() -> None:
    """The 'Show more entries' button must appear exactly when the page reports more than 20 entries.

    Nothing is asserted about the button if the page carries no "(N entries)" count.
    """
    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Drop the feed first if the index page already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        client.post(url="/remove", data={"feed_url": feed_url})

    # Add the feed and load its page.
    response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"
    response = client.get(url="/feed", params={"feed_url": feed_url})
    assert response.status_code == 200, f"Failed to get /feed: {response.text}"

    # Read the reported total; bail out if the page does not show one.
    match: re.Match[str] | None = re.search(r"\((\d+) entries\)", response.text)
    if match is None:
        return
    entry_count = int(match.group(1))

    if entry_count > 20:
        # More than one page worth of entries: the button is required.
        assert "Show more entries" in response.text, (
            f"Button should be visible when there are {entry_count} entries (more than 20)"
        )
    else:
        # Everything fits on one page: the button must be absent.
        assert "Show more entries" not in response.text, (
            f"Button should not be visible when there are {entry_count} entries (20 or fewer)"
        )
def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyPatch) -> None:
    """create_html_for_feed should label an entry whose original_feed_url differs from the current feed."""

    @dataclass(slots=True)
    class DummyContent:
        value: str

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    @dataclass(slots=True)
    class DummyEntry:
        feed: DummyFeed
        id: str
        original_feed_url: str | None = None
        link: str = "https://example.com/post"
        title: str = "Example title"
        author: str = "Author"
        summary: str = "Summary"
        content: list[DummyContent] = field(default_factory=lambda: [DummyContent("Content")])
        published: None = None

        def __post_init__(self) -> None:
            # Fall back to the owning feed's URL when no origin was supplied.
            if self.original_feed_url is None:
                self.original_feed_url = self.feed.url

    def render_stub(_entry: object, **_kwargs: object) -> str:
        return "Rendered content"

    def never_listed(_entry: object, **_kwargs: object) -> bool:
        return False

    # Replace the rendering and filtering helpers in main with fixed-result stubs.
    monkeypatch.setattr("discord_rss_bot.main.replace_tags_in_text_message", render_stub)
    monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", never_listed)
    monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", never_listed)

    selected_feed_url = "https://example.com/feed-a.xml"
    native_entry = DummyEntry(DummyFeed(selected_feed_url), "same")
    # feed.url equals the selected feed, yet original_feed_url points elsewhere:
    # the marker is expected to follow original_feed_url.
    foreign_entry = DummyEntry(
        DummyFeed(selected_feed_url),
        "other",
        original_feed_url="https://example.com/feed-b.xml",
    )
    typed_entries: list[Entry] = [cast("Entry", native_entry), cast("Entry", foreign_entry)]

    html: str = create_html_for_feed(
        reader=MagicMock(),
        current_feed_url=selected_feed_url,
        entries=typed_entries,
    )

    assert "From another feed: https://example.com/feed-b.xml" in html
    assert "From another feed: https://example.com/feed-a.xml" not in html
def test_webhook_entries_webhook_not_found() -> None:
    """GET /webhook_entries for an unknown webhook URL should answer 404 with "Webhook not found"."""
    nonexistent_webhook_url = "https://discord.com/api/webhooks/999999/nonexistent"
    query: dict[str, str] = {"webhook_url": nonexistent_webhook_url}

    response: Response = client.get(url="/webhook_entries", params=query)

    assert response.status_code == 404, f"Expected 404 for non-existent webhook, got: {response.status_code}"
    assert "Webhook not found" in response.text
def test_webhook_entries_no_feeds() -> None:
    """/webhook_entries for a webhook with no feeds should show the webhook name and a no-feeds hint."""
    webhook_query: dict[str, str] = {"webhook_url": webhook_url}

    # Remove the shared test feed so it is not attached to the webhook.
    client.post(url="/remove", data={"feed_url": feed_url})

    # Recreate the webhook from scratch.
    client.post(url="/delete_webhook", data=webhook_query)
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Load the webhook detail page without having added any feed.
    response = client.get(url="/webhook_entries", params=webhook_query)
    assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
    assert webhook_name in response.text, "Webhook name not found in response"

    # Either wording of the empty-state hint is accepted.
    empty_state_hints: tuple[str, str] = ("No feeds found", "Add feeds")
    assert any(hint in response.text for hint in empty_state_hints), "Expected message about no feeds"
def test_webhook_entries_no_feeds_still_shows_webhook_settings() -> None:
    """The webhook detail page should render its settings, actions and navigation for a freshly created webhook."""
    webhook_query: dict[str, str] = {"webhook_url": webhook_url}

    # Recreate the webhook from scratch.
    client.post(url="/delete_webhook", data=webhook_query)
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    response = client.get(url="/webhook_entries", params=webhook_query)
    assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"

    # (expected page fragment, failure message) pairs, checked in order.
    expected_fragments: tuple[tuple[str, str], ...] = (
        ("Settings", "Expected settings card on webhook detail view"),
        ("Modify Webhook", "Expected modify form on webhook detail view"),
        ("Delete Webhook", "Expected delete action on webhook detail view"),
        ("Back to dashboard", "Expected dashboard navigation link"),
        ("All webhooks", "Expected all webhooks navigation link"),
        (f'name="old_hook" value="{webhook_url}"', "Expected old_hook hidden input"),
    )
    for fragment, failure_message in expected_fragments:
        assert fragment in response.text, failure_message

    # The modify form should carry a redirect value pointing back at this page.
    redirect_input: str = f'value="/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}"'
    assert redirect_input in response.text, (
        "Expected modify form to redirect back to the current webhook detail view"
    )
def test_webhook_entries_with_feeds_no_entries() -> None:
    """Test webhook_entries endpoint when webhook has feeds but no entries yet.

    The outcome of adding the feed is deliberately not asserted, so the
    response of that request is not kept. The feed is removed in ``finally``
    so it does not linger when an assertion fails.
    """
    # Clean up and create fresh webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Use a feed URL that is meant to have no entries, starting from a clean state.
    empty_feed_url = "https://lovinator.space/empty_feed.xml"
    client.post(url="/remove", data={"feed_url": empty_feed_url})

    try:
        # Add the feed; the result is intentionally unchecked.
        client.post(
            url="/add",
            data={"feed_url": empty_feed_url, "webhook_dropdown": webhook_name},
        )

        # Get webhook_entries.
        response = client.get(
            url="/webhook_entries",
            params={"webhook_url": webhook_url},
        )
        assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
        assert webhook_name in response.text, "Webhook name not found in response"
    finally:
        # Clean up.
        client.post(url="/remove", data={"feed_url": empty_feed_url})
def test_webhook_entries_with_entries() -> None:
    """/webhook_entries should show the entry total, settings and attached feeds once a feed is attached."""
    webhook_query: dict[str, str] = {"webhook_url": webhook_url}

    # Recreate the webhook.
    client.post(url="/delete_webhook", data=webhook_query)
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Recreate the feed, attached to that webhook.
    client.post(url="/remove", data={"feed_url": feed_url})
    response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Load the webhook detail page.
    response = client.get(url="/webhook_entries", params=webhook_query)
    assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"

    detail_html: str = response.text
    assert webhook_name in detail_html, "Webhook name not found in response"
    # The "total from" text is taken as evidence that an entry count is rendered.
    assert "total from" in detail_html, "Expected to see entry count"
    assert "Modify Webhook" in detail_html, "Expected webhook settings to be visible"
    assert "Attached feeds" in detail_html, "Expected attached feeds section to be visible"
def test_webhook_entries_shows_attached_feed_link() -> None:
    """The webhook detail page should link each attached feed to its /feed page."""
    webhook_query: dict[str, str] = {"webhook_url": webhook_url}
    feed_form: dict[str, str] = {"feed_url": feed_url}

    # Recreate the webhook.
    client.post(url="/delete_webhook", data=webhook_query)
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Recreate the feed, attached to that webhook.
    client.post(url="/remove", data=feed_form)
    response = client.post(url="/add", data={**feed_form, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    response = client.get(url="/webhook_entries", params=webhook_query)
    assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"

    # The link target uses the percent-quoted feed URL.
    feed_page_link: str = f"/feed?feed_url={urllib.parse.quote(feed_url)}"
    assert feed_page_link in response.text, (
        "Expected attached feed to link to its feed detail page"
    )
    assert "Latest entries" in response.text, "Expected latest entries heading on webhook detail view"

    # Cleanup.
    client.post(url="/remove", data=feed_form)
def test_webhook_entries_multiple_feeds() -> None:
    """/webhook_entries should render the webhook name and some feed information.

    Only a single feed is attached here, and the feed check is a
    case-insensitive search for the word "feed" in the page.
    """
    webhook_query: dict[str, str] = {"webhook_url": webhook_url}
    feed_form: dict[str, str] = {"feed_url": feed_url}

    # Recreate the webhook.
    client.post(url="/delete_webhook", data=webhook_query)
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Recreate the feed, attached to that webhook.
    client.post(url="/remove", data=feed_form)
    response = client.post(url="/add", data={**feed_form, "webhook_dropdown": webhook_name})
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Load the webhook detail page.
    response = client.get(url="/webhook_entries", params=webhook_query)
    assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
    assert webhook_name in response.text, "Webhook name not found in response"
    assert "feed" in response.text.lower(), "Expected to see feed information"

    # Cleanup.
    client.post(url="/remove", data=feed_form)
def test_webhook_entries_sort_newest_and_non_null_published_first() -> None:
    """Webhook entries should be sorted newest-first with published=None entries placed last.

    A stub reader returns four deliberately unsorted entries, and
    create_html_for_feed is replaced by a recorder that captures the order in
    which the endpoint passes them on. The reader dependency is overridden
    with ``patch.dict`` so that ``app.dependency_overrides`` is restored to its
    previous contents on exit, instead of being reset to an empty dict (which
    would also drop overrides installed elsewhere).
    """

    @dataclass(slots=True)
    class DummyFeed:
        url: str
        title: str | None = None
        updates_enabled: bool = True
        last_exception: None = None

    @dataclass(slots=True)
    class DummyEntry:
        id: str
        feed: DummyFeed
        published: datetime | None

    dummy_feed = DummyFeed(url="https://example.com/feed.xml", title="Example Feed")

    # Intentionally unsorted input with two dated entries and two undated entries.
    unsorted_entries: list[Entry] = [
        cast("Entry", DummyEntry(id="old", feed=dummy_feed, published=datetime(2024, 1, 1, tzinfo=UTC))),
        cast("Entry", DummyEntry(id="none-1", feed=dummy_feed, published=None)),
        cast("Entry", DummyEntry(id="new", feed=dummy_feed, published=datetime(2024, 2, 1, tzinfo=UTC))),
        cast("Entry", DummyEntry(id="none-2", feed=dummy_feed, published=None)),
    ]

    class StubReader:
        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            # The global "webhooks" tag lists the single test webhook.
            if resource == () and key == "webhooks":
                return [{"name": webhook_name, "url": webhook_url}]
            # Any feed (string resource) reports the test webhook as its own.
            if key == "webhook" and isinstance(resource, str):
                return webhook_url
            return default

        def get_feeds(self) -> list[DummyFeed]:
            return [dummy_feed]

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return unsorted_entries

    # Entry IDs in the order they were handed to create_html_for_feed.
    observed_order: list[str] = []

    def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str:
        del reader, current_feed_url
        observed_order.extend(entry.id for entry in entries)
        return ""

    with (
        patch.dict(app.dependency_overrides, {get_reader_dependency: StubReader}),
        patch(
            "discord_rss_bot.main.get_data_from_hook_url",
            return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url),
        ),
        patch("discord_rss_bot.main.create_html_for_feed", side_effect=capture_entries),
    ):
        response: Response = client.get(
            url="/webhook_entries",
            params={"webhook_url": webhook_url},
        )

    assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
    assert observed_order == ["new", "old", "none-1", "none-2"], (
        "Expected newest published entries first and published=None entries last"
    )
def test_webhook_entries_pagination() -> None:
    """Load the first page of /webhook_entries and check the pagination form when it is offered."""
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    feed_form: dict[str, str] = {"feed_url": feed_url, "webhook_dropdown": webhook_name}
    feed_removal_form: dict[str, str] = {"feed_url": feed_url}

    # Recreate the webhook from scratch.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(url="/add_webhook", data=webhook_form)
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Recreate the feed and attach it to the webhook.
    client.post(url="/remove", data=feed_removal_form)
    response = client.post(url="/add", data=feed_form)
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Request the first page of entries for the webhook.
    response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
    assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"

    # The "load more" button is only asserted on when the page actually offers it;
    # in that case the pagination form has to carry the starting_after field.
    page_html: str = response.text
    offers_more_entries: bool = "Load More Entries" in page_html
    if offers_more_entries:
        assert 'name="starting_after"' in page_html, "Expected pagination form with starting_after parameter"

    # Leave no feed behind for the next test.
    client.post(url="/remove", data=feed_removal_form)
def test_webhook_entries_url_encoding() -> None:
    """/webhook_entries should still find the webhook when its URL is passed percent-encoded."""
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    feed_form: dict[str, str] = {"feed_url": feed_url, "webhook_dropdown": webhook_name}
    feed_removal_form: dict[str, str] = {"feed_url": feed_url}

    # Recreate the webhook from scratch.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(url="/add_webhook", data=webhook_form)
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Recreate the feed and attach it to the webhook.
    client.post(url="/remove", data=feed_removal_form)
    response = client.post(url="/add", data=feed_form)
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Pass the webhook URL already quoted as the query parameter value.
    encoded_webhook_url: str = urllib.parse.quote(webhook_url)
    query: dict[str, str] = {"webhook_url": encoded_webhook_url}
    response = client.get(url="/webhook_entries", params=query)
    assert response.status_code == 200, f"Failed to get /webhook_entries with encoded URL: {response.text}"
    assert webhook_name in response.text, "Webhook name not found in response"

    # Leave no feed behind for the next test.
    client.post(url="/remove", data=feed_removal_form)
def test_dashboard_webhook_name_links_to_webhook_detail() -> None:
    """The dashboard should contain a link to /webhook_entries for the configured webhook."""
    webhook_form: dict[str, str] = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    feed_form: dict[str, str] = {"feed_url": feed_url, "webhook_dropdown": webhook_name}
    feed_removal_form: dict[str, str] = {"feed_url": feed_url}

    # Recreate the webhook and the feed so the dashboard has something to list.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    response: Response = client.post(url="/add_webhook", data=webhook_form)
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    client.post(url="/remove", data=feed_removal_form)
    response = client.post(url="/add", data=feed_form)
    assert response.status_code == 200, f"Failed to add feed: {response.text}"

    # Render the dashboard and look for the quoted detail-view link.
    response = client.get(url="/")
    assert response.status_code == 200, f"Failed to get /: {response.text}"

    expected_link = f"/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}"
    dashboard_html: str = response.text
    assert expected_link in dashboard_html, "Expected dashboard webhook link to point to the webhook detail view"

    # Leave no feed behind for the next test.
    client.post(url="/remove", data=feed_removal_form)
def test_modify_webhook_redirects_back_to_webhook_detail() -> None:
    """Webhook updates from the detail view should redirect back to that view with the new URL.

    The ``redirect_to`` value submitted with the form still references the old
    webhook URL; the ``Location`` header of the response is expected to point at
    the detail view of the new webhook URL instead.
    """
    original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
    new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token"

    # Start from a clean slate: neither webhook may exist before the test runs.
    client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
    client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})

    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Redirects must not be followed, otherwise the 303 and its Location header are hidden.
    no_redirect_client = TestClient(app, follow_redirects=False)
    response = no_redirect_client.post(
        url="/modify_webhook",
        data={
            "old_hook": original_webhook_url,
            "new_hook": new_webhook_url,
            "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}",
        },
    )

    assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
    assert response.headers["location"] == (f"/webhook_entries?webhook_url={urllib.parse.quote(new_webhook_url)}"), (
        f"Unexpected redirect location: {response.headers['location']}"
    )

    # Clean up the renamed webhook.
    client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})


def test_modify_webhook_triggers_git_backup_commit() -> None:
    """Modifying a webhook URL should record a state change for git backup.

    ``commit_state_change`` is patched in ``discord_rss_bot.main`` so the test only
    counts how often the modification handler invokes it.
    """
    original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
    new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token"

    # Start from a clean slate: neither webhook may exist before the test runs.
    client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
    client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})

    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Redirects are not followed so the handler's own 303 response can be inspected.
    no_redirect_client = TestClient(app, follow_redirects=False)
    with patch("discord_rss_bot.main.commit_state_change") as mock_commit_state_change:
        response = no_redirect_client.post(
            url="/modify_webhook",
            data={
                "old_hook": original_webhook_url,
                "new_hook": new_webhook_url,
                "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}",
            },
        )

    assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
    assert mock_commit_state_change.call_count == 1, "Expected webhook modification to trigger git backup commit"

    # Clean up the renamed webhook.
    client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None:
    """The /webhook_entries page should render a preview of old and new feed URLs for a bulk replacement."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str
        title: str | None = None
        updates_enabled: bool = True
        last_exception: None = None

    class StubReader:
        def __init__(self) -> None:
            # Two feeds on the host being replaced and one on an unrelated host.
            feed_specs = (
                ("https://old.example.com/rss/a.xml", "A"),
                ("https://old.example.com/rss/b.xml", "B"),
                ("https://unchanged.example.com/rss/c.xml", "C"),
            )
            self._feeds: list[DummyFeed] = [DummyFeed(url=address, title=label) for address, label in feed_specs]

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            if key == "webhooks" and resource == ():
                return [{"name": webhook_name, "url": webhook_url}]
            if key != "webhook" or not isinstance(resource, str):
                return default
            # Only the known stub hosts are attached to the webhook under test.
            known_hosts = ("https://old.example.com", "https://unchanged.example.com")
            if resource.startswith(known_hosts):
                return webhook_url
            return default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return []

    def fake_resolve(url: str) -> tuple[str, None]:
        # Stand-in for URL resolution: swap the host and report no error.
        return url.replace("old.example.com", "new.example.com"), None

    app.dependency_overrides[get_reader_dependency] = StubReader
    try:
        hook_info = main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url)
        preview_query: dict[str, str] = {
            "webhook_url": webhook_url,
            "replace_from": "old.example.com",
            "replace_to": "new.example.com",
            "resolve_urls": "true",
        }
        with (
            patch("discord_rss_bot.main.get_data_from_hook_url", return_value=hook_info),
            patch("discord_rss_bot.main.resolve_final_feed_url", side_effect=fake_resolve),
        ):
            response: Response = client.get(url="/webhook_entries", params=preview_query)

        assert response.status_code == 200, f"Failed to get preview: {response.text}"
        page_html: str = response.text
        assert "Mass update feed URLs" in page_html
        assert "old.example.com/rss/a.xml" in page_html
        assert "new.example.com/rss/a.xml" in page_html
        assert "Will update" in page_html
        assert "Matched: 2" in page_html
        assert "Will update: 2" in page_html
    finally:
        app.dependency_overrides = {}
def test_bulk_change_feed_urls_updates_matching_feeds() -> None:
    """Posting to /bulk_change_feed_urls should rewrite and refresh every feed URL that matches."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    class StubReader:
        def __init__(self) -> None:
            feed_addresses = (
                "https://old.example.com/rss/a.xml",
                "https://old.example.com/rss/b.xml",
                "https://unchanged.example.com/rss/c.xml",
            )
            self._feeds = [DummyFeed(url=address) for address in feed_addresses]
            # Call logs inspected by the assertions at the end of the test.
            self.change_calls: list[tuple[str, str]] = []
            self.updated_feeds: list[str] = []

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            if key == "webhooks" and resource == ():
                return [{"name": webhook_name, "url": webhook_url}]
            is_feed_webhook_lookup = key == "webhook" and isinstance(resource, str)
            return webhook_url if is_feed_webhook_lookup else default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

        def change_feed_url(self, old_url: str, new_url: str) -> None:
            self.change_calls.append((old_url, new_url))

        def update_feed(self, feed_url: str) -> None:
            self.updated_feeds.append(feed_url)

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return []

        def set_entry_read(self, _entry: Entry, _value: bool) -> None:  # noqa: FBT001
            return

    def fake_resolve(url: str) -> tuple[str, None]:
        # Stand-in for URL resolution: swap the host and report no error.
        return url.replace("old.example.com", "new.example.com"), None

    # A single shared instance is injected so the recorded calls can be read back afterwards.
    stub_reader = StubReader()
    app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
    no_redirect_client = TestClient(app, follow_redirects=False)

    try:
        form: dict[str, str] = {
            "webhook_url": webhook_url,
            "replace_from": "old.example.com",
            "replace_to": "new.example.com",
            "resolve_urls": "true",
        }
        with patch("discord_rss_bot.main.resolve_final_feed_url", side_effect=fake_resolve):
            response: Response = no_redirect_client.post(url="/bulk_change_feed_urls", data=form)

        assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
        assert "Updated%202%20feed%20URL%28s%29" in response.headers.get("location", "")

        # Order of processing is not part of the contract, so compare sorted sequences.
        expected_changes = [
            ("https://old.example.com/rss/a.xml", "https://new.example.com/rss/a.xml"),
            ("https://old.example.com/rss/b.xml", "https://new.example.com/rss/b.xml"),
        ]
        expected_refreshes = [
            "https://new.example.com/rss/a.xml",
            "https://new.example.com/rss/b.xml",
        ]
        assert sorted(stub_reader.change_calls) == sorted(expected_changes)
        assert sorted(stub_reader.updated_feeds) == sorted(expected_refreshes)
    finally:
        app.dependency_overrides = {}
def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None:
    """/webhook_entries_mass_update_preview should return the preview table without the full-page heading."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str
        title: str | None = None
        updates_enabled: bool = True
        last_exception: None = None

    class StubReader:
        def __init__(self) -> None:
            feed_specs = (
                ("https://old.example.com/rss/a.xml", "A"),
                ("https://old.example.com/rss/b.xml", "B"),
            )
            self._feeds: list[DummyFeed] = [DummyFeed(url=address, title=label) for address, label in feed_specs]

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            # Every feed URL is reported as attached to the webhook under test.
            is_feed_webhook_lookup = key == "webhook" and isinstance(resource, str)
            return webhook_url if is_feed_webhook_lookup else default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

    def fake_resolve(url: str) -> tuple[str, None]:
        # Stand-in for URL resolution: swap the host and report no error.
        return url.replace("old.example.com", "new.example.com"), None

    app.dependency_overrides[get_reader_dependency] = StubReader
    try:
        preview_query: dict[str, str] = {
            "webhook_url": webhook_url,
            "replace_from": "old.example.com",
            "replace_to": "new.example.com",
            "resolve_urls": "true",
        }
        with patch("discord_rss_bot.main.resolve_final_feed_url", side_effect=fake_resolve):
            response: Response = client.get(url="/webhook_entries_mass_update_preview", params=preview_query)

        assert response.status_code == 200, f"Failed to get HTMX preview fragment: {response.text}"
        fragment_html: str = response.text
        assert "Will update: 2" in fragment_html
        assert "<table" in fragment_html
        assert "Mass update feed URLs" not in fragment_html, "Fragment should not include full page wrapper text"
    finally:
        app.dependency_overrides = {}
def test_bulk_change_feed_urls_force_update_overwrites_conflict() -> None:  # noqa: C901
    """With force_update set, a feed already living at the target URL is deleted and then replaced."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    class StubReader:
        def __init__(self) -> None:
            # The second feed already occupies the URL the first one will be moved to.
            feed_addresses = (
                "https://old.example.com/rss/a.xml",
                "https://new.example.com/rss/a.xml",
            )
            self._feeds = [DummyFeed(url=address) for address in feed_addresses]
            # Call logs inspected by the assertions at the end of the test.
            self.delete_calls: list[str] = []
            self.change_calls: list[tuple[str, str]] = []

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            if key == "webhooks" and resource == ():
                return [{"name": webhook_name, "url": webhook_url}]
            is_feed_webhook_lookup = key == "webhook" and isinstance(resource, str)
            return webhook_url if is_feed_webhook_lookup else default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

        def delete_feed(self, feed_url: str) -> None:
            self.delete_calls.append(feed_url)

        def change_feed_url(self, old_url: str, new_url: str) -> None:
            self.change_calls.append((old_url, new_url))

        def update_feed(self, _feed_url: str) -> None:
            return

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return []

        def set_entry_read(self, _entry: Entry, _value: bool) -> None:  # noqa: FBT001
            return

    def fake_resolve(url: str) -> tuple[str, None]:
        # Stand-in for URL resolution: swap the host and report no error.
        return url.replace("old.example.com", "new.example.com"), None

    # A single shared instance is injected so the recorded calls can be read back afterwards.
    stub_reader = StubReader()
    app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
    no_redirect_client = TestClient(app, follow_redirects=False)

    try:
        form: dict[str, str] = {
            "webhook_url": webhook_url,
            "replace_from": "old.example.com",
            "replace_to": "new.example.com",
            "resolve_urls": "true",
            "force_update": "true",
        }
        with patch("discord_rss_bot.main.resolve_final_feed_url", side_effect=fake_resolve):
            response: Response = no_redirect_client.post(url="/bulk_change_feed_urls", data=form)

        assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"

        old_address = "https://old.example.com/rss/a.xml"
        new_address = "https://new.example.com/rss/a.xml"
        assert stub_reader.delete_calls == [new_address]
        assert stub_reader.change_calls == [(old_address, new_address)]
        assert "Force%20overwrote%201" in response.headers.get("location", "")
    finally:
        app.dependency_overrides = {}
def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None:
    """With force_update set, the URL change is applied even though resolution reports "HTTP 404"."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    class StubReader:
        def __init__(self) -> None:
            self._feeds = [DummyFeed(url="https://old.example.com/rss/a.xml")]
            # Call log inspected by the assertions at the end of the test.
            self.change_calls: list[tuple[str, str]] = []

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            if key == "webhooks" and resource == ():
                return [{"name": webhook_name, "url": webhook_url}]
            is_feed_webhook_lookup = key == "webhook" and isinstance(resource, str)
            return webhook_url if is_feed_webhook_lookup else default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

        def change_feed_url(self, old_url: str, new_url: str) -> None:
            self.change_calls.append((old_url, new_url))

        def update_feed(self, _feed_url: str) -> None:
            return

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return []

        def set_entry_read(self, _entry: Entry, _value: bool) -> None:  # noqa: FBT001
            return

    # A single shared instance is injected so the recorded calls can be read back afterwards.
    stub_reader = StubReader()
    app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
    no_redirect_client = TestClient(app, follow_redirects=False)

    try:
        # Resolution yields the new URL together with an error string.
        resolution_result = ("https://new.example.com/rss/a.xml", "HTTP 404")
        form: dict[str, str] = {
            "webhook_url": webhook_url,
            "replace_from": "old.example.com",
            "replace_to": "new.example.com",
            "resolve_urls": "true",
            "force_update": "true",
        }
        with patch("discord_rss_bot.main.resolve_final_feed_url", return_value=resolution_result):
            response: Response = no_redirect_client.post(url="/bulk_change_feed_urls", data=form)

        assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"

        expected_change = ("https://old.example.com/rss/a.xml", "https://new.example.com/rss/a.xml")
        assert stub_reader.change_calls == [expected_change]

        location = response.headers.get("location", "")
        assert "Updated%201%20feed%20URL%28s%29" in location
        assert "Failed%200" in location
    finally:
        app.dependency_overrides = {}
def test_reader_dependency_override_is_used() -> None:
    """The /add page should render when the reader is supplied through a dependency override."""

    class StubReader:
        def get_tag(self, _resource: str, _key: str, default: str | None = None) -> str | None:
            """Echo back the caller's default, as if no tag were stored.

            Args:
                _resource: Ignored.
                _key: Ignored.
                default: Value handed straight back to the caller.

            Returns:
                ``default``, unchanged.
            """
            return default

    app.dependency_overrides[get_reader_dependency] = StubReader
    try:
        response: Response = client.get(url="/add")
        rendered_ok: bool = response.status_code == 200
        assert rendered_ok, f"Expected /add to render with overridden reader: {response.text}"
    finally:
        # Drop every override so later tests get the real reader again.
        app.dependency_overrides = {}