Add mass update functionality for feed URLs with preview
All checks were successful
Test and build Docker image / docker (push) Successful in 24s

This commit is contained in:
Joakim Hellsén 2026-03-16 04:48:34 +01:00
commit 955b94456d
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
6 changed files with 952 additions and 26 deletions

View file

@ -54,6 +54,7 @@ from discord_rss_bot.feeds import send_entry_to_discord
from discord_rss_bot.feeds import send_to_discord from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.git_backup import commit_state_change from discord_rss_bot.git_backup import commit_state_change
from discord_rss_bot.git_backup import get_backup_path from discord_rss_bot.git_backup import get_backup_path
from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.search import create_search_context from discord_rss_bot.search import create_search_context
from discord_rss_bot.settings import get_reader from discord_rss_bot.settings import get_reader
@ -1496,15 +1497,20 @@ def modify_webhook(
# Webhooks are stored as a list of dictionaries. # Webhooks are stored as a list of dictionaries.
# Example: [{"name": "webhook_name", "url": "webhook_url"}] # Example: [{"name": "webhook_name", "url": "webhook_url"}]
webhooks = cast("list[dict[str, str]]", webhooks) webhooks = cast("list[dict[str, str]]", webhooks)
old_hook_clean: str = old_hook.strip()
new_hook_clean: str = new_hook.strip()
webhook_modified: bool = False
for hook in webhooks: for hook in webhooks:
if hook["url"] in old_hook.strip(): if hook["url"] in old_hook_clean:
hook["url"] = new_hook.strip() hook["url"] = new_hook_clean
# Check if it has been modified. # Check if it has been modified.
if hook["url"] != new_hook.strip(): if hook["url"] != new_hook_clean:
raise HTTPException(status_code=500, detail="Webhook could not be modified") raise HTTPException(status_code=500, detail="Webhook could not be modified")
webhook_modified = True
# Add our new list of webhooks to the database. # Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType]
@ -1514,13 +1520,16 @@ def modify_webhook(
for feed in feeds: for feed in feeds:
webhook: str = str(reader.get_tag(feed, "webhook", "")) webhook: str = str(reader.get_tag(feed, "webhook", ""))
if webhook == old_hook.strip(): if webhook == old_hook_clean:
reader.set_tag(feed.url, "webhook", new_hook.strip()) # pyright: ignore[reportArgumentType] reader.set_tag(feed.url, "webhook", new_hook_clean) # pyright: ignore[reportArgumentType]
if webhook_modified and old_hook_clean != new_hook_clean:
commit_state_change(reader, f"Modify webhook URL from {old_hook_clean} to {new_hook_clean}")
redirect_url: str = redirect_to.strip() or "/webhooks" redirect_url: str = redirect_to.strip() or "/webhooks"
if redirect_to: if redirect_to:
redirect_url = redirect_url.replace(urllib.parse.quote(old_hook.strip()), urllib.parse.quote(new_hook.strip())) redirect_url = redirect_url.replace(urllib.parse.quote(old_hook_clean), urllib.parse.quote(new_hook_clean))
redirect_url = redirect_url.replace(old_hook.strip(), new_hook.strip()) redirect_url = redirect_url.replace(old_hook_clean, new_hook_clean)
# Redirect to the requested page. # Redirect to the requested page.
return RedirectResponse(url=redirect_url, status_code=303) return RedirectResponse(url=redirect_url, status_code=303)
@ -1549,12 +1558,216 @@ def extract_youtube_video_id(url: str) -> str | None:
return None return None
def resolve_final_feed_url(url: str) -> tuple[str, str | None]:
"""Resolve a feed URL by following redirects.
Args:
url: The feed URL to resolve.
Returns:
tuple[str, str | None]: A tuple with (resolved_url, error_message).
error_message is None when resolution succeeded.
"""
clean_url: str = url.strip()
if not clean_url:
return "", "URL is empty"
if not is_url_valid(clean_url):
return clean_url, "URL is invalid"
try:
response: Response = httpx.get(clean_url, follow_redirects=True, timeout=10.0)
except httpx.HTTPError as e:
return clean_url, str(e)
if not response.is_success:
return clean_url, f"HTTP {response.status_code}"
return str(response.url), None
def create_webhook_feed_url_preview(
webhook_feeds: list[Feed],
replace_from: str,
replace_to: str,
resolve_urls: bool, # noqa: FBT001
force_update: bool = False, # noqa: FBT001, FBT002
existing_feed_urls: set[str] | None = None,
) -> list[dict[str, str | bool | None]]:
"""Create preview rows for bulk feed URL replacement.
Args:
webhook_feeds: Feeds attached to a webhook.
replace_from: Text to replace in each URL.
replace_to: Replacement text.
resolve_urls: Whether to resolve resulting URLs via HTTP redirects.
force_update: Whether conflicts should be marked as force-overwritable.
existing_feed_urls: Optional set of all tracked feed URLs used for conflict detection.
Returns:
list[dict[str, str | bool | None]]: Rows used in the preview table.
"""
known_feed_urls: set[str] = existing_feed_urls or {feed.url for feed in webhook_feeds}
preview_rows: list[dict[str, str | bool | None]] = []
for feed in webhook_feeds:
old_url: str = feed.url
has_match: bool = bool(replace_from and replace_from in old_url)
candidate_url: str = old_url
if has_match:
candidate_url = old_url.replace(replace_from, replace_to)
resolved_url: str = candidate_url
resolution_error: str | None = None
if has_match and candidate_url != old_url and resolve_urls:
resolved_url, resolution_error = resolve_final_feed_url(candidate_url)
will_force_ignore_errors: bool = bool(
force_update and bool(resolution_error) and has_match and old_url != candidate_url,
)
target_exists: bool = bool(
has_match and not resolution_error and resolved_url != old_url and resolved_url in known_feed_urls,
)
will_force_overwrite: bool = bool(target_exists and force_update)
will_change: bool = bool(
has_match
and old_url != (candidate_url if will_force_ignore_errors else resolved_url)
and (not target_exists or will_force_overwrite)
and (not resolution_error or will_force_ignore_errors),
)
preview_rows.append({
"old_url": old_url,
"candidate_url": candidate_url,
"resolved_url": resolved_url,
"has_match": has_match,
"will_change": will_change,
"target_exists": target_exists,
"will_force_overwrite": will_force_overwrite,
"will_force_ignore_errors": will_force_ignore_errors,
"resolution_error": resolution_error,
})
return preview_rows
def build_webhook_mass_update_context(
webhook_feeds: list[Feed],
all_feeds: list[Feed],
replace_from: str,
replace_to: str,
resolve_urls: bool, # noqa: FBT001
force_update: bool = False, # noqa: FBT001, FBT002
) -> dict[str, str | bool | int | list[dict[str, str | bool | None]] | dict[str, int]]:
"""Build context data used by the webhook mass URL update preview UI.
Args:
webhook_feeds: Feeds attached to the selected webhook.
all_feeds: All tracked feeds.
replace_from: Text to replace in URLs.
replace_to: Replacement text.
resolve_urls: Whether to resolve resulting URLs.
force_update: Whether to allow overwriting existing target URLs.
Returns:
dict[str, ...]: Context values for rendering preview controls and table.
"""
clean_replace_from: str = replace_from.strip()
clean_replace_to: str = replace_to.strip()
preview_rows: list[dict[str, str | bool | None]] = []
if clean_replace_from:
preview_rows = create_webhook_feed_url_preview(
webhook_feeds=webhook_feeds,
replace_from=clean_replace_from,
replace_to=clean_replace_to,
resolve_urls=resolve_urls,
force_update=force_update,
existing_feed_urls={feed.url for feed in all_feeds},
)
preview_summary: dict[str, int] = {
"total": len(preview_rows),
"matched": sum(1 for row in preview_rows if row["has_match"]),
"will_update": sum(1 for row in preview_rows if row["will_change"]),
"conflicts": sum(1 for row in preview_rows if row["target_exists"] and not row["will_force_overwrite"]),
"force_overwrite": sum(1 for row in preview_rows if row["will_force_overwrite"]),
"force_ignore_errors": sum(1 for row in preview_rows if row["will_force_ignore_errors"]),
"resolve_errors": sum(1 for row in preview_rows if row["resolution_error"]),
}
preview_summary["no_match"] = preview_summary["total"] - preview_summary["matched"]
preview_summary["no_change"] = sum(
1 for row in preview_rows if row["has_match"] and not row["resolution_error"] and not row["will_change"]
)
return {
"replace_from": clean_replace_from,
"replace_to": clean_replace_to,
"resolve_urls": resolve_urls,
"force_update": force_update,
"preview_rows": preview_rows,
"preview_summary": preview_summary,
"preview_change_count": preview_summary["will_update"],
}
@app.get("/webhook_entries_mass_update_preview", response_class=HTMLResponse)
async def get_webhook_entries_mass_update_preview(
webhook_url: str,
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
replace_from: str = "",
replace_to: str = "",
resolve_urls: bool = True, # noqa: FBT001, FBT002
force_update: bool = False, # noqa: FBT001, FBT002
) -> HTMLResponse:
"""Render the mass-update preview fragment for a webhook using HTMX.
Args:
webhook_url: Webhook URL whose feeds are being updated.
request: The request object.
reader: The Reader instance.
replace_from: Text to find in URLs.
replace_to: Replacement text.
resolve_urls: Whether to resolve resulting URLs.
force_update: Whether to allow overwriting existing target URLs.
Returns:
HTMLResponse: Rendered partial template containing summary + preview table.
"""
clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip())
all_feeds: list[Feed] = list(reader.get_feeds())
webhook_feeds: list[Feed] = [
feed for feed in all_feeds if str(reader.get_tag(feed.url, "webhook", "")) == clean_webhook_url
]
context = {
"request": request,
"webhook_url": clean_webhook_url,
**build_webhook_mass_update_context(
webhook_feeds=webhook_feeds,
all_feeds=all_feeds,
replace_from=replace_from,
replace_to=replace_to,
resolve_urls=resolve_urls,
force_update=force_update,
),
}
return templates.TemplateResponse(request=request, name="_webhook_mass_update_preview.html", context=context)
@app.get("/webhook_entries", response_class=HTMLResponse) @app.get("/webhook_entries", response_class=HTMLResponse)
async def get_webhook_entries( # noqa: C901, PLR0914 async def get_webhook_entries( # noqa: C901, PLR0914
webhook_url: str, webhook_url: str,
request: Request, request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)], reader: Annotated[Reader, Depends(get_reader_dependency)],
starting_after: str = "", starting_after: str = "",
replace_from: str = "",
replace_to: str = "",
resolve_urls: bool = True, # noqa: FBT001, FBT002
force_update: bool = False, # noqa: FBT001, FBT002
message: str = "",
) -> HTMLResponse: ) -> HTMLResponse:
"""Get all latest entries from all feeds for a specific webhook. """Get all latest entries from all feeds for a specific webhook.
@ -1562,6 +1775,11 @@ async def get_webhook_entries( # noqa: C901, PLR0914
webhook_url: The webhook URL to get entries for. webhook_url: The webhook URL to get entries for.
request: The request object. request: The request object.
starting_after: The entry to start after. Used for pagination. starting_after: The entry to start after. Used for pagination.
replace_from: Optional URL substring to find for bulk URL replacement preview.
replace_to: Optional replacement substring used in bulk URL replacement preview.
resolve_urls: Whether to resolve replaced URLs by following redirects.
force_update: Whether to allow overwriting existing target URLs during apply.
message: Optional status message shown in the UI.
reader: The Reader instance. reader: The Reader instance.
Returns: Returns:
@ -1598,9 +1816,12 @@ async def get_webhook_entries( # noqa: C901, PLR0914
# Get all entries from all feeds for this webhook, sorted by published date # Get all entries from all feeds for this webhook, sorted by published date
all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)] all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)]
# Sort entries by published date (newest first) # Sort entries by published date (newest first), with undated entries last.
all_entries.sort( all_entries.sort(
key=lambda e: e.published or datetime.now(tz=UTC), key=lambda e: (
e.published is not None,
e.published or datetime.min.replace(tzinfo=UTC),
),
reverse=True, reverse=True,
) )
@ -1635,6 +1856,15 @@ async def get_webhook_entries( # noqa: C901, PLR0914
# Create the html for the entries # Create the html for the entries
html: str = create_html_for_feed(reader=reader, entries=paginated_entries) html: str = create_html_for_feed(reader=reader, entries=paginated_entries)
mass_update_context = build_webhook_mass_update_context(
webhook_feeds=webhook_feeds,
all_feeds=all_feeds,
replace_from=replace_from,
replace_to=replace_to,
resolve_urls=resolve_urls,
force_update=force_update,
)
# Check if there are more entries available # Check if there are more entries available
total_entries: int = len(all_entries) total_entries: int = len(all_entries)
is_show_more_entries_button_visible: bool = (start_index + entries_per_page) < total_entries is_show_more_entries_button_visible: bool = (start_index + entries_per_page) < total_entries
@ -1651,10 +1881,145 @@ async def get_webhook_entries( # noqa: C901, PLR0914
"is_show_more_entries_button_visible": is_show_more_entries_button_visible, "is_show_more_entries_button_visible": is_show_more_entries_button_visible,
"total_entries": total_entries, "total_entries": total_entries,
"feeds_count": len(webhook_feeds), "feeds_count": len(webhook_feeds),
"message": urllib.parse.unquote(message) if message else "",
**mass_update_context,
} }
return templates.TemplateResponse(request=request, name="webhook_entries.html", context=context) return templates.TemplateResponse(request=request, name="webhook_entries.html", context=context)
@app.post("/bulk_change_feed_urls", response_class=HTMLResponse)
async def post_bulk_change_feed_urls( # noqa: C901, PLR0914, PLR0912, PLR0915
webhook_url: Annotated[str, Form()],
replace_from: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
replace_to: Annotated[str, Form()] = "",
resolve_urls: Annotated[bool, Form()] = True, # noqa: FBT002
force_update: Annotated[bool, Form()] = False, # noqa: FBT002
) -> RedirectResponse:
"""Bulk-change feed URLs attached to a webhook.
Args:
webhook_url: The webhook URL whose feeds should be updated.
replace_from: Text to find in each URL.
replace_to: Text to replace with.
resolve_urls: Whether to resolve resulting URLs via redirects.
force_update: Whether existing target feed URLs should be overwritten.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to webhook detail with status message.
Raises:
HTTPException: If webhook is missing or replace_from is empty.
"""
clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip())
clean_replace_from: str = replace_from.strip()
clean_replace_to: str = replace_to.strip()
if not clean_replace_from:
raise HTTPException(status_code=400, detail="replace_from cannot be empty")
webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", [])))
if not any(hook["url"] == clean_webhook_url for hook in webhooks):
raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}")
all_feeds: list[Feed] = list(reader.get_feeds())
webhook_feeds: list[Feed] = []
for feed in all_feeds:
feed_webhook: str = str(reader.get_tag(feed.url, "webhook", ""))
if feed_webhook == clean_webhook_url:
webhook_feeds.append(feed)
preview_rows: list[dict[str, str | bool | None]] = create_webhook_feed_url_preview(
webhook_feeds=webhook_feeds,
replace_from=clean_replace_from,
replace_to=clean_replace_to,
resolve_urls=resolve_urls,
force_update=force_update,
existing_feed_urls={feed.url for feed in all_feeds},
)
changed_count: int = 0
skipped_count: int = 0
failed_count: int = 0
conflict_count: int = 0
force_overwrite_count: int = 0
for row in preview_rows:
if not row["has_match"]:
continue
if row["resolution_error"] and not force_update:
skipped_count += 1
continue
if row["target_exists"] and not force_update:
conflict_count += 1
skipped_count += 1
continue
old_url: str = str(row["old_url"])
new_url: str = str(row["candidate_url"] if row["will_force_ignore_errors"] else row["resolved_url"])
if old_url == new_url:
skipped_count += 1
continue
if row["target_exists"] and force_update:
try:
reader.delete_feed(new_url)
force_overwrite_count += 1
except FeedNotFoundError:
pass
except ReaderError:
failed_count += 1
continue
try:
reader.change_feed_url(old_url, new_url)
except FeedExistsError:
skipped_count += 1
continue
except FeedNotFoundError:
skipped_count += 1
continue
except ReaderError:
failed_count += 1
continue
try:
reader.update_feed(new_url)
except Exception:
logger.exception("Failed to update feed after URL change: %s", new_url)
for entry in reader.get_entries(feed=new_url, read=False):
try:
reader.set_entry_read(entry, True)
except Exception:
logger.exception("Failed to mark entry as read after URL change: %s", entry.id)
changed_count += 1
if changed_count > 0:
commit_state_change(
reader,
f"Bulk change {changed_count} feed URL(s) for webhook {clean_webhook_url}",
)
status_message: str = (
f"Updated {changed_count} feed URL(s). "
f"Force overwrote {force_overwrite_count}. "
f"Conflicts {conflict_count}. "
f"Skipped {skipped_count}. "
f"Failed {failed_count}."
)
redirect_url: str = (
f"/webhook_entries?webhook_url={urllib.parse.quote(clean_webhook_url)}"
f"&message={urllib.parse.quote(status_message)}"
)
return RedirectResponse(url=redirect_url, status_code=303)
if __name__ == "__main__": if __name__ == "__main__":
sentry_sdk.init( sentry_sdk.init(
dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744", dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744",

View file

@ -0,0 +1,73 @@
{% if preview_rows %}
<p class="small text-muted mb-1">
{{ preview_change_count }} feed URL{{ 's' if preview_change_count != 1 else '' }} ready to update.
</p>
<div class="small text-muted mb-2 d-flex flex-wrap gap-2">
<span class="badge bg-secondary">Total: {{ preview_summary.total }}</span>
<span class="badge bg-info text-dark">Matched: {{ preview_summary.matched }}</span>
<span class="badge bg-success">Will update: {{ preview_summary.will_update }}</span>
<span class="badge bg-warning text-dark">Conflicts: {{ preview_summary.conflicts }}</span>
<span class="badge bg-warning">Force overwrite: {{ preview_summary.force_overwrite }}</span>
<span class="badge bg-warning text-dark">Force ignore errors: {{ preview_summary.force_ignore_errors }}</span>
<span class="badge bg-danger">Resolve errors: {{ preview_summary.resolve_errors }}</span>
<span class="badge bg-secondary">No change: {{ preview_summary.no_change }}</span>
<span class="badge bg-secondary">No match: {{ preview_summary.no_match }}</span>
</div>
<form action="/bulk_change_feed_urls" method="post" class="mb-2">
<input type="hidden" name="webhook_url" value="{{ webhook_url }}" />
<input type="hidden" name="replace_from" value="{{ replace_from }}" />
<input type="hidden" name="replace_to" value="{{ replace_to }}" />
<input type="hidden"
name="resolve_urls"
value="{{ 'true' if resolve_urls else 'false' }}" />
<input type="hidden"
name="force_update"
value="{{ 'true' if force_update else 'false' }}" />
<button type="submit"
class="btn btn-warning w-100"
{% if preview_change_count == 0 %}disabled{% endif %}
onclick="return confirm('Apply these feed URL updates?');">Apply mass update</button>
</form>
<div class="table-responsive mt-2">
<table class="table table-sm table-dark table-striped align-middle mb-0">
<thead>
<tr>
<th scope="col">Old URL</th>
<th scope="col">New URL</th>
<th scope="col">Status</th>
</tr>
</thead>
<tbody>
{% for row in preview_rows %}
<tr>
<td>
<code>{{ row.old_url }}</code>
</td>
<td>
<code>{{ row.resolved_url if resolve_urls else row.candidate_url }}</code>
</td>
<td>
{% if not row.has_match %}
<span class="badge bg-secondary">No match</span>
{% elif row.will_force_ignore_errors %}
<span class="badge bg-warning text-dark">Will force update (ignore resolve error)</span>
{% elif row.resolution_error %}
<span class="badge bg-danger">{{ row.resolution_error }}</span>
{% elif row.will_force_overwrite %}
<span class="badge bg-warning">Will force overwrite</span>
{% elif row.target_exists %}
<span class="badge bg-warning text-dark">Conflict: target URL exists</span>
{% elif row.will_change %}
<span class="badge bg-success">Will update</span>
{% else %}
<span class="badge bg-secondary">No change</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% elif replace_from %}
<p class="small text-muted mb-0">No preview rows found for that replacement pattern.</p>
{% endif %}

View file

@ -1,6 +1,5 @@
<!DOCTYPE html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8" /> <meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" /> <meta name="viewport" content="width=device-width, initial-scale=1" />
@ -18,7 +17,6 @@
{% block head %} {% block head %}
{% endblock head %} {% endblock head %}
</head> </head>
<body class="text-white-50"> <body class="text-white-50">
{% include "nav.html" %} {% include "nav.html" %}
<div class="p-2 mb-2"> <div class="p-2 mb-2">
@ -27,10 +25,12 @@
{% if messages %} {% if messages %}
<div class="alert alert-warning alert-dismissible fade show" role="alert"> <div class="alert alert-warning alert-dismissible fade show" role="alert">
<pre>{{ messages }}</pre> <pre>{{ messages }}</pre>
<button type="button" class="btn-close" data-bs-dismiss="alert" aria-label="Close"></button> <button type="button"
class="btn-close"
data-bs-dismiss="alert"
aria-label="Close"></button>
</div> </div>
{% endif %} {% endif %}
{% block content %} {% block content %}
{% endblock content %} {% endblock content %}
<footer class="d-flex flex-wrap justify-content-between align-items-center py-3 my-4 border-top"> <footer class="d-flex flex-wrap justify-content-between align-items-center py-3 my-4 border-top">
@ -52,7 +52,9 @@
</div> </div>
</div> </div>
</div> </div>
<script src="https://cdn.jsdelivr.net/npm/htmx.org@2.0.8/dist/htmx.min.js"
integrity="sha384-/TgkGk7p307TH7EXJDuUlgG3Ce1UVolAOFopFekQkkXihi5u/6OCvVKyz1W+idaz"
crossorigin="anonymous"></script>
<script src="/static/bootstrap.min.js" defer></script> <script src="/static/bootstrap.min.js" defer></script>
</body> </body>
</html> </html>

View file

@ -32,11 +32,10 @@
{% for hook_from_context in webhooks %} {% for hook_from_context in webhooks %}
<div class="p-2 mb-3 border border-dark"> <div class="p-2 mb-3 border border-dark">
<div class="d-flex justify-content-between align-items-center mb-3"> <div class="d-flex justify-content-between align-items-center mb-3">
<h2 class="h5 mb-0"> <h2 class="h5 mb-0">{{ hook_from_context.name }}</h2>
<a class="text-muted" <a class="text-muted fs-6 btn btn-outline-light btn-sm ms-auto me-2"
href="/webhook_entries?webhook_url={{ hook_from_context.url|encode_url }}">{{ hook_from_context.name }}</a> href="/webhook_entries?webhook_url={{ hook_from_context.url|encode_url }}">Settings</a>
</h2> <a class="text-muted fs-6 btn btn-outline-light btn-sm"
<a class="text-muted"
href="/webhook_entries?webhook_url={{ hook_from_context.url|encode_url }}">View Latest Entries</a> href="/webhook_entries?webhook_url={{ hook_from_context.url|encode_url }}">View Latest Entries</a>
</div> </div>
<!-- Group feeds by domain within each webhook --> <!-- Group feeds by domain within each webhook -->

View file

@ -3,6 +3,7 @@
| {{ webhook_name }} | {{ webhook_name }}
{% endblock title %} {% endblock title %}
{% block content %} {% block content %}
{% if message %}<div class="alert alert-info" role="alert">{{ message }}</div>{% endif %}
<div class="card mb-3 border border-dark p-3 text-light"> <div class="card mb-3 border border-dark p-3 text-light">
<div class="d-flex flex-column flex-md-row justify-content-between gap-3"> <div class="d-flex flex-column flex-md-row justify-content-between gap-3">
<div> <div>
@ -61,6 +62,57 @@
Delete Webhook Delete Webhook
</button> </button>
</form> </form>
<hr class="border-secondary my-3" />
<h3 class="h6">Mass update feed URLs</h3>
<p class="text-muted small mb-2">Replace part of feed URLs for all feeds attached to this webhook.</p>
<form action="/webhook_entries"
method="get"
class="row g-2 mb-2"
hx-get="/webhook_entries_mass_update_preview"
hx-target="#mass-update-preview"
hx-swap="innerHTML">
<input type="hidden" name="webhook_url" value="{{ webhook_url|encode_url }}" />
<div class="col-12">
<label for="replace_from" class="form-label small">Replace this</label>
<input type="text"
name="replace_from"
id="replace_from"
class="form-control border text-muted bg-dark"
value="{{ replace_from }}"
placeholder="https://old-domain.example" />
</div>
<div class="col-12">
<label for="replace_to" class="form-label small">With this</label>
<input type="text"
name="replace_to"
id="replace_to"
class="form-control border text-muted bg-dark"
value="{{ replace_to }}"
placeholder="https://new-domain.example" />
</div>
<div class="col-12 form-check ms-1">
<input class="form-check-input"
type="checkbox"
value="true"
id="resolve_urls"
name="resolve_urls"
{% if resolve_urls %}checked{% endif %} />
<label class="form-check-label small" for="resolve_urls">Resolve final URL with redirects (uses httpx)</label>
</div>
<div class="col-12 form-check ms-1">
<input class="form-check-input"
type="checkbox"
value="true"
id="force_update"
name="force_update"
{% if force_update %}checked{% endif %} />
<label class="form-check-label small" for="force_update">Force update (overwrite conflicting target feed URLs)</label>
</div>
<div class="col-12">
<button type="submit" class="btn btn-outline-warning w-100">Preview changes</button>
</div>
</form>
<div id="mass-update-preview">{% include "_webhook_mass_update_preview.html" %}</div>
</div> </div>
</div> </div>
<div class="col-lg-7"> <div class="col-lg-7">
@ -77,6 +129,7 @@
{{ feed.url }} {{ feed.url }}
{% endif %} {% endif %}
</a> </a>
{% if feed.title %}<span class="text-muted">- {{ feed.url }}</span>{% endif %}
{% if not feed.updates_enabled %}<span class="text-warning">Disabled</span>{% endif %} {% if not feed.updates_enabled %}<span class="text-warning">Disabled</span>{% endif %}
{% if feed.last_exception %}<span class="text-danger">({{ feed.last_exception.value_str }})</span>{% endif %} {% if feed.last_exception %}<span class="text-danger">({{ feed.last_exception.value_str }})</span>{% endif %}
</li> </li>

View file

@ -4,6 +4,8 @@ import re
import urllib.parse import urllib.parse
from dataclasses import dataclass from dataclasses import dataclass
from dataclasses import field from dataclasses import field
from datetime import UTC
from datetime import datetime
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import cast from typing import cast
from unittest.mock import MagicMock from unittest.mock import MagicMock
@ -1039,6 +1041,75 @@ def test_webhook_entries_multiple_feeds() -> None:
client.post(url="/remove", data={"feed_url": feed_url}) client.post(url="/remove", data={"feed_url": feed_url})
def test_webhook_entries_sort_newest_and_non_null_published_first() -> None:
"""Webhook entries should be sorted newest-first with published=None entries placed last."""
@dataclass(slots=True)
class DummyFeed:
url: str
title: str | None = None
updates_enabled: bool = True
last_exception: None = None
@dataclass(slots=True)
class DummyEntry:
id: str
feed: DummyFeed
published: datetime | None
dummy_feed = DummyFeed(url="https://example.com/feed.xml", title="Example Feed")
# Intentionally unsorted input with two dated entries and two undated entries.
unsorted_entries: list[Entry] = [
cast("Entry", DummyEntry(id="old", feed=dummy_feed, published=datetime(2024, 1, 1, tzinfo=UTC))),
cast("Entry", DummyEntry(id="none-1", feed=dummy_feed, published=None)),
cast("Entry", DummyEntry(id="new", feed=dummy_feed, published=datetime(2024, 2, 1, tzinfo=UTC))),
cast("Entry", DummyEntry(id="none-2", feed=dummy_feed, published=None)),
]
class StubReader:
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return [dummy_feed]
def get_entries(self, **_kwargs: object) -> list[Entry]:
return unsorted_entries
observed_order: list[str] = []
def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str:
del reader, current_feed_url
observed_order.extend(entry.id for entry in entries)
return ""
app.dependency_overrides[get_reader_dependency] = StubReader
try:
with (
patch(
"discord_rss_bot.main.get_data_from_hook_url",
return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url),
),
patch("discord_rss_bot.main.create_html_for_feed", side_effect=capture_entries),
):
response: Response = client.get(
url="/webhook_entries",
params={"webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
assert observed_order == ["new", "old", "none-1", "none-2"], (
"Expected newest published entries first and published=None entries last"
)
finally:
app.dependency_overrides = {}
def test_webhook_entries_pagination() -> None: def test_webhook_entries_pagination() -> None:
"""Test webhook_entries endpoint pagination functionality.""" """Test webhook_entries endpoint pagination functionality."""
# Clean up and create webhook # Clean up and create webhook
@ -1138,6 +1209,37 @@ def test_modify_webhook_redirects_back_to_webhook_detail() -> None:
client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
def test_modify_webhook_triggers_git_backup_commit() -> None:
"""Modifying a webhook URL should record a state change for git backup."""
original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token"
client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
no_redirect_client = TestClient(app, follow_redirects=False)
with patch("discord_rss_bot.main.commit_state_change") as mock_commit_state_change:
response = no_redirect_client.post(
url="/modify_webhook",
data={
"old_hook": original_webhook_url,
"new_hook": new_webhook_url,
"redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}",
},
)
assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
assert mock_commit_state_change.call_count == 1, "Expected webhook modification to trigger git backup commit"
client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
response = client.post( response = client.post(
url="/add_webhook", url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": original_webhook_url}, data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
@ -1162,6 +1264,338 @@ def test_modify_webhook_redirects_back_to_webhook_detail() -> None:
client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url}) client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None:
"""Preview should list old->new feed URLs for webhook bulk replacement."""
@dataclass(slots=True)
class DummyFeed:
url: str
title: str | None = None
updates_enabled: bool = True
last_exception: None = None
class StubReader:
def __init__(self) -> None:
self._feeds: list[DummyFeed] = [
DummyFeed(url="https://old.example.com/rss/a.xml", title="A"),
DummyFeed(url="https://old.example.com/rss/b.xml", title="B"),
DummyFeed(url="https://unchanged.example.com/rss/c.xml", title="C"),
]
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
if resource.startswith("https://old.example.com"):
return webhook_url
if resource.startswith("https://unchanged.example.com"):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
def get_entries(self, **_kwargs: object) -> list[Entry]:
return []
app.dependency_overrides[get_reader_dependency] = StubReader
try:
with (
patch(
"discord_rss_bot.main.get_data_from_hook_url",
return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url),
),
patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
),
):
response: Response = client.get(
url="/webhook_entries",
params={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
},
)
assert response.status_code == 200, f"Failed to get preview: {response.text}"
assert "Mass update feed URLs" in response.text
assert "old.example.com/rss/a.xml" in response.text
assert "new.example.com/rss/a.xml" in response.text
assert "Will update" in response.text
assert "Matched: 2" in response.text
assert "Will update: 2" in response.text
finally:
app.dependency_overrides = {}
def test_bulk_change_feed_urls_updates_matching_feeds() -> None:
"""Mass updater should change all matching feed URLs for a webhook."""
@dataclass(slots=True)
class DummyFeed:
url: str
class StubReader:
def __init__(self) -> None:
self._feeds = [
DummyFeed(url="https://old.example.com/rss/a.xml"),
DummyFeed(url="https://old.example.com/rss/b.xml"),
DummyFeed(url="https://unchanged.example.com/rss/c.xml"),
]
self.change_calls: list[tuple[str, str]] = []
self.updated_feeds: list[str] = []
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
def change_feed_url(self, old_url: str, new_url: str) -> None:
self.change_calls.append((old_url, new_url))
def update_feed(self, feed_url: str) -> None:
self.updated_feeds.append(feed_url)
def get_entries(self, **_kwargs: object) -> list[Entry]:
return []
def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
return
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
):
response: Response = no_redirect_client.post(
url="/bulk_change_feed_urls",
data={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
},
)
assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
assert "Updated%202%20feed%20URL%28s%29" in response.headers.get("location", "")
assert sorted(stub_reader.change_calls) == sorted([
("https://old.example.com/rss/a.xml", "https://new.example.com/rss/a.xml"),
("https://old.example.com/rss/b.xml", "https://new.example.com/rss/b.xml"),
])
assert sorted(stub_reader.updated_feeds) == sorted([
"https://new.example.com/rss/a.xml",
"https://new.example.com/rss/b.xml",
])
finally:
app.dependency_overrides = {}
def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None:
"""HTMX preview endpoint should render only the mass-update preview fragment."""
@dataclass(slots=True)
class DummyFeed:
url: str
title: str | None = None
updates_enabled: bool = True
last_exception: None = None
class StubReader:
def __init__(self) -> None:
self._feeds: list[DummyFeed] = [
DummyFeed(url="https://old.example.com/rss/a.xml", title="A"),
DummyFeed(url="https://old.example.com/rss/b.xml", title="B"),
]
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
app.dependency_overrides[get_reader_dependency] = StubReader
try:
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
):
response: Response = client.get(
url="/webhook_entries_mass_update_preview",
params={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
},
)
assert response.status_code == 200, f"Failed to get HTMX preview fragment: {response.text}"
assert "Will update: 2" in response.text
assert "<table" in response.text
assert "Mass update feed URLs" not in response.text, "Fragment should not include full page wrapper text"
finally:
app.dependency_overrides = {}
def test_bulk_change_feed_urls_force_update_overwrites_conflict() -> None: # noqa: C901
"""Force update should overwrite conflicting target URLs instead of skipping them."""
@dataclass(slots=True)
class DummyFeed:
url: str
class StubReader:
def __init__(self) -> None:
self._feeds = [
DummyFeed(url="https://old.example.com/rss/a.xml"),
DummyFeed(url="https://new.example.com/rss/a.xml"),
]
self.delete_calls: list[str] = []
self.change_calls: list[tuple[str, str]] = []
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
def delete_feed(self, feed_url: str) -> None:
self.delete_calls.append(feed_url)
def change_feed_url(self, old_url: str, new_url: str) -> None:
self.change_calls.append((old_url, new_url))
def update_feed(self, _feed_url: str) -> None:
return
def get_entries(self, **_kwargs: object) -> list[Entry]:
return []
def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
return
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
):
response: Response = no_redirect_client.post(
url="/bulk_change_feed_urls",
data={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
"force_update": "true",
},
)
assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
assert stub_reader.delete_calls == ["https://new.example.com/rss/a.xml"]
assert stub_reader.change_calls == [
(
"https://old.example.com/rss/a.xml",
"https://new.example.com/rss/a.xml",
),
]
assert "Force%20overwrote%201" in response.headers.get("location", "")
finally:
app.dependency_overrides = {}
def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None:
"""Force update should proceed even when URL resolution returns an error (e.g. HTTP 404)."""
@dataclass(slots=True)
class DummyFeed:
url: str
class StubReader:
def __init__(self) -> None:
self._feeds = [
DummyFeed(url="https://old.example.com/rss/a.xml"),
]
self.change_calls: list[tuple[str, str]] = []
def get_tag(self, resource: object, key: str, default: object = None) -> object:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
def change_feed_url(self, old_url: str, new_url: str) -> None:
self.change_calls.append((old_url, new_url))
def update_feed(self, _feed_url: str) -> None:
return
def get_entries(self, **_kwargs: object) -> list[Entry]:
return []
def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
return
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
return_value=("https://new.example.com/rss/a.xml", "HTTP 404"),
):
response: Response = no_redirect_client.post(
url="/bulk_change_feed_urls",
data={
"webhook_url": webhook_url,
"replace_from": "old.example.com",
"replace_to": "new.example.com",
"resolve_urls": "true",
"force_update": "true",
},
)
assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
assert stub_reader.change_calls == [
(
"https://old.example.com/rss/a.xml",
"https://new.example.com/rss/a.xml",
),
]
location = response.headers.get("location", "")
assert "Updated%201%20feed%20URL%28s%29" in location
assert "Failed%200" in location
finally:
app.dependency_overrides = {}
def test_reader_dependency_override_is_used() -> None: def test_reader_dependency_override_is_used() -> None:
"""Reader should be injectable and overridable via FastAPI dependency overrides.""" """Reader should be injectable and overridable via FastAPI dependency overrides."""