Add feed URL change functionality and related tests

This commit is contained in:
Joakim Hellsén 2026-03-07 06:29:12 +01:00
commit d87341d729
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
9 changed files with 176 additions and 14 deletions

View file

@ -1,4 +1,4 @@
# You can optionally store backups of your bot's configuration in a git repository. # You can optionally store backups of your bot's configuration in a git repository.
# This allows you to track changes by subscribing to the repository or using a RSS feed. # This allows you to track changes by subscribing to the repository or using a RSS feed.
# Local path for the backup git repository (e.g., /data/backup or /home/user/backups/discord-rss-bot) # Local path for the backup git repository (e.g., /data/backup or /home/user/backups/discord-rss-bot)
# When set, the bot will initialize a git repo here and commit state.json after every configuration change # When set, the bot will initialize a git repo here and commit state.json after every configuration change

View file

@ -2,10 +2,13 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text from discord_rss_bot.filter.utils import is_regex_match
from discord_rss_bot.filter.utils import is_word_in_text
if TYPE_CHECKING: if TYPE_CHECKING:
from reader import Entry, Feed, Reader from reader import Entry
from reader import Feed
from reader import Reader
def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:

View file

@ -2,10 +2,13 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text from discord_rss_bot.filter.utils import is_regex_match
from discord_rss_bot.filter.utils import is_word_in_text
if TYPE_CHECKING: if TYPE_CHECKING:
from reader import Entry, Feed, Reader from reader import Entry
from reader import Feed
from reader import Reader
def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:

View file

@ -179,7 +179,7 @@ def export_state(reader: Reader, backup_path: Path) -> None:
try: try:
webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list( webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list(
reader.get_tag((), "webhooks", []) reader.get_tag((), "webhooks", []),
) )
except TagNotFoundError: except TagNotFoundError:
webhooks = [] webhooks = []

View file

@ -31,8 +31,10 @@ from markdownify import markdownify
from reader import Entry from reader import Entry
from reader import EntryNotFoundError from reader import EntryNotFoundError
from reader import Feed from reader import Feed
from reader import FeedExistsError
from reader import FeedNotFoundError from reader import FeedNotFoundError
from reader import Reader from reader import Reader
from reader import ReaderError
from reader import TagNotFoundError from reader import TagNotFoundError
from starlette.responses import RedirectResponse from starlette.responses import RedirectResponse
@ -697,6 +699,45 @@ async def post_set_update_interval(
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/change_feed_url")
async def post_change_feed_url(
old_feed_url: Annotated[str, Form()],
new_feed_url: Annotated[str, Form()],
) -> RedirectResponse:
"""Change the URL for an existing feed.
Args:
old_feed_url: Current feed URL.
new_feed_url: New feed URL to change to.
Returns:
RedirectResponse: Redirect to the feed page for the resulting URL.
Raises:
HTTPException: If the old feed is not found, the new URL already exists, or change fails.
"""
clean_old_feed_url: str = old_feed_url.strip()
clean_new_feed_url: str = new_feed_url.strip()
if not clean_old_feed_url or not clean_new_feed_url:
raise HTTPException(status_code=400, detail="Feed URLs cannot be empty")
if clean_old_feed_url == clean_new_feed_url:
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_old_feed_url)}", status_code=303)
try:
reader.change_feed_url(clean_old_feed_url, clean_new_feed_url)
except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail=f"Feed not found: {clean_old_feed_url}") from e
except FeedExistsError as e:
raise HTTPException(status_code=409, detail=f"Feed already exists: {clean_new_feed_url}") from e
except ReaderError as e:
raise HTTPException(status_code=400, detail=f"Failed to change feed URL: {e}") from e
commit_state_change(reader, f"Change feed URL from {clean_old_feed_url} to {clean_new_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_new_feed_url)}", status_code=303)
@app.post("/reset_update_interval") @app.post("/reset_update_interval")
async def post_reset_update_interval( async def post_reset_update_interval(
feed_url: Annotated[str, Form()], feed_url: Annotated[str, Form()],
@ -804,7 +845,7 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
except EntryNotFoundError as e: except EntryNotFoundError as e:
current_entries = list(reader.get_entries(feed=clean_feed_url)) current_entries = list(reader.get_entries(feed=clean_feed_url))
msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}" msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}"
html: str = create_html_for_feed(current_entries) html: str = create_html_for_feed(current_entries, clean_feed_url)
# Get feed and global intervals for error case too # Get feed and global intervals for error case too
feed_interval: int | None = None feed_interval: int | None = None
@ -860,7 +901,7 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
last_entry = entries[-1] last_entry = entries[-1]
# Create the html for the entries. # Create the html for the entries.
html: str = create_html_for_feed(entries) html: str = create_html_for_feed(entries, clean_feed_url)
try: try:
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
@ -907,11 +948,12 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
return templates.TemplateResponse(request=request, name="feed.html", context=context) return templates.TemplateResponse(request=request, name="feed.html", context=context)
def create_html_for_feed(entries: Iterable[Entry]) -> str: def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: PLR0914
"""Create HTML for the search results. """Create HTML for the search results.
Args: Args:
entries: The entries to create HTML for. entries: The entries to create HTML for.
current_feed_url: The feed URL currently being viewed in /feed.
Returns: Returns:
str: The HTML for the search results. str: The HTML for the search results.
@ -940,6 +982,12 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
if entry_is_whitelisted(entry): if entry_is_whitelisted(entry):
whitelisted = "<span class='badge bg-success'>Whitelisted</span>" whitelisted = "<span class='badge bg-success'>Whitelisted</span>"
source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url
from_another_feed: str = ""
if current_feed_url and source_feed_url != current_feed_url:
from_another_feed = f"<span class='badge bg-warning text-dark'>From another feed: {source_feed_url}</span>"
entry_id: str = urllib.parse.quote(entry.id) entry_id: str = urllib.parse.quote(entry.id)
to_discord_html: str = f"<a class='text-muted' href='/post_entry?entry_id={entry_id}'>Send to Discord</a>" to_discord_html: str = f"<a class='text-muted' href='/post_entry?entry_id={entry_id}'>Send to Discord</a>"
@ -966,14 +1014,14 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
image_html: str = f"<img src='{first_image}' class='img-fluid'>" if first_image else "" image_html: str = f"<img src='{first_image}' class='img-fluid'>" if first_image else ""
html += f"""<div class="p-2 mb-2 border border-dark"> html += f"""<div class="p-2 mb-2 border border-dark">
{blacklisted}{whitelisted}<a class="text-muted text-decoration-none" href="{entry.link}"><h2>{entry.title}</h2></a> {blacklisted}{whitelisted}{from_another_feed}<a class="text-muted text-decoration-none" href="{entry.link}"><h2>{entry.title}</h2></a>
{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html} {f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html}
{text} {text}
{video_embed_html} {video_embed_html}
{image_html} {image_html}
</div> </div>
""" """ # noqa: E501
return html.strip() return html.strip()

View file

@ -16,4 +16,4 @@ body {
.interval-input { .interval-input {
max-width: 120px; max-width: 120px;
} }

View file

@ -76,6 +76,22 @@
</a> </a>
{% endif %} {% endif %}
</div> </div>
<!-- Feed URL Configuration -->
<div class="mt-4 border-top border-secondary pt-3">
<h5 class="mb-3">Feed URL</h5>
<p class="text-muted mb-2">Change the URL for this feed. This can be useful if a feed has moved.</p>
<form action="/change_feed_url" method="post" class="mb-2">
<input type="hidden" name="old_feed_url" value="{{ feed.url }}" />
<div class="input-group input-group-sm mb-2">
<input type="url"
class="form-control form-control-sm"
name="new_feed_url"
value="{{ feed.url }}"
required />
<button class="btn btn-warning" type="submit">Update URL</button>
</div>
</form>
</div>
<!-- Feed Metadata --> <!-- Feed Metadata -->
<div class="mt-4 border-top border-secondary pt-3"> <div class="mt-4 border-top border-secondary pt-3">
<h5 class="mb-3">Feed Information</h5> <h5 class="mb-3">Feed Information</h5>

View file

@ -25,7 +25,8 @@ if TYPE_CHECKING:
SKIP_IF_NO_GIT: pytest.MarkDecorator = pytest.mark.skipif( SKIP_IF_NO_GIT: pytest.MarkDecorator = pytest.mark.skipif(
shutil.which("git") is None, reason="git executable not found" shutil.which("git") is None,
reason="git executable not found",
) )

View file

@ -2,17 +2,22 @@ from __future__ import annotations
import re import re
import urllib.parse import urllib.parse
from dataclasses import dataclass
from dataclasses import field
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import cast
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from discord_rss_bot.main import app from discord_rss_bot.main import app
from discord_rss_bot.main import create_html_for_feed
if TYPE_CHECKING: if TYPE_CHECKING:
from pathlib import Path from pathlib import Path
import pytest import pytest
from httpx import Response from httpx import Response
from reader import Entry
client: TestClient = TestClient(app) client: TestClient = TestClient(app)
webhook_name: str = "Hello, I am a webhook!" webhook_name: str = "Hello, I am a webhook!"
@ -212,6 +217,45 @@ def test_remove_feed() -> None:
assert feed_url not in response.text, f"Feed found in /: {response.text}" assert feed_url not in response.text, f"Feed found in /: {response.text}"
def test_change_feed_url() -> None:
"""Test changing a feed URL from the feed page endpoint."""
new_feed_url = "https://lovinator.space/rss_test_small.xml"
# Ensure test feeds do not already exist.
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/remove", data={"feed_url": new_feed_url})
# Ensure webhook exists.
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Add the original feed.
response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Change feed URL.
response = client.post(
url="/change_feed_url",
data={"old_feed_url": feed_url, "new_feed_url": new_feed_url},
)
assert response.status_code == 200, f"Failed to change feed URL: {response.text}"
# New feed should be accessible.
response = client.get(url="/feed", params={"feed_url": new_feed_url})
assert response.status_code == 200, f"New feed URL is not accessible: {response.text}"
# Old feed should no longer be accessible.
response = client.get(url="/feed", params={"feed_url": feed_url})
assert response.status_code == 404, "Old feed URL should no longer exist"
# Cleanup.
client.post(url="/remove", data={"feed_url": new_feed_url})
def test_delete_webhook() -> None: def test_delete_webhook() -> None:
"""Test the /delete_webhook page.""" """Test the /delete_webhook page."""
# Remove the feed if it already exists before we run the test. # Remove the feed if it already exists before we run the test.
@ -393,7 +437,8 @@ def test_show_more_entries_pagination_works() -> None:
# Request the second page # Request the second page
response: Response = client.get( response: Response = client.get(
url="/feed", params={"feed_url": feed_url, "starting_after": starting_after_id} url="/feed",
params={"feed_url": feed_url, "starting_after": starting_after_id},
) )
assert response.status_code == 200, f"Failed to get paginated feed: {response.text}" assert response.status_code == 200, f"Failed to get paginated feed: {response.text}"
@ -439,3 +484,49 @@ def test_show_more_entries_button_context_variable() -> None:
assert "Show more entries" not in response.text, ( assert "Show more entries" not in response.text, (
f"Button should not be visible when there are {entry_count} entries (20 or fewer)" f"Button should not be visible when there are {entry_count} entries (20 or fewer)"
) )
def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyPatch) -> None:
"""Entries from another feed should be marked in /feed html output."""
@dataclass(slots=True)
class DummyContent:
value: str
@dataclass(slots=True)
class DummyFeed:
url: str
@dataclass(slots=True)
class DummyEntry:
feed: DummyFeed
id: str
original_feed_url: str | None = None
link: str = "https://example.com/post"
title: str = "Example title"
author: str = "Author"
summary: str = "Summary"
content: list[DummyContent] = field(default_factory=lambda: [DummyContent("Content")])
published: None = None
def __post_init__(self) -> None:
if self.original_feed_url is None:
self.original_feed_url = self.feed.url
selected_feed_url = "https://example.com/feed-a.xml"
same_feed_entry = DummyEntry(DummyFeed(selected_feed_url), "same")
# feed.url matches selected feed, but original_feed_url differs; marker should still show.
other_feed_entry = DummyEntry(
DummyFeed(selected_feed_url),
"other",
original_feed_url="https://example.com/feed-b.xml",
)
monkeypatch.setattr("discord_rss_bot.main.replace_tags_in_text_message", lambda _entry: "Rendered content")
monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry: False)
monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry: False)
html = create_html_for_feed(cast("list[Entry]", [same_feed_entry, other_feed_entry]), selected_feed_url)
assert "From another feed: https://example.com/feed-b.xml" in html
assert "From another feed: https://example.com/feed-a.xml" not in html