discord-rss-bot/tests/test_main.py
Joakim Helleśen 955b94456d
All checks were successful
Test and build Docker image / docker (push) Successful in 24s
Add mass update functionality for feed URLs with preview
2026-03-16 04:48:34 +01:00

1621 lines
67 KiB
Python

from __future__ import annotations
import re
import urllib.parse
from dataclasses import dataclass
from dataclasses import field
from datetime import UTC
from datetime import datetime
from typing import TYPE_CHECKING
from typing import cast
from unittest.mock import MagicMock
from unittest.mock import patch
from fastapi.testclient import TestClient
import discord_rss_bot.main as main_module
from discord_rss_bot.main import app
from discord_rss_bot.main import create_html_for_feed
from discord_rss_bot.main import get_reader_dependency
if TYPE_CHECKING:
from pathlib import Path
import pytest
from httpx import Response
from reader import Entry
# Shared test client bound to the FastAPI app; the tests below issue their requests through it.
client: TestClient = TestClient(app)
# Name and URL of the webhook the tests register via /add_webhook and remove via /delete_webhook.
webhook_name: str = "Hello, I am a webhook!"
webhook_url: str = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
# Feed URL the tests add via /add and remove via /remove.
feed_url: str = "https://lovinator.space/rss_test.xml"
def encoded_feed_url(url: str) -> str:
    """Return *url* percent-encoded, or an empty string when *url* is empty.

    Args:
        url: The feed URL to encode.

    Returns:
        ``urllib.parse.quote(url)`` for a non-empty URL, otherwise ``""``.
    """
    # Quote the argument itself. The earlier version quoted the module-level
    # ``feed_url`` and used ``url`` only for the truthiness check, so any other
    # URL passed in was silently replaced by the global one.
    return urllib.parse.quote(url) if url else ""
def test_search() -> None:
    """Register a webhook and a feed, then check that /search answers a query."""
    webhook_form = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    quoted_feed_url = encoded_feed_url(feed_url)

    # Start clean: if the index already lists the feed, remove it under both spellings.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, quoted_feed_url):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Recreate the webhook; the result of the delete is not checked.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(url="/add_webhook", data=webhook_form)
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # Attach the feed to that webhook.
    resp = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    # The index page should now contain the encoded feed URL.
    resp = client.get(url="/")
    assert resp.status_code == 200, f"Failed to get /: {resp.text}"
    assert quoted_feed_url in resp.text, f"Feed not found in /: {resp.text}"

    # Finally, run a search query.
    resp = client.get(url="/search/?query=a")
    assert resp.status_code == 200, f"Failed to search for entry: {resp.text}"
def test_add_webhook() -> None:
    """Add a webhook through /add_webhook and confirm /webhooks lists it."""
    # Drop any leftover webhook first; the outcome of the delete is not checked.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})

    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # The webhook name must show up on the listing page.
    listing: Response = client.get(url="/webhooks")
    assert listing.status_code == 200, f"Failed to get /webhooks: {listing.text}"
    assert webhook_name in listing.text, f"Webhook not found in /webhooks: {listing.text}"
def test_create_feed() -> None:
    """Add a feed through /add and confirm the index page lists it."""
    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    webhook_resp: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert webhook_resp.status_code == 200, f"Failed to add webhook: {webhook_resp.text}"

    # If the index already lists the feed, remove it under both the raw and encoded URL.
    quoted_feed_url = encoded_feed_url(feed_url)
    before: Response = client.get(url="/")
    if feed_url in before.text:
        client.post(url="/remove", data={"feed_url": feed_url})
        client.post(url="/remove", data={"feed_url": quoted_feed_url})

    # Create the feed.
    add_resp: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert add_resp.status_code == 200, f"Failed to add feed: {add_resp.text}"

    # The encoded URL must now appear on the index page.
    after: Response = client.get(url="/")
    assert after.status_code == 200, f"Failed to get /: {after.text}"
    assert quoted_feed_url in after.text, f"Feed not found in /: {after.text}"
def test_get() -> None:
    """Check that each GET page answers with HTTP 200 once a webhook and a feed exist."""
    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # Remove the feed (raw and encoded URL) if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Add the feed and make sure the index shows it.
    resp = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    resp = client.get(url="/")
    assert resp.status_code == 200, f"Failed to get /: {resp.text}"
    assert encoded_feed_url(feed_url) in resp.text, f"Feed not found in /: {resp.text}"

    # Every page below must respond with 200. Pages are visited in this order;
    # the second tuple item holds the query parameters (None for pages without any).
    feed_query = {"feed_url": encoded_feed_url(feed_url)}
    pages: list[tuple[str, dict[str, str] | None]] = [
        ("/add", None),
        ("/add_webhook", None),
        ("/blacklist", feed_query),
        ("/custom", feed_query),
        ("/embed", feed_query),
        ("/feed", feed_query),
        ("/", None),
        ("/webhooks", None),
        ("/webhook_entries", {"webhook_url": webhook_url}),
        ("/whitelist", feed_query),
    ]
    for path, query in pages:
        resp = client.get(url=path, params=query)
        assert resp.status_code == 200, f"{path} failed: {resp.text}"
def test_pause_feed() -> None:
    """Pause a freshly added feed through /pause and confirm the index still lists it."""
    webhook_form = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    feed_form = {"feed_url": feed_url}

    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(url="/add_webhook", data=webhook_form)
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # Remove the feed (raw and encoded URL) if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        client.post(url="/remove", data={"feed_url": feed_url})
        client.post(url="/remove", data={"feed_url": encoded_feed_url(feed_url)})

    # Add the feed.
    resp = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    # If the index mentions "Paused", unpause first so the pause below starts from an unpaused feed.
    index_page = client.get(url="/")
    if "Paused" in index_page.text:
        resp = client.post(url="/unpause", data=feed_form)
        assert resp.status_code == 200, f"Failed to unpause feed: {resp.text}"

    # Pause the feed.
    resp = client.post(url="/pause", data=feed_form)
    assert resp.status_code == 200, f"Failed to pause feed: {resp.text}"

    # The feed must still be listed on the index page afterwards.
    resp = client.get(url="/")
    assert resp.status_code == 200, f"Failed to get /: {resp.text}"
    assert encoded_feed_url(feed_url) in resp.text, f"Feed not found in /: {resp.text}"
def test_unpause_feed() -> None:
    """Unpause a feed through /unpause and confirm the index still lists it."""
    webhook_form = {"webhook_name": webhook_name, "webhook_url": webhook_url}
    feed_form = {"feed_url": feed_url}

    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(url="/add_webhook", data=webhook_form)
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # Remove the feed (raw and encoded URL) if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        client.post(url="/remove", data={"feed_url": feed_url})
        client.post(url="/remove", data={"feed_url": encoded_feed_url(feed_url)})

    # Add the feed.
    resp = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    # If the index does not mention "Paused", pause first so there is something to unpause.
    index_page = client.get(url="/")
    if "Paused" not in index_page.text:
        resp = client.post(url="/pause", data=feed_form)
        assert resp.status_code == 200, f"Failed to pause feed: {resp.text}"

    # Unpause the feed.
    resp = client.post(url="/unpause", data=feed_form)
    assert resp.status_code == 200, f"Failed to unpause feed: {resp.text}"

    # The feed must still be listed on the index page afterwards.
    resp = client.get(url="/")
    assert resp.status_code == 200, f"Failed to get /: {resp.text}"
    assert encoded_feed_url(feed_url) in resp.text, f"Feed not found in /: {resp.text}"
def test_remove_feed() -> None:
    """Remove a feed through /remove and confirm it disappears from the index page."""
    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    webhook_resp: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert webhook_resp.status_code == 200, f"Failed to add webhook: {webhook_resp.text}"

    # Remove the feed (raw and encoded URL) if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        for stale_url in (feed_url, encoded_feed_url(feed_url)):
            client.post(url="/remove", data={"feed_url": stale_url})

    # Add the feed so there is something to remove.
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Remove it again.
    removed: Response = client.post(url="/remove", data={"feed_url": feed_url})
    assert removed.status_code == 200, f"Failed to remove feed: {removed.text}"

    # The raw feed URL must no longer appear on the index page.
    index_page = client.get(url="/")
    assert index_page.status_code == 200, f"Failed to get /: {index_page.text}"
    assert feed_url not in index_page.text, f"Feed found in /: {index_page.text}"
def test_change_feed_url() -> None:
    """Change a feed's URL via /change_feed_url and check old and new URLs on /feed."""
    replacement_url = "https://lovinator.space/rss_test_small.xml"

    # Neither the original nor the replacement feed may exist beforehand.
    for stale_url in (feed_url, replacement_url):
        client.post(url="/remove", data={"feed_url": stale_url})

    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # Add the feed under its original URL.
    resp = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    # Point the feed at the replacement URL.
    resp = client.post(
        url="/change_feed_url",
        data={"old_feed_url": feed_url, "new_feed_url": replacement_url},
    )
    assert resp.status_code == 200, f"Failed to change feed URL: {resp.text}"

    # The replacement URL must resolve on /feed ...
    resp = client.get(url="/feed", params={"feed_url": replacement_url})
    assert resp.status_code == 200, f"New feed URL is not accessible: {resp.text}"

    # ... while the original URL must now give 404.
    resp = client.get(url="/feed", params={"feed_url": feed_url})
    assert resp.status_code == 404, "Old feed URL should no longer exist"

    # Cleanup.
    client.post(url="/remove", data={"feed_url": replacement_url})
def test_change_feed_url_marks_entries_as_read() -> None:
    """After a feed URL change, unread entries on the new URL should be marked read to prevent resending."""
    replacement_url = "https://lovinator.space/rss_test_small.xml"

    # Neither feed may exist beforehand.
    for stale_url in (feed_url, replacement_url):
        client.post(url="/remove", data={"feed_url": stale_url})

    # Recreate the webhook; neither result is checked here.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})

    # Add the feed under its original URL.
    added: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert added.status_code == 200, f"Failed to add feed: {added.text}"

    # Two stand-in entries that the patched get_entries hands back.
    first_entry = MagicMock()
    first_entry.id = "entry-a"
    second_entry = MagicMock()
    second_entry.id = "entry-b"

    # Patch methods on the reader returned by the main module so calls can be observed.
    reader_obj = main_module.get_reader_dependency()

    # Do not follow redirects, so the POST response itself is inspected. Per the original
    # note, the redirect target would 404 because change_feed_url is mocked out.
    direct_client = TestClient(app, follow_redirects=False)

    with (
        patch.object(reader_obj, "get_entries", return_value=[first_entry, second_entry]) as get_entries_mock,
        patch.object(reader_obj, "set_entry_read") as set_read_mock,
        patch.object(reader_obj, "update_feed") as update_feed_mock,
        patch.object(reader_obj, "change_feed_url"),
    ):
        redirect: Response = direct_client.post(
            url="/change_feed_url",
            data={"old_feed_url": feed_url, "new_feed_url": replacement_url},
        )

    assert redirect.status_code == 303, f"Expected 303 redirect, got {redirect.status_code}: {redirect.text}"

    # The new URL must have been updated exactly once.
    update_feed_mock.assert_called_once_with(replacement_url)

    # Unread entries must have been requested for the new URL.
    get_entries_mock.assert_called_once_with(feed=replacement_url, read=False)

    # Both returned entries must have been marked as read.
    assert set_read_mock.call_count == 2, f"Expected 2 set_entry_read calls, got {set_read_mock.call_count}"
    for entry in (first_entry, second_entry):
        set_read_mock.assert_any_call(entry, True)

    # Cleanup.
    for stale_url in (feed_url, replacement_url):
        client.post(url="/remove", data={"feed_url": stale_url})
def test_change_feed_url_empty_old_url_returns_400() -> None:
    """A whitespace-only old_feed_url posted to /change_feed_url should give HTTP 400."""
    form = {"old_feed_url": " ", "new_feed_url": "https://example.com/feed.xml"}
    resp: Response = client.post(url="/change_feed_url", data=form)
    assert resp.status_code == 400, f"Expected 400 for empty old URL, got {resp.status_code}"
def test_change_feed_url_empty_new_url_returns_400() -> None:
    """A whitespace-only new_feed_url posted to /change_feed_url should give HTTP 400."""
    form = {"old_feed_url": feed_url, "new_feed_url": " "}
    resp: Response = client.post(url="/change_feed_url", data=form)
    assert resp.status_code == 400, f"Expected 400 for blank new URL, got {resp.status_code}"
def test_change_feed_url_nonexistent_old_url_returns_404() -> None:
    """Renaming a feed that is not tracked should give HTTP 404."""
    missing_url = "https://does-not-exist.example.com/rss.xml"

    # Issue a remove first so the URL is certainly not tracked.
    client.post(url="/remove", data={"feed_url": missing_url})

    form = {"old_feed_url": missing_url, "new_feed_url": "https://example.com/new.xml"}
    resp: Response = client.post(url="/change_feed_url", data=form)
    assert resp.status_code == 404, f"Expected 404 for non-existent feed, got {resp.status_code}"
def test_change_feed_url_new_url_already_exists_returns_409() -> None:
    """Changing a feed to a URL that is already tracked should give HTTP 409."""
    other_feed_url = "https://lovinator.space/rss_test_small.xml"
    both_feeds = (feed_url, other_feed_url)

    # Neither feed may exist beforehand.
    for url in both_feeds:
        client.post(url="/remove", data={"feed_url": url})

    # Recreate the webhook; neither result is checked here.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})

    # Track both feeds.
    for url in both_feeds:
        client.post(url="/add", data={"feed_url": url, "webhook_dropdown": webhook_name})

    # Renaming the first feed onto the second must be rejected.
    resp: Response = client.post(
        url="/change_feed_url",
        data={"old_feed_url": feed_url, "new_feed_url": other_feed_url},
    )
    assert resp.status_code == 409, f"Expected 409 when new URL already exists, got {resp.status_code}"

    # Cleanup.
    for url in both_feeds:
        client.post(url="/remove", data={"feed_url": url})
def test_change_feed_url_same_url_redirects_without_error() -> None:
    """Submitting the same URL as old and new should succeed and leave the feed in place."""
    feed_form = {"feed_url": feed_url}

    # Recreate the webhook; neither result is checked here.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})

    # Re-add the feed from scratch.
    client.post(url="/remove", data=feed_form)
    resp: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    # Old and new URL are identical.
    resp = client.post(
        url="/change_feed_url",
        data={"old_feed_url": feed_url, "new_feed_url": feed_url},
    )
    assert resp.status_code == 200, f"Expected 200 redirect for same URL, got {resp.status_code}"

    # The feed page must still resolve.
    resp = client.get(url="/feed", params=feed_form)
    assert resp.status_code == 200, f"Feed should still exist after no-op URL change: {resp.text}"

    # Cleanup.
    client.post(url="/remove", data=feed_form)
def test_delete_webhook() -> None:
    """Delete a webhook through /delete_webhook and confirm /webhooks no longer lists it."""
    # Remove the webhook if the listing page already shows it.
    webhooks_page: Response = client.get(url="/webhooks")
    if webhook_url in webhooks_page.text:
        client.post(url="/delete_webhook", data={"webhook_url": webhook_url})

    # Add the webhook so there is something to delete. The result is asserted,
    # as in the other webhook tests of this module, so that a failed add is
    # reported here instead of surfacing at the delete step.
    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Delete the webhook.
    response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    assert response.status_code == 200, f"Failed to delete webhook: {response.text}"

    # Check that the webhook is no longer listed.
    response = client.get(url="/webhooks")
    assert response.status_code == 200, f"Failed to get /webhooks: {response.text}"
    assert webhook_name not in response.text, f"Webhook found in /webhooks: {response.text}"
def test_update_feed_not_found() -> None:
    """Requesting /update for a feed URL that was never added should give 404."""
    missing_feed_url = "https://nonexistent-feed.example.com/rss.xml"
    query = {"feed_url": urllib.parse.quote(missing_feed_url)}

    resp: Response = client.get(url="/update", params=query)

    # Both the status code and the error text are checked.
    assert resp.status_code == 404, f"Expected 404 for non-existent feed, got: {resp.status_code}"
    assert "Feed not found" in resp.text
def test_post_entry_send_to_discord() -> None:
    """Check that /post_entry delivers an entry and redirects to the feed page.

    Regression test for the bug where the injected reader was not passed to
    send_entry_to_discord, meaning the dependency-injected reader was silently ignored.
    """
    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # Re-add the feed from scratch.
    client.post(url="/remove", data={"feed_url": feed_url})
    resp = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    # Pick one real entry of the feed so a valid entry ID is available.
    reader: main_module.Reader = main_module.get_reader_dependency()
    found: list[Entry] = list(reader.get_entries(feed=feed_url, limit=1))
    assert found, "Feed should have at least one entry to send"
    chosen: main_module.Entry = found[0]
    query = {
        "entry_id": urllib.parse.quote(chosen.id),
        "feed_url": urllib.parse.quote(feed_url),
    }

    direct_client = TestClient(app, follow_redirects=False)

    # execute_webhook is patched so that no real HTTP request goes out to Discord.
    with patch("discord_rss_bot.feeds.execute_webhook") as execute_mock:
        resp = direct_client.get(url="/post_entry", params=query)

    assert resp.status_code == 303, f"Expected redirect after sending, got {resp.status_code}: {resp.text}"
    location: str = resp.headers.get("location", "")
    assert "feed?feed_url=" in location, f"Should redirect to feed page, got: {location}"
    assert execute_mock.called, "execute_webhook should have been called to deliver the entry to Discord"

    # Cleanup.
    client.post(url="/remove", data={"feed_url": feed_url})
def test_post_entry_unknown_id_returns_404() -> None:
    """An entry ID that does not exist should make /post_entry answer with 404."""
    query = {"entry_id": "https://nonexistent.example.com/entry-that-does-not-exist"}
    resp: Response = client.get(url="/post_entry", params=query)
    assert resp.status_code == 404, f"Expected 404 for unknown entry, got {resp.status_code}"
def test_post_entry_uses_feed_url_to_disambiguate_duplicate_ids() -> None:
    """With the same entry ID in two feeds, /post_entry should pick the entry of the given feed_url."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    @dataclass(slots=True)
    class DummyEntry:
        id: str
        feed: DummyFeed
        feed_url: str

    feed_a = "https://example.com/feed-a.xml"
    feed_b = "https://example.com/feed-b.xml"
    shared_id = "https://example.com/shared-entry-id"

    # Two entries that share an ID but belong to different feeds.
    entry_a: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(url=feed_a), feed_url=feed_a))
    entry_b: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(url=feed_b), feed_url=feed_b))

    class StubReader:
        """Minimal reader stand-in that only answers get_entries."""

        def get_entries(self, feed: str | None = None) -> list[Entry]:
            # A known feed yields only its own entry; anything else yields both.
            per_feed = {feed_a: [entry_a], feed_b: [entry_b]}
            return per_feed.get(feed, [entry_a, entry_b])

    # Records the feed URL of every entry handed to the patched sender.
    selected_feed_urls: list[str] = []

    def fake_send_entry_to_discord(entry: Entry, reader: object) -> None:
        selected_feed_urls.append(entry.feed.url)

    app.dependency_overrides[get_reader_dependency] = StubReader
    no_redirect_client = TestClient(app, follow_redirects=False)
    query = {"entry_id": urllib.parse.quote(shared_id), "feed_url": urllib.parse.quote(feed_b)}

    try:
        with patch("discord_rss_bot.main.send_entry_to_discord", side_effect=fake_send_entry_to_discord):
            resp: Response = no_redirect_client.get(url="/post_entry", params=query)

        assert resp.status_code == 303, f"Expected redirect after sending, got {resp.status_code}"
        assert selected_feed_urls == [feed_b], f"Expected feed-b entry, got: {selected_feed_urls}"

        location = resp.headers.get("location", "")
        assert urllib.parse.quote(feed_b) in location, f"Expected redirect to feed-b page, got: {location}"
    finally:
        # Reset the overrides so later tests get the real reader dependency again.
        app.dependency_overrides = {}
def test_navbar_backup_link_hidden_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
    """Without GIT_BACKUP_PATH the index page should not show the backup button."""
    # Make sure the variable is unset for this test.
    monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)

    page: Response = client.get(url="/")
    assert page.status_code == 200, f"Failed to get /: {page.text}"

    # The check fails only if the page contains both the label and the form action.
    label_present = "Backup" in page.text
    form_present = 'action="/backup"' in page.text
    assert not (label_present and form_present), (
        "Backup button should not be visible when GIT_BACKUP_PATH is not configured"
    )
def test_navbar_backup_link_visible_when_configured(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """With GIT_BACKUP_PATH set the index page should show the backup button."""
    # Point the variable at the temporary directory provided by pytest.
    monkeypatch.setenv("GIT_BACKUP_PATH", str(tmp_path))

    page: Response = client.get(url="/")
    assert page.status_code == 200, f"Failed to get /: {page.text}"

    # Both the button label and the form action must be present.
    html: str = page.text
    assert "Backup" in html, "Backup button text should be visible when GIT_BACKUP_PATH is configured"
    assert 'action="/backup"' in html, "Backup form should be visible when GIT_BACKUP_PATH is configured"
def test_backup_endpoint_returns_error_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
    """POSTing to /backup without GIT_BACKUP_PATH should show a not-configured message."""
    # Make sure the variable is unset for this test.
    monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)

    # The shared client's final response must be a 200 page that carries the message.
    resp: Response = client.post(url="/backup")
    assert resp.status_code == 200, f"Failed to post /backup: {resp.text}"

    # Either wording is accepted.
    accepted_markers = ("Git backup is not configured", "GIT_BACKUP_PATH")
    assert any(marker in resp.text for marker in accepted_markers), (
        "Error message about backup not being configured should be shown"
    )
def test_show_more_entries_button_visible_when_many_entries() -> None:
    """If the feed page shows 'Show more entries', check that the button is a pagination link."""
    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # Remove the feed if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        client.post(url="/remove", data={"feed_url": feed_url})

    # Add the feed.
    resp = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    # Load the feed page.
    feed_page: Response = client.get(url="/feed", params={"feed_url": feed_url})
    assert feed_page.status_code == 200, f"Failed to get /feed: {feed_page.text}"

    # The structural checks only apply when the button is rendered at all.
    if "Show more entries" not in feed_page.text:
        return

    assert "starting_after=" in feed_page.text, "Show more entries button should contain starting_after parameter"

    # The link must point back to the feed page; either spelling of the encoded URL is accepted.
    accepted_links = (
        f'href="/feed?feed_url={urllib.parse.quote(feed_url)}',
        f'href="/feed?feed_url={encoded_feed_url(feed_url)}',
    )
    assert any(link in feed_page.text for link in accepted_links), (
        "Show more entries button should link back to the feed page"
    )
def test_show_more_entries_button_not_visible_when_few_entries() -> None:
    """With 20 or fewer entries the feed page should not show 'Show more entries'."""
    # Recreate the webhook so the test does not depend on execution order.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # A feed that is expected to have very few entries.
    small_feed_url = "https://lovinator.space/rss_test_small.xml"
    small_feed_form = {"feed_url": small_feed_url}

    # Remove it first in case it is still tracked.
    client.post(url="/remove", data=small_feed_form)

    # The rest of the test only runs when the feed could be added.
    resp = client.post(url="/add", data={"feed_url": small_feed_url, "webhook_dropdown": webhook_name})
    if resp.status_code != 200:
        return

    feed_page: Response = client.get(url="/feed", params=small_feed_form)
    assert feed_page.status_code == 200, f"Failed to get /feed: {feed_page.text}"

    # Read the entry count from the page and check the button when the count is 20 or less.
    if "0 entries" in feed_page.text or " entries)" in feed_page.text:
        found: re.Match[str] | None = re.search(r"\((\d+) entries\)", feed_page.text)
        if found:
            entry_count = int(found.group(1))
            if entry_count <= 20:
                assert "Show more entries" not in feed_page.text, (
                    f"Show more entries button should not be visible when there are {entry_count} entries"
                )

    # Clean up.
    client.post(url="/remove", data=small_feed_form)
def test_show_more_entries_pagination_works() -> None:
    """Follow the starting_after link of the feed page, if present, and check the next page."""
    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # Remove the feed if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        client.post(url="/remove", data={"feed_url": feed_url})

    # Add the feed.
    resp = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    # Load the first page.
    first_page: Response = client.get(url="/feed", params={"feed_url": feed_url})
    assert first_page.status_code == 200, f"Failed to get /feed: {first_page.text}"

    # Nothing more to check when the page offers no pagination.
    if "Show more entries" not in first_page.text or "starting_after=" not in first_page.text:
        return

    # Pull the starting_after value out of the button link.
    found: re.Match[str] | None = re.search(r'starting_after=([^"&]+)', first_page.text)
    if not found:
        return
    starting_after_id: str = found.group(1)

    # Request the next page with that value.
    next_page: Response = client.get(
        url="/feed",
        params={"feed_url": feed_url, "starting_after": starting_after_id},
    )
    assert next_page.status_code == 200, f"Failed to get paginated feed: {next_page.text}"

    # The paginated page should still render the entry count.
    assert "entries)" in next_page.text, "Paginated page should show entry count"
def test_show_more_entries_button_context_variable() -> None:
    """Check that the 'Show more entries' button matches the entry count shown on the feed page."""
    # Recreate the webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    resp: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert resp.status_code == 200, f"Failed to add webhook: {resp.text}"

    # Remove the feed if the index already lists it.
    index_page: Response = client.get(url="/")
    if feed_url in index_page.text:
        client.post(url="/remove", data={"feed_url": feed_url})

    # Add the feed.
    resp = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert resp.status_code == 200, f"Failed to add feed: {resp.text}"

    # Load the feed page.
    feed_page: Response = client.get(url="/feed", params={"feed_url": feed_url})
    assert feed_page.status_code == 200, f"Failed to get /feed: {feed_page.text}"

    # Without a rendered "(N entries)" count there is nothing to compare against.
    found: re.Match[str] | None = re.search(r"\((\d+) entries\)", feed_page.text)
    if not found:
        return

    entry_count = int(found.group(1))
    button_shown = "Show more entries" in feed_page.text
    if entry_count > 20:
        # More than 20 entries: the button is expected.
        assert button_shown, f"Button should be visible when there are {entry_count} entries (more than 20)"
    else:
        # 20 or fewer entries: the button must be absent.
        assert not button_shown, (
            f"Button should not be visible when there are {entry_count} entries (20 or fewer)"
        )
def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyPatch) -> None:
    """create_html_for_feed should label entries whose original_feed_url differs from the viewed feed."""

    @dataclass(slots=True)
    class DummyContent:
        value: str

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    @dataclass(slots=True)
    class DummyEntry:
        feed: DummyFeed
        id: str
        original_feed_url: str | None = None
        link: str = "https://example.com/post"
        title: str = "Example title"
        author: str = "Author"
        summary: str = "Summary"
        content: list[DummyContent] = field(default_factory=lambda: [DummyContent("Content")])
        published: None = None

        def __post_init__(self) -> None:
            # Fall back to the owning feed's URL when no explicit origin was given.
            if self.original_feed_url is None:
                self.original_feed_url = self.feed.url

    def render_stub(_entry: object, **_kwargs: object) -> str:
        return "Rendered content"

    def never_listed(_entry: object, **_kwargs: object) -> bool:
        return False

    # Replace the rendering and black/whitelist helpers so only the marker logic is exercised.
    monkeypatch.setattr("discord_rss_bot.main.replace_tags_in_text_message", render_stub)
    monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", never_listed)
    monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", never_listed)

    selected_feed_url = "https://example.com/feed-a.xml"
    viewed_feed = DummyFeed(selected_feed_url)

    native_entry: Entry = cast("Entry", DummyEntry(viewed_feed, "same"))
    # feed.url equals the viewed feed here, but original_feed_url does not, so the marker is still expected.
    foreign_entry: Entry = cast(
        "Entry",
        DummyEntry(
            DummyFeed(selected_feed_url),
            "other",
            original_feed_url="https://example.com/feed-b.xml",
        ),
    )

    html: str = create_html_for_feed(
        reader=MagicMock(),
        current_feed_url=selected_feed_url,
        entries=[native_entry, foreign_entry],
    )

    assert "From another feed: https://example.com/feed-b.xml" in html
    assert "From another feed: https://example.com/feed-a.xml" not in html
def test_webhook_entries_webhook_not_found() -> None:
    """/webhook_entries should answer 404 with "Webhook not found" for a webhook URL this test never registers."""
    nonexistent_webhook_url = "https://discord.com/api/webhooks/999999/nonexistent"
    query: dict[str, str] = {"webhook_url": nonexistent_webhook_url}

    response: Response = client.get(url="/webhook_entries", params=query)

    status: int = response.status_code
    assert status == 404, f"Expected 404 for non-existent webhook, got: {status}"
    assert "Webhook not found" in response.text
def test_webhook_entries_no_feeds() -> None:
    """The webhook detail view should show a no-feeds hint after the test feed has been removed."""
    # Remove the shared test feed and re-create the webhook so it starts without that feed.
    client.post(url="/remove", data={"feed_url": feed_url})
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})

    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    # Fetch the detail view without attaching any feed.
    detail: Response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
    body: str = detail.text
    assert detail.status_code == 200, f"Failed to get /webhook_entries: {body}"
    assert webhook_name in body, "Webhook name not found in response"

    # Either wording is accepted as the empty-state message.
    empty_state_markers: tuple[str, str] = ("No feeds found", "Add feeds")
    assert any(marker in body for marker in empty_state_markers), "Expected message about no feeds"
def test_webhook_entries_no_feeds_still_shows_webhook_settings() -> None:
    """Settings, actions and navigation should render on the webhook detail view right after re-creating it."""
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    detail: Response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
    body: str = detail.text
    assert detail.status_code == 200, f"Failed to get /webhook_entries: {body}"

    # (expected fragment, failure message) pairs, checked in this order.
    expectations: list[tuple[str, str]] = [
        ("Settings", "Expected settings card on webhook detail view"),
        ("Modify Webhook", "Expected modify form on webhook detail view"),
        ("Delete Webhook", "Expected delete action on webhook detail view"),
        ("Back to dashboard", "Expected dashboard navigation link"),
        ("All webhooks", "Expected all webhooks navigation link"),
        (f'name="old_hook" value="{webhook_url}"', "Expected old_hook hidden input"),
        (
            f'value="/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}"',
            "Expected modify form to redirect back to the current webhook detail view",
        ),
    ]
    for fragment, failure_message in expectations:
        assert fragment in body, failure_message
def test_webhook_entries_with_feeds_no_entries() -> None:
    """The webhook detail view should still render after the empty-feed URL is submitted to /add."""
    # Re-create the webhook from scratch.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    created: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert created.status_code == 200, f"Failed to add webhook: {created.text}"

    empty_feed_url = "https://lovinator.space/empty_feed.xml"
    removal_form: dict[str, str] = {"feed_url": empty_feed_url}
    client.post(url="/remove", data=removal_form)

    # The outcome of this request is not asserted; only the detail view below is checked.
    client.post(url="/add", data={"feed_url": empty_feed_url, "webhook_dropdown": webhook_name})

    detail: Response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
    assert detail.status_code == 200, f"Failed to get /webhook_entries: {detail.text}"
    assert webhook_name in detail.text, "Webhook name not found in response"

    # Leave no trace of the empty feed behind.
    client.post(url="/remove", data=removal_form)
def test_webhook_entries_with_entries() -> None:
    """With the test feed attached, the webhook detail view should show entries, settings and attached feeds."""
    # Fresh webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    step: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert step.status_code == 200, f"Failed to add webhook: {step.text}"

    # Fresh feed attached to that webhook.
    client.post(url="/remove", data={"feed_url": feed_url})
    step = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert step.status_code == 200, f"Failed to add feed: {step.text}"

    detail: Response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
    page: str = detail.text
    assert detail.status_code == 200, f"Failed to get /webhook_entries: {page}"
    assert webhook_name in page, "Webhook name not found in response"

    # "total from" is the text this test treats as the entry-count summary.
    assert "total from" in page, "Expected to see entry count"
    assert "Modify Webhook" in page, "Expected webhook settings to be visible"
    assert "Attached feeds" in page, "Expected attached feeds section to be visible"
def test_webhook_entries_shows_attached_feed_link() -> None:
    """The webhook detail view should contain a /feed link for the attached feed and a latest-entries heading."""
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    step: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert step.status_code == 200, f"Failed to add webhook: {step.text}"

    feed_form: dict[str, str] = {"feed_url": feed_url}
    client.post(url="/remove", data=feed_form)
    step = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert step.status_code == 200, f"Failed to add feed: {step.text}"

    detail: Response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
    assert detail.status_code == 200, f"Failed to get /webhook_entries: {detail.text}"

    # The link is expected with the feed URL percent-encoded by urllib.parse.quote.
    expected_feed_link: str = f"/feed?feed_url={urllib.parse.quote(feed_url)}"
    assert expected_feed_link in detail.text, "Expected attached feed to link to its feed detail page"
    assert "Latest entries" in detail.text, "Expected latest entries heading on webhook detail view"

    client.post(url="/remove", data=feed_form)
def test_webhook_entries_multiple_feeds() -> None:
    """The webhook detail view should mention feeds once the test feed is attached to the webhook."""
    # Fresh webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    step: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert step.status_code == 200, f"Failed to add webhook: {step.text}"

    # Fresh feed; this test attaches the single shared test feed.
    feed_form: dict[str, str] = {"feed_url": feed_url}
    client.post(url="/remove", data=feed_form)
    step = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert step.status_code == 200, f"Failed to add feed: {step.text}"

    detail: Response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
    page: str = detail.text
    assert detail.status_code == 200, f"Failed to get /webhook_entries: {page}"
    assert webhook_name in page, "Webhook name not found in response"

    # Case-insensitive check that the page talks about feeds at all.
    assert "feed" in page.lower(), "Expected to see feed information"

    client.post(url="/remove", data=feed_form)
def test_webhook_entries_sort_newest_and_non_null_published_first() -> None:
    """Entries handed to the renderer should be ordered newest-first, with undated entries at the end."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str
        title: str | None = None
        updates_enabled: bool = True
        last_exception: None = None

    @dataclass(slots=True)
    class DummyEntry:
        id: str
        feed: DummyFeed
        published: datetime | None

    dummy_feed = DummyFeed(url="https://example.com/feed.xml", title="Example Feed")

    def make_entry(entry_id: str, published: datetime | None) -> Entry:
        return cast("Entry", DummyEntry(id=entry_id, feed=dummy_feed, published=published))

    # Deliberately shuffled: two dated entries interleaved with two undated ones.
    unsorted_entries: list[Entry] = [
        make_entry("old", datetime(2024, 1, 1, tzinfo=UTC)),
        make_entry("none-1", None),
        make_entry("new", datetime(2024, 2, 1, tzinfo=UTC)),
        make_entry("none-2", None),
    ]

    class StubReader:
        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            # Global tag lookup for the registered webhooks.
            if resource == () and key == "webhooks":
                return [{"name": webhook_name, "url": webhook_url}]
            # Per-feed lookup: every feed reports the test webhook.
            if key == "webhook" and isinstance(resource, str):
                return webhook_url
            return default

        def get_feeds(self) -> list[DummyFeed]:
            return [dummy_feed]

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return unsorted_entries

    observed_order: list[str] = []

    def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str:
        # Record the ids in the order the endpoint passes them to the HTML renderer.
        del reader, current_feed_url
        for entry in entries:
            observed_order.append(entry.id)
        return ""

    hook_info = main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url)

    app.dependency_overrides[get_reader_dependency] = StubReader
    try:
        with patch("discord_rss_bot.main.get_data_from_hook_url", return_value=hook_info):
            with patch("discord_rss_bot.main.create_html_for_feed", side_effect=capture_entries):
                response: Response = client.get(
                    url="/webhook_entries",
                    params={"webhook_url": webhook_url},
                )

        assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
        assert observed_order == ["new", "old", "none-1", "none-2"], (
            "Expected newest published entries first and published=None entries last"
        )
    finally:
        app.dependency_overrides = {}
def test_webhook_entries_pagination() -> None:
    """When the detail view offers "Load More Entries", it should also carry a starting_after form field."""
    # Fresh webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    step: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert step.status_code == 200, f"Failed to add webhook: {step.text}"

    # Fresh feed attached to the webhook.
    feed_form: dict[str, str] = {"feed_url": feed_url}
    client.post(url="/remove", data=feed_form)
    step = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert step.status_code == 200, f"Failed to add feed: {step.text}"

    # First page of the webhook detail view.
    first_page: Response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
    assert first_page.status_code == 200, f"Failed to get /webhook_entries: {first_page.text}"

    # The pagination form is only checked when the button is rendered at all.
    has_load_more: bool = "Load More Entries" in first_page.text
    if has_load_more:
        assert 'name="starting_after"' in first_page.text, (
            "Expected pagination form with starting_after parameter"
        )

    client.post(url="/remove", data=feed_form)
def test_webhook_entries_url_encoding() -> None:
    """/webhook_entries should resolve the webhook when the query value is already percent-encoded."""
    # Fresh webhook.
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    step: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert step.status_code == 200, f"Failed to add webhook: {step.text}"

    # Fresh feed attached to the webhook.
    feed_form: dict[str, str] = {"feed_url": feed_url}
    client.post(url="/remove", data=feed_form)
    step = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert step.status_code == 200, f"Failed to add feed: {step.text}"

    # Send the webhook URL pre-quoted instead of raw.
    query: dict[str, str] = {"webhook_url": urllib.parse.quote(webhook_url)}
    detail: Response = client.get(url="/webhook_entries", params=query)
    assert detail.status_code == 200, f"Failed to get /webhook_entries with encoded URL: {detail.text}"
    assert webhook_name in detail.text, "Webhook name not found in response"

    client.post(url="/remove", data=feed_form)
def test_dashboard_webhook_name_links_to_webhook_detail() -> None:
    """The dashboard should contain a /webhook_entries link carrying the quoted webhook URL."""
    client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
    step: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": webhook_url},
    )
    assert step.status_code == 200, f"Failed to add webhook: {step.text}"

    feed_form: dict[str, str] = {"feed_url": feed_url}
    client.post(url="/remove", data=feed_form)
    step = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
    assert step.status_code == 200, f"Failed to add feed: {step.text}"

    dashboard: Response = client.get(url="/")
    assert dashboard.status_code == 200, f"Failed to get /: {dashboard.text}"

    quoted_hook: str = urllib.parse.quote(webhook_url)
    expected_link = f"/webhook_entries?webhook_url={quoted_hook}"
    assert expected_link in dashboard.text, "Expected dashboard webhook link to point to the webhook detail view"

    client.post(url="/remove", data=feed_form)
def test_modify_webhook_redirects_back_to_webhook_detail() -> None:
    """Webhook updates from the detail view should redirect back to that view with the new URL."""
    original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
    new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token"

    # Make sure neither URL is registered before the scenario starts.
    client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
    client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})

    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    # Redirects must not be followed, otherwise the 303 and its Location header cannot be inspected.
    no_redirect_client = TestClient(app, follow_redirects=False)
    response = no_redirect_client.post(
        url="/modify_webhook",
        data={
            "old_hook": original_webhook_url,
            "new_hook": new_webhook_url,
            "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}",
        },
    )

    assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
    # The submitted redirect target names the old URL; the Location header is expected to name the new one.
    assert response.headers["location"] == (f"/webhook_entries?webhook_url={urllib.parse.quote(new_webhook_url)}"), (
        f"Unexpected redirect location: {response.headers['location']}"
    )

    client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})


def test_modify_webhook_triggers_git_backup_commit() -> None:
    """Modifying a webhook URL should record a state change for git backup."""
    original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
    new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token"

    # Make sure neither URL is registered before the scenario starts.
    client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
    client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})

    response: Response = client.post(
        url="/add_webhook",
        data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
    )
    assert response.status_code == 200, f"Failed to add webhook: {response.text}"

    no_redirect_client = TestClient(app, follow_redirects=False)

    # Patch the commit hook only around the modify request so exactly that call is counted.
    with patch("discord_rss_bot.main.commit_state_change") as mock_commit_state_change:
        response = no_redirect_client.post(
            url="/modify_webhook",
            data={
                "old_hook": original_webhook_url,
                "new_hook": new_webhook_url,
                "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}",
            },
        )

    assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
    assert mock_commit_state_change.call_count == 1, "Expected webhook modification to trigger git backup commit"

    client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None:
    """The full-page preview should show old and new feed URLs plus match/update counts."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str
        title: str | None = None
        updates_enabled: bool = True
        last_exception: None = None

    # URL prefixes of the stub feeds that report the test webhook.
    attached_prefixes: tuple[str, str] = ("https://old.example.com", "https://unchanged.example.com")

    class StubReader:
        def __init__(self) -> None:
            self._feeds: list[DummyFeed] = [
                DummyFeed(url="https://old.example.com/rss/a.xml", title="A"),
                DummyFeed(url="https://old.example.com/rss/b.xml", title="B"),
                DummyFeed(url="https://unchanged.example.com/rss/c.xml", title="C"),
            ]

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            if resource == () and key == "webhooks":
                return [{"name": webhook_name, "url": webhook_url}]
            is_feed_webhook_lookup: bool = key == "webhook" and isinstance(resource, str)
            if is_feed_webhook_lookup and resource.startswith(attached_prefixes):
                return webhook_url
            return default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return []

    def fake_resolve(url: str) -> tuple[str, None]:
        # Simulates URL resolution by swapping the host and reporting no error.
        return url.replace("old.example.com", "new.example.com"), None

    hook_info = main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url)
    preview_query: dict[str, str] = {
        "webhook_url": webhook_url,
        "replace_from": "old.example.com",
        "replace_to": "new.example.com",
        "resolve_urls": "true",
    }

    app.dependency_overrides[get_reader_dependency] = StubReader
    try:
        with patch("discord_rss_bot.main.get_data_from_hook_url", return_value=hook_info):
            with patch("discord_rss_bot.main.resolve_final_feed_url", side_effect=fake_resolve):
                response: Response = client.get(url="/webhook_entries", params=preview_query)

        page: str = response.text
        assert response.status_code == 200, f"Failed to get preview: {page}"
        assert "Mass update feed URLs" in page
        assert "old.example.com/rss/a.xml" in page
        assert "new.example.com/rss/a.xml" in page
        assert "Will update" in page
        assert "Matched: 2" in page
        assert "Will update: 2" in page
    finally:
        app.dependency_overrides = {}
def test_bulk_change_feed_urls_updates_matching_feeds() -> None:
    """/bulk_change_feed_urls should rename and refresh both matching feeds and redirect with a summary."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    class StubReader:
        def __init__(self) -> None:
            self._feeds = [
                DummyFeed(url="https://old.example.com/rss/a.xml"),
                DummyFeed(url="https://old.example.com/rss/b.xml"),
                DummyFeed(url="https://unchanged.example.com/rss/c.xml"),
            ]
            # Call logs inspected by the assertions below.
            self.change_calls: list[tuple[str, str]] = []
            self.updated_feeds: list[str] = []

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            if resource == () and key == "webhooks":
                return [{"name": webhook_name, "url": webhook_url}]
            if key == "webhook" and isinstance(resource, str):
                return webhook_url
            return default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

        def change_feed_url(self, old_url: str, new_url: str) -> None:
            self.change_calls.append((old_url, new_url))

        def update_feed(self, feed_url: str) -> None:
            self.updated_feeds.append(feed_url)

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return []

        def set_entry_read(self, _entry: Entry, _value: bool) -> None:  # noqa: FBT001
            return

    def fake_resolve(url: str) -> tuple[str, None]:
        # Simulates URL resolution by swapping the host and reporting no error.
        return url.replace("old.example.com", "new.example.com"), None

    expected_changes: list[tuple[str, str]] = [
        ("https://old.example.com/rss/a.xml", "https://new.example.com/rss/a.xml"),
        ("https://old.example.com/rss/b.xml", "https://new.example.com/rss/b.xml"),
    ]
    expected_refreshes: list[str] = [
        "https://new.example.com/rss/a.xml",
        "https://new.example.com/rss/b.xml",
    ]
    form: dict[str, str] = {
        "webhook_url": webhook_url,
        "replace_from": "old.example.com",
        "replace_to": "new.example.com",
        "resolve_urls": "true",
    }

    stub_reader = StubReader()
    app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
    no_redirect_client = TestClient(app, follow_redirects=False)

    try:
        with patch("discord_rss_bot.main.resolve_final_feed_url", side_effect=fake_resolve):
            response: Response = no_redirect_client.post(url="/bulk_change_feed_urls", data=form)

        assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
        # The summary message is expected percent-encoded inside the redirect target.
        assert "Updated%202%20feed%20URL%28s%29" in response.headers.get("location", "")
        # Call order is not part of the contract, so both sides are sorted before comparing.
        assert sorted(stub_reader.change_calls) == sorted(expected_changes)
        assert sorted(stub_reader.updated_feeds) == sorted(expected_refreshes)
    finally:
        app.dependency_overrides = {}
def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None:
    """/webhook_entries_mass_update_preview should return the preview table without the full-page heading."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str
        title: str | None = None
        updates_enabled: bool = True
        last_exception: None = None

    class StubReader:
        def __init__(self) -> None:
            self._feeds: list[DummyFeed] = [
                DummyFeed(url="https://old.example.com/rss/a.xml", title="A"),
                DummyFeed(url="https://old.example.com/rss/b.xml", title="B"),
            ]

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            # Every feed URL reports the test webhook; all other lookups fall back to the default.
            if key == "webhook" and isinstance(resource, str):
                return webhook_url
            return default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

    def fake_resolve(url: str) -> tuple[str, None]:
        # Simulates URL resolution by swapping the host and reporting no error.
        return url.replace("old.example.com", "new.example.com"), None

    fragment_query: dict[str, str] = {
        "webhook_url": webhook_url,
        "replace_from": "old.example.com",
        "replace_to": "new.example.com",
        "resolve_urls": "true",
    }

    app.dependency_overrides[get_reader_dependency] = StubReader
    try:
        with patch("discord_rss_bot.main.resolve_final_feed_url", side_effect=fake_resolve):
            response: Response = client.get(url="/webhook_entries_mass_update_preview", params=fragment_query)

        fragment: str = response.text
        assert response.status_code == 200, f"Failed to get HTMX preview fragment: {fragment}"
        assert "Will update: 2" in fragment
        assert "<table" in fragment
        assert "Mass update feed URLs" not in fragment, "Fragment should not include full page wrapper text"
    finally:
        app.dependency_overrides = {}
def test_bulk_change_feed_urls_force_update_overwrites_conflict() -> None:  # noqa: C901
    """With force_update set, a feed already at the target URL should be deleted and then replaced."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    class StubReader:
        def __init__(self) -> None:
            # The second feed already occupies the URL the first one will be renamed to.
            self._feeds = [
                DummyFeed(url="https://old.example.com/rss/a.xml"),
                DummyFeed(url="https://new.example.com/rss/a.xml"),
            ]
            self.delete_calls: list[str] = []
            self.change_calls: list[tuple[str, str]] = []

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            if resource == () and key == "webhooks":
                return [{"name": webhook_name, "url": webhook_url}]
            if key == "webhook" and isinstance(resource, str):
                return webhook_url
            return default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

        def delete_feed(self, feed_url: str) -> None:
            self.delete_calls.append(feed_url)

        def change_feed_url(self, old_url: str, new_url: str) -> None:
            self.change_calls.append((old_url, new_url))

        def update_feed(self, _feed_url: str) -> None:
            return

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return []

        def set_entry_read(self, _entry: Entry, _value: bool) -> None:  # noqa: FBT001
            return

    def fake_resolve(url: str) -> tuple[str, None]:
        # Simulates URL resolution by swapping the host and reporting no error.
        return url.replace("old.example.com", "new.example.com"), None

    form: dict[str, str] = {
        "webhook_url": webhook_url,
        "replace_from": "old.example.com",
        "replace_to": "new.example.com",
        "resolve_urls": "true",
        "force_update": "true",
    }
    expected_rename: tuple[str, str] = (
        "https://old.example.com/rss/a.xml",
        "https://new.example.com/rss/a.xml",
    )

    stub_reader = StubReader()
    app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
    no_redirect_client = TestClient(app, follow_redirects=False)

    try:
        with patch("discord_rss_bot.main.resolve_final_feed_url", side_effect=fake_resolve):
            response: Response = no_redirect_client.post(url="/bulk_change_feed_urls", data=form)

        assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
        assert stub_reader.delete_calls == ["https://new.example.com/rss/a.xml"]
        assert stub_reader.change_calls == [expected_rename]
        assert "Force%20overwrote%201" in response.headers.get("location", "")
    finally:
        app.dependency_overrides = {}
def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None:
    """With force_update set, the rename should go through even if URL resolution reports "HTTP 404"."""

    @dataclass(slots=True)
    class DummyFeed:
        url: str

    class StubReader:
        def __init__(self) -> None:
            self._feeds = [
                DummyFeed(url="https://old.example.com/rss/a.xml"),
            ]
            self.change_calls: list[tuple[str, str]] = []

        def get_tag(self, resource: object, key: str, default: object = None) -> object:
            if resource == () and key == "webhooks":
                return [{"name": webhook_name, "url": webhook_url}]
            if key == "webhook" and isinstance(resource, str):
                return webhook_url
            return default

        def get_feeds(self) -> list[DummyFeed]:
            return self._feeds

        def change_feed_url(self, old_url: str, new_url: str) -> None:
            self.change_calls.append((old_url, new_url))

        def update_feed(self, _feed_url: str) -> None:
            return

        def get_entries(self, **_kwargs: object) -> list[Entry]:
            return []

        def set_entry_read(self, _entry: Entry, _value: bool) -> None:  # noqa: FBT001
            return

    # The resolver hands back the target URL together with an error string.
    failed_resolution: tuple[str, str] = ("https://new.example.com/rss/a.xml", "HTTP 404")
    form: dict[str, str] = {
        "webhook_url": webhook_url,
        "replace_from": "old.example.com",
        "replace_to": "new.example.com",
        "resolve_urls": "true",
        "force_update": "true",
    }
    expected_rename: tuple[str, str] = (
        "https://old.example.com/rss/a.xml",
        "https://new.example.com/rss/a.xml",
    )

    stub_reader = StubReader()
    app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
    no_redirect_client = TestClient(app, follow_redirects=False)

    try:
        with patch("discord_rss_bot.main.resolve_final_feed_url", return_value=failed_resolution):
            response: Response = no_redirect_client.post(url="/bulk_change_feed_urls", data=form)

        assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
        assert stub_reader.change_calls == [expected_rename]

        # Both summary parts are expected percent-encoded inside the redirect target.
        location = response.headers.get("location", "")
        assert "Updated%201%20feed%20URL%28s%29" in location
        assert "Failed%200" in location
    finally:
        app.dependency_overrides = {}
def test_reader_dependency_override_is_used() -> None:
    """The /add page should render when the reader dependency is swapped for a stub via dependency_overrides."""

    class StubReader:
        def get_tag(self, _resource: str, _key: str, default: str | None = None) -> str | None:
            """Behave as if no tag is stored by echoing back the caller's default.

            Args:
                _resource: Not used.
                _key: Not used.
                default: Value handed straight back to the caller.

            Returns:
                The ``default`` argument, unchanged.
            """
            return default

    app.dependency_overrides[get_reader_dependency] = StubReader
    try:
        add_page: Response = client.get(url="/add")
        assert add_page.status_code == 200, f"Expected /add to render with overridden reader: {add_page.text}"
    finally:
        app.dependency_overrides = {}