From 5215b806438d16fd3420d2fe0840158b483e3013 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Helle=C5=9Ben?= Date: Sat, 7 Mar 2026 19:00:11 +0100 Subject: [PATCH] Add endpoint to see all the entries for a webhook --- discord_rss_bot/main.py | 117 ++++++- discord_rss_bot/templates/index.html | 289 +++++++++--------- .../templates/webhook_entries.html | 38 +++ discord_rss_bot/templates/webhooks.html | 106 ++++--- tests/test_main.py | 202 ++++++++++++ 5 files changed, 557 insertions(+), 195 deletions(-) create mode 100644 discord_rss_bot/templates/webhook_entries.html diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index e4e8975..7c5e7ac 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -948,7 +948,7 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""): return templates.TemplateResponse(request=request, name="feed.html", context=context) -def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: PLR0914 +def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: C901, PLR0914 """Create HTML for the search results. Args: @@ -988,6 +988,16 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") - if current_feed_url and source_feed_url != current_feed_url: from_another_feed = f"From another feed: {source_feed_url}" + # Add feed link when viewing from webhook_entries or aggregated views + feed_link: str = "" + if not current_feed_url or source_feed_url != current_feed_url: + encoded_feed_url: str = urllib.parse.quote(source_feed_url) + feed_title: str = entry.feed.title if hasattr(entry.feed, "title") and entry.feed.title else source_feed_url + feed_link = ( + f"{feed_title}
" + ) + entry_id: str = urllib.parse.quote(entry.id) to_discord_html: str = f"Send to Discord" @@ -1015,7 +1025,7 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") - html += f"""
{blacklisted}{whitelisted}{from_another_feed}

{entry.title}

-{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html} +{feed_link}{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html} {text} {video_embed_html} @@ -1397,6 +1407,109 @@ def extract_youtube_video_id(url: str) -> str | None: return None +@app.get("/webhook_entries", response_class=HTMLResponse) +async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914 + webhook_url: str, + request: Request, + starting_after: str = "", +) -> HTMLResponse: + """Get all latest entries from all feeds for a specific webhook. + + Args: + webhook_url: The webhook URL to get entries for. + request: The request object. + starting_after: The entry to start after. Used for pagination. + + Returns: + HTMLResponse: The webhook entries page. + + Raises: + HTTPException: If no feeds are found for this webhook or webhook doesn't exist. + """ + entries_per_page: int = 20 + clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) + + # Get the webhook name from the webhooks list + webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) + webhook_name: str = "" + for hook in webhooks: + if hook["url"] == clean_webhook_url: + webhook_name = hook["name"] + break + + if not webhook_name: + raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") + + # Get all feeds associated with this webhook + all_feeds: list[Feed] = list(reader.get_feeds()) + webhook_feeds: list[Feed] = [] + + for feed in all_feeds: + try: + feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) + if feed_webhook == clean_webhook_url: + webhook_feeds.append(feed) + except TagNotFoundError: + continue + + # Get all entries from all feeds for this webhook, sorted by published date + all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)] + + # Sort entries by published date (newest first) + all_entries.sort( + key=lambda e: e.published or datetime.now(tz=UTC), + reverse=True, + ) + + # Handle pagination + if starting_after: + try: + start_after_entry: Entry | None = reader.get_entry(( + starting_after.split("|", maxsplit=1)[0], + starting_after.split("|")[1], + )) + except (FeedNotFoundError, EntryNotFoundError): + start_after_entry = None + else: + start_after_entry = None + + # Find the index of the starting entry + start_index: int = 0 + if start_after_entry: + for idx, entry in enumerate(all_entries): + if entry.id == start_after_entry.id and entry.feed.url == start_after_entry.feed.url: + start_index = idx + 1 + break + + # Get the page of entries + paginated_entries: list[Entry] = all_entries[start_index : start_index + entries_per_page] + + # Get the last entry for pagination + last_entry: Entry | None = None + if paginated_entries: + last_entry = paginated_entries[-1] + + # Create the html for the entries + html: str = create_html_for_feed(paginated_entries) + + # Check if there are more entries available + total_entries: int = len(all_entries) + is_show_more_entries_button_visible: bool = (start_index + entries_per_page) < total_entries + + context = { + "request": request, + "webhook_name": webhook_name, + "webhook_url": clean_webhook_url, + "entries": paginated_entries, + "html": html, + "last_entry": last_entry, + "is_show_more_entries_button_visible": is_show_more_entries_button_visible, + "total_entries": total_entries, + "feeds_count": len(webhook_feeds), + } + return templates.TemplateResponse(request=request, name="webhook_entries.html", context=context) + + if __name__ == "__main__": sentry_sdk.init( dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744", diff --git a/discord_rss_bot/templates/index.html b/discord_rss_bot/templates/index.html index f9dfc0d..06b8578 100644 --- a/discord_rss_bot/templates/index.html +++ b/discord_rss_bot/templates/index.html @@ -1,154 +1,155 @@ {% extends "base.html" %} {% block content %} - - {% endblock content %} diff --git a/discord_rss_bot/templates/webhook_entries.html b/discord_rss_bot/templates/webhook_entries.html new file mode 100644 index 0000000..eb12487 --- /dev/null +++ b/discord_rss_bot/templates/webhook_entries.html @@ -0,0 +1,38 @@ +{% extends "base.html" %} +{% block title %} + | {{ webhook_name }} - Latest Entries +{% endblock title %} +{% block content %} +
+ +

{{ webhook_name }} - Latest Entries ({{ total_entries }} total from {{ feeds_count }} feeds)

+ +
+

+ {{ webhook_url }} +

+
+
+ {# Rendered HTML content #} + {% if entries %} +
{{ html|safe }}
+ {% if is_show_more_entries_button_visible and last_entry %} + + Show more entries + + {% endif %} + {% elif feeds_count == 0 %} +
+

+ No feeds found for {{ webhook_name }}. Add feeds to this webhook to see entries here. +

+
+ {% else %} +
+

+ No entries found for {{ webhook_name }}. Update feeds to fetch new entries. +

+
+ {% endif %} +{% endblock content %} diff --git a/discord_rss_bot/templates/webhooks.html b/discord_rss_bot/templates/webhooks.html index 3f0934f..d37e390 100644 --- a/discord_rss_bot/templates/webhooks.html +++ b/discord_rss_bot/templates/webhooks.html @@ -1,55 +1,63 @@ {% extends "base.html" %} {% block title %} -| Webhooks + | Webhooks {% endblock title %} {% block content %} -
- {% for hook in hooks_with_data %} -
-
-

{{ hook.custom_name }}

- -
-
- -
- - - +
+ {% for hook in hooks_with_data %} +
+
+

{{ hook.custom_name }}

+ +
+ + +
+ + +
+
+ +
+ +
+
+ +
+ + +
+
-
- -
- + {% endfor %} +
+ You can append ?thread_id=THREAD_ID to the URL to send messages to a thread. +
+
+
-
-
- - -
-
-
- {% endfor %} -
- You can append ?thread_id=THREAD_ID to the URL to send messages to a thread. -
-
- -
-{% endblock content %} + {% endblock content %} diff --git a/tests/test_main.py b/tests/test_main.py index a53e9a4..dc3ecf5 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -581,3 +581,205 @@ def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyP assert "From another feed: https://example.com/feed-b.xml" in html assert "From another feed: https://example.com/feed-a.xml" not in html + + +def test_webhook_entries_webhook_not_found() -> None: + """Test webhook_entries endpoint returns 404 when webhook doesn't exist.""" + nonexistent_webhook_url = "https://discord.com/api/webhooks/999999/nonexistent" + + response: Response = client.get( + url="/webhook_entries", + params={"webhook_url": nonexistent_webhook_url}, + ) + + assert response.status_code == 404, f"Expected 404 for non-existent webhook, got: {response.status_code}" + assert "Webhook not found" in response.text + + +def test_webhook_entries_no_feeds() -> None: + """Test webhook_entries endpoint displays message when webhook has no feeds.""" + # Clean up any existing feeds first + client.post(url="/remove", data={"feed_url": feed_url}) + + # Clean up and create a webhook + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + # Get webhook_entries without adding any feeds + response = client.get( + url="/webhook_entries", + params={"webhook_url": webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" + assert webhook_name in response.text, "Webhook name not found in response" + assert "No feeds found" in response.text or "Add feeds" in response.text, "Expected message about no feeds" + + +def test_webhook_entries_with_feeds_no_entries() -> None: + """Test webhook_entries endpoint when webhook has feeds but no entries yet.""" + # Clean up and create fresh webhook + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + # Use a feed URL that exists but has no entries (or clean feed) + empty_feed_url = "https://lovinator.space/empty_feed.xml" + client.post(url="/remove", data={"feed_url": empty_feed_url}) + + # Add the feed + response = client.post( + url="/add", + data={"feed_url": empty_feed_url, "webhook_dropdown": webhook_name}, + ) + + # Get webhook_entries + response = client.get( + url="/webhook_entries", + params={"webhook_url": webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" + assert webhook_name in response.text, "Webhook name not found in response" + + # Clean up + client.post(url="/remove", data={"feed_url": empty_feed_url}) + + +def test_webhook_entries_with_entries() -> None: + """Test webhook_entries endpoint displays entries correctly.""" + # Clean up and create webhook + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + # Remove and add the feed + client.post(url="/remove", data={"feed_url": feed_url}) + response = client.post( + url="/add", + data={"feed_url": feed_url, "webhook_dropdown": webhook_name}, + ) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + # Get webhook_entries + response = client.get( + url="/webhook_entries", + params={"webhook_url": webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" + assert webhook_name in response.text, "Webhook name not found in response" + # Should show entries (the feed has entries) + assert "total from" in response.text, "Expected to see entry count" + + +def test_webhook_entries_multiple_feeds() -> None: + """Test webhook_entries endpoint shows feed count correctly.""" + # Clean up and create webhook + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + # Remove and add feed + client.post(url="/remove", data={"feed_url": feed_url}) + response = client.post( + url="/add", + data={"feed_url": feed_url, "webhook_dropdown": webhook_name}, + ) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + # Get webhook_entries + response = client.get( + url="/webhook_entries", + params={"webhook_url": webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" + assert webhook_name in response.text, "Webhook name not found in response" + # Should show entries and feed count + assert "feed" in response.text.lower(), "Expected to see feed information" + + # Clean up + client.post(url="/remove", data={"feed_url": feed_url}) + + +def test_webhook_entries_pagination() -> None: + """Test webhook_entries endpoint pagination functionality.""" + # Clean up and create webhook + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + # Remove and add the feed + client.post(url="/remove", data={"feed_url": feed_url}) + response = client.post( + url="/add", + data={"feed_url": feed_url, "webhook_dropdown": webhook_name}, + ) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + # Get first page of webhook_entries + response = client.get( + url="/webhook_entries", + params={"webhook_url": webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}" + + # Check if pagination button is shown when there are many entries + # The button should be visible if total_entries > 20 (entries_per_page) + if "Load More Entries" in response.text: + # Extract the starting_after parameter from the pagination form + # This is a simple check that pagination elements exist + assert 'name="starting_after"' in response.text, "Expected pagination form with starting_after parameter" + + # Clean up + client.post(url="/remove", data={"feed_url": feed_url}) + + +def test_webhook_entries_url_encoding() -> None: + """Test webhook_entries endpoint handles URL encoding correctly.""" + # Clean up and create webhook + client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + # Remove and add the feed + client.post(url="/remove", data={"feed_url": feed_url}) + response = client.post( + url="/add", + data={"feed_url": feed_url, "webhook_dropdown": webhook_name}, + ) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + # Get webhook_entries with URL-encoded webhook URL + encoded_webhook_url = urllib.parse.quote(webhook_url) + response = client.get( + url="/webhook_entries", + params={"webhook_url": encoded_webhook_url}, + ) + + assert response.status_code == 200, f"Failed to get /webhook_entries with encoded URL: {response.text}" + assert webhook_name in response.text, "Webhook name not found in response" + + # Clean up + client.post(url="/remove", data={"feed_url": feed_url})