From 955b94456d9892f415bc07da5bae2a552d526e08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Helle=C5=9Ben?= Date: Mon, 16 Mar 2026 04:48:34 +0100 Subject: [PATCH] Add mass update functionality for feed URLs with preview --- discord_rss_bot/main.py | 383 +++++++++++++++- .../_webhook_mass_update_preview.html | 73 +++ discord_rss_bot/templates/base.html | 26 +- discord_rss_bot/templates/index.html | 9 +- .../templates/webhook_entries.html | 53 +++ tests/test_main.py | 434 ++++++++++++++++++ 6 files changed, 952 insertions(+), 26 deletions(-) create mode 100644 discord_rss_bot/templates/_webhook_mass_update_preview.html diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 98c05b8..ca3894c 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -54,6 +54,7 @@ from discord_rss_bot.feeds import send_entry_to_discord from discord_rss_bot.feeds import send_to_discord from discord_rss_bot.git_backup import commit_state_change from discord_rss_bot.git_backup import get_backup_path +from discord_rss_bot.is_url_valid import is_url_valid from discord_rss_bot.search import create_search_context from discord_rss_bot.settings import get_reader @@ -1496,15 +1497,20 @@ def modify_webhook( # Webhooks are stored as a list of dictionaries. # Example: [{"name": "webhook_name", "url": "webhook_url"}] webhooks = cast("list[dict[str, str]]", webhooks) + old_hook_clean: str = old_hook.strip() + new_hook_clean: str = new_hook.strip() + webhook_modified: bool = False for hook in webhooks: - if hook["url"] in old_hook.strip(): - hook["url"] = new_hook.strip() + if hook["url"] in old_hook_clean: + hook["url"] = new_hook_clean # Check if it has been modified. - if hook["url"] != new_hook.strip(): + if hook["url"] != new_hook_clean: raise HTTPException(status_code=500, detail="Webhook could not be modified") + webhook_modified = True + # Add our new list of webhooks to the database. reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] @@ -1514,13 +1520,16 @@ def modify_webhook( for feed in feeds: webhook: str = str(reader.get_tag(feed, "webhook", "")) - if webhook == old_hook.strip(): - reader.set_tag(feed.url, "webhook", new_hook.strip()) # pyright: ignore[reportArgumentType] + if webhook == old_hook_clean: + reader.set_tag(feed.url, "webhook", new_hook_clean) # pyright: ignore[reportArgumentType] + + if webhook_modified and old_hook_clean != new_hook_clean: + commit_state_change(reader, f"Modify webhook URL from {old_hook_clean} to {new_hook_clean}") redirect_url: str = redirect_to.strip() or "/webhooks" if redirect_to: - redirect_url = redirect_url.replace(urllib.parse.quote(old_hook.strip()), urllib.parse.quote(new_hook.strip())) - redirect_url = redirect_url.replace(old_hook.strip(), new_hook.strip()) + redirect_url = redirect_url.replace(urllib.parse.quote(old_hook_clean), urllib.parse.quote(new_hook_clean)) + redirect_url = redirect_url.replace(old_hook_clean, new_hook_clean) # Redirect to the requested page. return RedirectResponse(url=redirect_url, status_code=303) @@ -1549,12 +1558,216 @@ def extract_youtube_video_id(url: str) -> str | None: return None +def resolve_final_feed_url(url: str) -> tuple[str, str | None]: + """Resolve a feed URL by following redirects. + + Args: + url: The feed URL to resolve. + + Returns: + tuple[str, str | None]: A tuple with (resolved_url, error_message). + error_message is None when resolution succeeded. + """ + clean_url: str = url.strip() + if not clean_url: + return "", "URL is empty" + + if not is_url_valid(clean_url): + return clean_url, "URL is invalid" + + try: + response: Response = httpx.get(clean_url, follow_redirects=True, timeout=10.0) + except httpx.HTTPError as e: + return clean_url, str(e) + + if not response.is_success: + return clean_url, f"HTTP {response.status_code}" + + return str(response.url), None + + +def create_webhook_feed_url_preview( + webhook_feeds: list[Feed], + replace_from: str, + replace_to: str, + resolve_urls: bool, # noqa: FBT001 + force_update: bool = False, # noqa: FBT001, FBT002 + existing_feed_urls: set[str] | None = None, +) -> list[dict[str, str | bool | None]]: + """Create preview rows for bulk feed URL replacement. + + Args: + webhook_feeds: Feeds attached to a webhook. + replace_from: Text to replace in each URL. + replace_to: Replacement text. + resolve_urls: Whether to resolve resulting URLs via HTTP redirects. + force_update: Whether conflicts should be marked as force-overwritable. + existing_feed_urls: Optional set of all tracked feed URLs used for conflict detection. + + Returns: + list[dict[str, str | bool | None]]: Rows used in the preview table. + """ + known_feed_urls: set[str] = existing_feed_urls or {feed.url for feed in webhook_feeds} + preview_rows: list[dict[str, str | bool | None]] = [] + for feed in webhook_feeds: + old_url: str = feed.url + has_match: bool = bool(replace_from and replace_from in old_url) + + candidate_url: str = old_url + if has_match: + candidate_url = old_url.replace(replace_from, replace_to) + + resolved_url: str = candidate_url + resolution_error: str | None = None + if has_match and candidate_url != old_url and resolve_urls: + resolved_url, resolution_error = resolve_final_feed_url(candidate_url) + + will_force_ignore_errors: bool = bool( + force_update and bool(resolution_error) and has_match and old_url != candidate_url, + ) + + target_exists: bool = bool( + has_match and not resolution_error and resolved_url != old_url and resolved_url in known_feed_urls, + ) + will_force_overwrite: bool = bool(target_exists and force_update) + will_change: bool = bool( + has_match + and old_url != (candidate_url if will_force_ignore_errors else resolved_url) + and (not target_exists or will_force_overwrite) + and (not resolution_error or will_force_ignore_errors), + ) + + preview_rows.append({ + "old_url": old_url, + "candidate_url": candidate_url, + "resolved_url": resolved_url, + "has_match": has_match, + "will_change": will_change, + "target_exists": target_exists, + "will_force_overwrite": will_force_overwrite, + "will_force_ignore_errors": will_force_ignore_errors, + "resolution_error": resolution_error, + }) + + return preview_rows + + +def build_webhook_mass_update_context( + webhook_feeds: list[Feed], + all_feeds: list[Feed], + replace_from: str, + replace_to: str, + resolve_urls: bool, # noqa: FBT001 + force_update: bool = False, # noqa: FBT001, FBT002 +) -> dict[str, str | bool | int | list[dict[str, str | bool | None]] | dict[str, int]]: + """Build context data used by the webhook mass URL update preview UI. + + Args: + webhook_feeds: Feeds attached to the selected webhook. + all_feeds: All tracked feeds. + replace_from: Text to replace in URLs. + replace_to: Replacement text. + resolve_urls: Whether to resolve resulting URLs. + force_update: Whether to allow overwriting existing target URLs. + + Returns: + dict[str, ...]: Context values for rendering preview controls and table. + """ + clean_replace_from: str = replace_from.strip() + clean_replace_to: str = replace_to.strip() + + preview_rows: list[dict[str, str | bool | None]] = [] + if clean_replace_from: + preview_rows = create_webhook_feed_url_preview( + webhook_feeds=webhook_feeds, + replace_from=clean_replace_from, + replace_to=clean_replace_to, + resolve_urls=resolve_urls, + force_update=force_update, + existing_feed_urls={feed.url for feed in all_feeds}, + ) + + preview_summary: dict[str, int] = { + "total": len(preview_rows), + "matched": sum(1 for row in preview_rows if row["has_match"]), + "will_update": sum(1 for row in preview_rows if row["will_change"]), + "conflicts": sum(1 for row in preview_rows if row["target_exists"] and not row["will_force_overwrite"]), + "force_overwrite": sum(1 for row in preview_rows if row["will_force_overwrite"]), + "force_ignore_errors": sum(1 for row in preview_rows if row["will_force_ignore_errors"]), + "resolve_errors": sum(1 for row in preview_rows if row["resolution_error"]), + } + preview_summary["no_match"] = preview_summary["total"] - preview_summary["matched"] + preview_summary["no_change"] = sum( + 1 for row in preview_rows if row["has_match"] and not row["resolution_error"] and not row["will_change"] + ) + + return { + "replace_from": clean_replace_from, + "replace_to": clean_replace_to, + "resolve_urls": resolve_urls, + "force_update": force_update, + "preview_rows": preview_rows, + "preview_summary": preview_summary, + "preview_change_count": preview_summary["will_update"], + } + + +@app.get("/webhook_entries_mass_update_preview", response_class=HTMLResponse) +async def get_webhook_entries_mass_update_preview( + webhook_url: str, + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], + replace_from: str = "", + replace_to: str = "", + resolve_urls: bool = True, # noqa: FBT001, FBT002 + force_update: bool = False, # noqa: FBT001, FBT002 +) -> HTMLResponse: + """Render the mass-update preview fragment for a webhook using HTMX. + + Args: + webhook_url: Webhook URL whose feeds are being updated. + request: The request object. + reader: The Reader instance. + replace_from: Text to find in URLs. + replace_to: Replacement text. + resolve_urls: Whether to resolve resulting URLs. + force_update: Whether to allow overwriting existing target URLs. + + Returns: + HTMLResponse: Rendered partial template containing summary + preview table. + """ + clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) + all_feeds: list[Feed] = list(reader.get_feeds()) + webhook_feeds: list[Feed] = [ + feed for feed in all_feeds if str(reader.get_tag(feed.url, "webhook", "")) == clean_webhook_url + ] + + context = { + "request": request, + "webhook_url": clean_webhook_url, + **build_webhook_mass_update_context( + webhook_feeds=webhook_feeds, + all_feeds=all_feeds, + replace_from=replace_from, + replace_to=replace_to, + resolve_urls=resolve_urls, + force_update=force_update, + ), + } + return templates.TemplateResponse(request=request, name="_webhook_mass_update_preview.html", context=context) + + @app.get("/webhook_entries", response_class=HTMLResponse) async def get_webhook_entries( # noqa: C901, PLR0914 webhook_url: str, request: Request, reader: Annotated[Reader, Depends(get_reader_dependency)], starting_after: str = "", + replace_from: str = "", + replace_to: str = "", + resolve_urls: bool = True, # noqa: FBT001, FBT002 + force_update: bool = False, # noqa: FBT001, FBT002 + message: str = "", ) -> HTMLResponse: """Get all latest entries from all feeds for a specific webhook. @@ -1562,6 +1775,11 @@ async def get_webhook_entries( # noqa: C901, PLR0914 webhook_url: The webhook URL to get entries for. request: The request object. starting_after: The entry to start after. Used for pagination. + replace_from: Optional URL substring to find for bulk URL replacement preview. + replace_to: Optional replacement substring used in bulk URL replacement preview. + resolve_urls: Whether to resolve replaced URLs by following redirects. + force_update: Whether to allow overwriting existing target URLs during apply. + message: Optional status message shown in the UI. reader: The Reader instance. Returns: @@ -1598,9 +1816,12 @@ async def get_webhook_entries( # noqa: C901, PLR0914 # Get all entries from all feeds for this webhook, sorted by published date all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)] - # Sort entries by published date (newest first) + # Sort entries by published date (newest first), with undated entries last. all_entries.sort( - key=lambda e: e.published or datetime.now(tz=UTC), + key=lambda e: ( + e.published is not None, + e.published or datetime.min.replace(tzinfo=UTC), + ), reverse=True, ) @@ -1635,6 +1856,15 @@ async def get_webhook_entries( # noqa: C901, PLR0914 # Create the html for the entries html: str = create_html_for_feed(reader=reader, entries=paginated_entries) + mass_update_context = build_webhook_mass_update_context( + webhook_feeds=webhook_feeds, + all_feeds=all_feeds, + replace_from=replace_from, + replace_to=replace_to, + resolve_urls=resolve_urls, + force_update=force_update, + ) + # Check if there are more entries available total_entries: int = len(all_entries) is_show_more_entries_button_visible: bool = (start_index + entries_per_page) < total_entries @@ -1651,10 +1881,145 @@ async def get_webhook_entries( # noqa: C901, PLR0914 "is_show_more_entries_button_visible": is_show_more_entries_button_visible, "total_entries": total_entries, "feeds_count": len(webhook_feeds), + "message": urllib.parse.unquote(message) if message else "", + **mass_update_context, } return templates.TemplateResponse(request=request, name="webhook_entries.html", context=context) +@app.post("/bulk_change_feed_urls", response_class=HTMLResponse) +async def post_bulk_change_feed_urls( # noqa: C901, PLR0914, PLR0912, PLR0915 + webhook_url: Annotated[str, Form()], + replace_from: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], + replace_to: Annotated[str, Form()] = "", + resolve_urls: Annotated[bool, Form()] = True, # noqa: FBT002 + force_update: Annotated[bool, Form()] = False, # noqa: FBT002 +) -> RedirectResponse: + """Bulk-change feed URLs attached to a webhook. + + Args: + webhook_url: The webhook URL whose feeds should be updated. + replace_from: Text to find in each URL. + replace_to: Text to replace with. + resolve_urls: Whether to resolve resulting URLs via redirects. + force_update: Whether existing target feed URLs should be overwritten. + reader: The Reader instance. + + Returns: + RedirectResponse: Redirect to webhook detail with status message. + + Raises: + HTTPException: If webhook is missing or replace_from is empty. + """ + clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) + clean_replace_from: str = replace_from.strip() + clean_replace_to: str = replace_to.strip() + + if not clean_replace_from: + raise HTTPException(status_code=400, detail="replace_from cannot be empty") + + webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) + if not any(hook["url"] == clean_webhook_url for hook in webhooks): + raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") + + all_feeds: list[Feed] = list(reader.get_feeds()) + webhook_feeds: list[Feed] = [] + for feed in all_feeds: + feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) + if feed_webhook == clean_webhook_url: + webhook_feeds.append(feed) + + preview_rows: list[dict[str, str | bool | None]] = create_webhook_feed_url_preview( + webhook_feeds=webhook_feeds, + replace_from=clean_replace_from, + replace_to=clean_replace_to, + resolve_urls=resolve_urls, + force_update=force_update, + existing_feed_urls={feed.url for feed in all_feeds}, + ) + + changed_count: int = 0 + skipped_count: int = 0 + failed_count: int = 0 + conflict_count: int = 0 + force_overwrite_count: int = 0 + + for row in preview_rows: + if not row["has_match"]: + continue + + if row["resolution_error"] and not force_update: + skipped_count += 1 + continue + + if row["target_exists"] and not force_update: + conflict_count += 1 + skipped_count += 1 + continue + + old_url: str = str(row["old_url"]) + new_url: str = str(row["candidate_url"] if row["will_force_ignore_errors"] else row["resolved_url"]) + + if old_url == new_url: + skipped_count += 1 + continue + + if row["target_exists"] and force_update: + try: + reader.delete_feed(new_url) + force_overwrite_count += 1 + except FeedNotFoundError: + pass + except ReaderError: + failed_count += 1 + continue + + try: + reader.change_feed_url(old_url, new_url) + except FeedExistsError: + skipped_count += 1 + continue + except FeedNotFoundError: + skipped_count += 1 + continue + except ReaderError: + failed_count += 1 + continue + + try: + reader.update_feed(new_url) + except Exception: + logger.exception("Failed to update feed after URL change: %s", new_url) + + for entry in reader.get_entries(feed=new_url, read=False): + try: + reader.set_entry_read(entry, True) + except Exception: + logger.exception("Failed to mark entry as read after URL change: %s", entry.id) + + changed_count += 1 + + if changed_count > 0: + commit_state_change( + reader, + f"Bulk change {changed_count} feed URL(s) for webhook {clean_webhook_url}", + ) + + status_message: str = ( + f"Updated {changed_count} feed URL(s). " + f"Force overwrote {force_overwrite_count}. " + f"Conflicts {conflict_count}. " + f"Skipped {skipped_count}. " + f"Failed {failed_count}." + ) + redirect_url: str = ( + f"/webhook_entries?webhook_url={urllib.parse.quote(clean_webhook_url)}" + f"&message={urllib.parse.quote(status_message)}" + ) + return RedirectResponse(url=redirect_url, status_code=303) + + if __name__ == "__main__": sentry_sdk.init( dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744", diff --git a/discord_rss_bot/templates/_webhook_mass_update_preview.html b/discord_rss_bot/templates/_webhook_mass_update_preview.html new file mode 100644 index 0000000..a59e97b --- /dev/null +++ b/discord_rss_bot/templates/_webhook_mass_update_preview.html @@ -0,0 +1,73 @@ +{% if preview_rows %} +

+ {{ preview_change_count }} feed URL{{ 's' if preview_change_count != 1 else '' }} ready to update. +

+
+ Total: {{ preview_summary.total }} + Matched: {{ preview_summary.matched }} + Will update: {{ preview_summary.will_update }} + Conflicts: {{ preview_summary.conflicts }} + Force overwrite: {{ preview_summary.force_overwrite }} + Force ignore errors: {{ preview_summary.force_ignore_errors }} + Resolve errors: {{ preview_summary.resolve_errors }} + No change: {{ preview_summary.no_change }} + No match: {{ preview_summary.no_match }} +
+
+ + + + + + +
+
+ + + + + + + + + + {% for row in preview_rows %} + + + + + + {% endfor %} + +
Old URLNew URLStatus
+ {{ row.old_url }} + + {{ row.resolved_url if resolve_urls else row.candidate_url }} + + {% if not row.has_match %} + No match + {% elif row.will_force_ignore_errors %} + Will force update (ignore resolve error) + {% elif row.resolution_error %} + {{ row.resolution_error }} + {% elif row.will_force_overwrite %} + Will force overwrite + {% elif row.target_exists %} + Conflict: target URL exists + {% elif row.will_change %} + Will update + {% else %} + No change + {% endif %} +
+
+{% elif replace_from %} +

No preview rows found for that replacement pattern.

+{% endif %} diff --git a/discord_rss_bot/templates/base.html b/discord_rss_bot/templates/base.html index a8640dd..9146b35 100644 --- a/discord_rss_bot/templates/base.html +++ b/discord_rss_bot/templates/base.html @@ -1,13 +1,12 @@ - + content="Stay updated with the latest news and events with our easy-to-use RSS bot. Never miss a message or announcement again with real-time notifications directly to your Discord server." /> + content="discord, rss, bot, notifications, announcements, updates, real-time, server, messages, news, events, feed." /> @@ -18,19 +17,20 @@ {% block head %} {% endblock head %} - {% include "nav.html" %}
{% if messages %} - + {% endif %} - {% block content %} {% endblock content %}
@@ -41,18 +41,20 @@
+ - diff --git a/discord_rss_bot/templates/index.html b/discord_rss_bot/templates/index.html index d7db60c..341ec38 100644 --- a/discord_rss_bot/templates/index.html +++ b/discord_rss_bot/templates/index.html @@ -32,11 +32,10 @@ {% for hook_from_context in webhooks %}
diff --git a/discord_rss_bot/templates/webhook_entries.html b/discord_rss_bot/templates/webhook_entries.html index 9798b1a..f0bc970 100644 --- a/discord_rss_bot/templates/webhook_entries.html +++ b/discord_rss_bot/templates/webhook_entries.html @@ -3,6 +3,7 @@ | {{ webhook_name }} {% endblock title %} {% block content %} + {% if message %}{% endif %}
@@ -61,6 +62,57 @@ Delete Webhook +
+

Mass update feed URLs

+

Replace part of feed URLs for all feeds attached to this webhook.

+
+ +
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ +
+
+
{% include "_webhook_mass_update_preview.html" %}
@@ -77,6 +129,7 @@ {{ feed.url }} {% endif %} + {% if feed.title %}- {{ feed.url }}{% endif %} {% if not feed.updates_enabled %}Disabled{% endif %} {% if feed.last_exception %}({{ feed.last_exception.value_str }}){% endif %} diff --git a/tests/test_main.py b/tests/test_main.py index f5167c7..f6396eb 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -4,6 +4,8 @@ import re import urllib.parse from dataclasses import dataclass from dataclasses import field +from datetime import UTC +from datetime import datetime from typing import TYPE_CHECKING from typing import cast from unittest.mock import MagicMock @@ -1039,6 +1041,75 @@ def test_webhook_entries_multiple_feeds() -> None: client.post(url="/remove", data={"feed_url": feed_url}) +def test_webhook_entries_sort_newest_and_non_null_published_first() -> None: + """Webhook entries should be sorted newest-first with published=None entries placed last.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + title: str | None = None + updates_enabled: bool = True + last_exception: None = None + + @dataclass(slots=True) + class DummyEntry: + id: str + feed: DummyFeed + published: datetime | None + + dummy_feed = DummyFeed(url="https://example.com/feed.xml", title="Example Feed") + + # Intentionally unsorted input with two dated entries and two undated entries. + unsorted_entries: list[Entry] = [ + cast("Entry", DummyEntry(id="old", feed=dummy_feed, published=datetime(2024, 1, 1, tzinfo=UTC))), + cast("Entry", DummyEntry(id="none-1", feed=dummy_feed, published=None)), + cast("Entry", DummyEntry(id="new", feed=dummy_feed, published=datetime(2024, 2, 1, tzinfo=UTC))), + cast("Entry", DummyEntry(id="none-2", feed=dummy_feed, published=None)), + ] + + class StubReader: + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return [dummy_feed] + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return unsorted_entries + + observed_order: list[str] = [] + + def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str: + del reader, current_feed_url + observed_order.extend(entry.id for entry in entries) + return "" + + app.dependency_overrides[get_reader_dependency] = StubReader + try: + with ( + patch( + "discord_rss_bot.main.get_data_from_hook_url", + return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url), + ), + patch("discord_rss_bot.main.create_html_for_feed", side_effect=capture_entries), + ): + response: Response = client.get( + url="/webhook_entries", + params={"webhook_url": webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" + assert observed_order == ["new", "old", "none-1", "none-2"], ( + "Expected newest published entries first and published=None entries last" + ) + finally: + app.dependency_overrides = {} + + def test_webhook_entries_pagination() -> None: """Test webhook_entries endpoint pagination functionality.""" # Clean up and create webhook @@ -1138,6 +1209,37 @@ def test_modify_webhook_redirects_back_to_webhook_detail() -> None: client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) + +def test_modify_webhook_triggers_git_backup_commit() -> None: + """Modifying a webhook URL should record a state change for git backup.""" + original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz" + new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token" + + client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) + client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) + + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": original_webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + no_redirect_client = TestClient(app, follow_redirects=False) + with patch("discord_rss_bot.main.commit_state_change") as mock_commit_state_change: + response = no_redirect_client.post( + url="/modify_webhook", + data={ + "old_hook": original_webhook_url, + "new_hook": new_webhook_url, + "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}", + }, + ) + + assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}" + assert mock_commit_state_change.call_count == 1, "Expected webhook modification to trigger git backup commit" + + client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) + response = client.post( url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": original_webhook_url}, @@ -1162,6 +1264,338 @@ def test_modify_webhook_redirects_back_to_webhook_detail() -> None: client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) +def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None: + """Preview should list old->new feed URLs for webhook bulk replacement.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + title: str | None = None + updates_enabled: bool = True + last_exception: None = None + + class StubReader: + def __init__(self) -> None: + self._feeds: list[DummyFeed] = [ + DummyFeed(url="https://old.example.com/rss/a.xml", title="A"), + DummyFeed(url="https://old.example.com/rss/b.xml", title="B"), + DummyFeed(url="https://unchanged.example.com/rss/c.xml", title="C"), + ] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + if resource.startswith("https://old.example.com"): + return webhook_url + if resource.startswith("https://unchanged.example.com"): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return [] + + app.dependency_overrides[get_reader_dependency] = StubReader + try: + with ( + patch( + "discord_rss_bot.main.get_data_from_hook_url", + return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url), + ), + patch( + "discord_rss_bot.main.resolve_final_feed_url", + side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), + ), + ): + response: Response = client.get( + url="/webhook_entries", + params={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + }, + ) + + assert response.status_code == 200, f"Failed to get preview: {response.text}" + assert "Mass update feed URLs" in response.text + assert "old.example.com/rss/a.xml" in response.text + assert "new.example.com/rss/a.xml" in response.text + assert "Will update" in response.text + assert "Matched: 2" in response.text + assert "Will update: 2" in response.text + finally: + app.dependency_overrides = {} + + +def test_bulk_change_feed_urls_updates_matching_feeds() -> None: + """Mass updater should change all matching feed URLs for a webhook.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + + class StubReader: + def __init__(self) -> None: + self._feeds = [ + DummyFeed(url="https://old.example.com/rss/a.xml"), + DummyFeed(url="https://old.example.com/rss/b.xml"), + DummyFeed(url="https://unchanged.example.com/rss/c.xml"), + ] + self.change_calls: list[tuple[str, str]] = [] + self.updated_feeds: list[str] = [] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + def change_feed_url(self, old_url: str, new_url: str) -> None: + self.change_calls.append((old_url, new_url)) + + def update_feed(self, feed_url: str) -> None: + self.updated_feeds.append(feed_url) + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return [] + + def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 + return + + stub_reader = StubReader() + app.dependency_overrides[get_reader_dependency] = lambda: stub_reader + no_redirect_client = TestClient(app, follow_redirects=False) + + try: + with patch( + "discord_rss_bot.main.resolve_final_feed_url", + side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), + ): + response: Response = no_redirect_client.post( + url="/bulk_change_feed_urls", + data={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + }, + ) + + assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}" + assert "Updated%202%20feed%20URL%28s%29" in response.headers.get("location", "") + assert sorted(stub_reader.change_calls) == sorted([ + ("https://old.example.com/rss/a.xml", "https://new.example.com/rss/a.xml"), + ("https://old.example.com/rss/b.xml", "https://new.example.com/rss/b.xml"), + ]) + assert sorted(stub_reader.updated_feeds) == sorted([ + "https://new.example.com/rss/a.xml", + "https://new.example.com/rss/b.xml", + ]) + finally: + app.dependency_overrides = {} + + +def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None: + """HTMX preview endpoint should render only the mass-update preview fragment.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + title: str | None = None + updates_enabled: bool = True + last_exception: None = None + + class StubReader: + def __init__(self) -> None: + self._feeds: list[DummyFeed] = [ + DummyFeed(url="https://old.example.com/rss/a.xml", title="A"), + DummyFeed(url="https://old.example.com/rss/b.xml", title="B"), + ] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + app.dependency_overrides[get_reader_dependency] = StubReader + try: + with patch( + "discord_rss_bot.main.resolve_final_feed_url", + side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), + ): + response: Response = client.get( + url="/webhook_entries_mass_update_preview", + params={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + }, + ) + + assert response.status_code == 200, f"Failed to get HTMX preview fragment: {response.text}" + assert "Will update: 2" in response.text + assert " None: # noqa: C901 + """Force update should overwrite conflicting target URLs instead of skipping them.""" + + @dataclass(slots=True) + class DummyFeed: + url: str + + class StubReader: + def __init__(self) -> None: + self._feeds = [ + DummyFeed(url="https://old.example.com/rss/a.xml"), + DummyFeed(url="https://new.example.com/rss/a.xml"), + ] + self.delete_calls: list[str] = [] + self.change_calls: list[tuple[str, str]] = [] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + def delete_feed(self, feed_url: str) -> None: + self.delete_calls.append(feed_url) + + def change_feed_url(self, old_url: str, new_url: str) -> None: + self.change_calls.append((old_url, new_url)) + + def update_feed(self, _feed_url: str) -> None: + return + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return [] + + def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 + return + + stub_reader = StubReader() + app.dependency_overrides[get_reader_dependency] = lambda: stub_reader + no_redirect_client = TestClient(app, follow_redirects=False) + + try: + with patch( + "discord_rss_bot.main.resolve_final_feed_url", + side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None), + ): + response: Response = no_redirect_client.post( + url="/bulk_change_feed_urls", + data={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + "force_update": "true", + }, + ) + + assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}" + assert stub_reader.delete_calls == ["https://new.example.com/rss/a.xml"] + assert stub_reader.change_calls == [ + ( + "https://old.example.com/rss/a.xml", + "https://new.example.com/rss/a.xml", + ), + ] + assert "Force%20overwrote%201" in response.headers.get("location", "") + finally: + app.dependency_overrides = {} + + +def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None: + """Force update should proceed even when URL resolution returns an error (e.g. HTTP 404).""" + + @dataclass(slots=True) + class DummyFeed: + url: str + + class StubReader: + def __init__(self) -> None: + self._feeds = [ + DummyFeed(url="https://old.example.com/rss/a.xml"), + ] + self.change_calls: list[tuple[str, str]] = [] + + def get_tag(self, resource: object, key: str, default: object = None) -> object: + if resource == () and key == "webhooks": + return [{"name": webhook_name, "url": webhook_url}] + if key == "webhook" and isinstance(resource, str): + return webhook_url + return default + + def get_feeds(self) -> list[DummyFeed]: + return self._feeds + + def change_feed_url(self, old_url: str, new_url: str) -> None: + self.change_calls.append((old_url, new_url)) + + def update_feed(self, _feed_url: str) -> None: + return + + def get_entries(self, **_kwargs: object) -> list[Entry]: + return [] + + def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 + return + + stub_reader = StubReader() + app.dependency_overrides[get_reader_dependency] = lambda: stub_reader + no_redirect_client = TestClient(app, follow_redirects=False) + + try: + with patch( + "discord_rss_bot.main.resolve_final_feed_url", + return_value=("https://new.example.com/rss/a.xml", "HTTP 404"), + ): + response: Response = no_redirect_client.post( + url="/bulk_change_feed_urls", + data={ + "webhook_url": webhook_url, + "replace_from": "old.example.com", + "replace_to": "new.example.com", + "resolve_urls": "true", + "force_update": "true", + }, + ) + + assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}" + assert stub_reader.change_calls == [ + ( + "https://old.example.com/rss/a.xml", + "https://new.example.com/rss/a.xml", + ), + ] + location = response.headers.get("location", "") + assert "Updated%201%20feed%20URL%28s%29" in location + assert "Failed%200" in location + finally: + app.dependency_overrides = {} + + def test_reader_dependency_override_is_used() -> None: """Reader should be injectable and overridable via FastAPI dependency overrides."""