diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 85354bb..961c70e 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -114,6 +114,13 @@ def get_reader_dependency() -> Reader: return get_reader() +def has_webhooks() -> bool: + """Return whether at least one global webhook is configured.""" + reader: Reader = get_reader() + webhooks = list(reader.get_tag((), "webhooks", [])) + return bool(webhooks) + + # Time constants for relative time formatting SECONDS_PER_MINUTE = 60 SECONDS_PER_HOUR = 3600 @@ -185,6 +192,7 @@ templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(str(url)) i templates.env.filters["discord_markdown"] = markdownify # pyright: ignore[reportArgumentType] templates.env.filters["relative_time"] = relative_time templates.env.globals["get_backup_path"] = get_backup_path # pyright: ignore[reportArgumentType] +templates.env.globals["has_webhooks"] = has_webhooks # pyright: ignore[reportArgumentType] @app.post("/add_webhook") @@ -213,11 +221,15 @@ async def post_add_webhook( # Example: [{"name": "webhook_name", "url": "webhook_url"}] webhooks = cast("list[dict[str, str]]", webhooks) + clean_webhook_url: str = webhook_url.strip() + if not is_url_valid(clean_webhook_url): + raise HTTPException(status_code=400, detail="Invalid webhook URL") + # Only add the webhook if it doesn't already exist. stripped_webhook_name = webhook_name.strip() if all(webhook["name"] != stripped_webhook_name for webhook in webhooks): # Add the new webhook to the list of webhooks. - webhooks.append({"name": webhook_name.strip(), "url": webhook_url.strip()}) + webhooks.append({"name": webhook_name.strip(), "url": clean_webhook_url}) reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] @@ -300,6 +312,52 @@ async def post_create_feed( return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) +@app.post("/attach_feed_webhook") +async def post_attach_feed_webhook( + feed_url: Annotated[str, Form()], + webhook_dropdown: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], + redirect_to: Annotated[str, Form()] = "", +) -> RedirectResponse: + """Attach an existing feed to one of the configured webhooks. + + Args: + feed_url: The feed URL to update. + webhook_dropdown: The webhook name selected from the dropdown. + reader: The Reader instance. + redirect_to: Optional redirect URL after update. + + Returns: + RedirectResponse: Redirect to index or feed page. + + Raises: + HTTPException: If feed or webhook cannot be found. + """ + clean_feed_url: str = urllib.parse.unquote(feed_url.strip()) + selected_webhook_name: str = webhook_dropdown.strip() + + try: + reader.get_feed(clean_feed_url) + except FeedNotFoundError as e: + raise HTTPException(status_code=404, detail="Feed not found") from e + + webhook_url: str = "" + hooks = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) + for hook in hooks: + if hook.get("name") == selected_webhook_name: + webhook_url = hook.get("url", "").strip() + break + + if not webhook_url: + raise HTTPException(status_code=404, detail="Webhook not found") + + reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType] + commit_state_change(reader, f"Attach feed {clean_feed_url} to webhook {selected_webhook_name}") + + redirect_url: str = redirect_to.strip() or f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}" + return RedirectResponse(url=redirect_url, status_code=303) + + @app.post("/pause") async def post_pause_feed( feed_url: Annotated[str, Form()], @@ -1058,6 +1116,14 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 except FeedNotFoundError as e: raise HTTPException(status_code=404, detail=f"Feed '{clean_feed_url}' not found.\n\n{e}") from e + webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) + current_webhook_url: str = str(reader.get_tag(feed.url, "webhook", "")).strip() + current_webhook_name: str = "" + for hook in webhooks: + if hook.get("url", "").strip() == current_webhook_url: + current_webhook_name = hook.get("name", "").strip() + break + # Only show button if more than 10 entries. total_entries: int = reader.get_entry_counts(feed=feed).total or 0 is_show_more_entries_button_visible: bool = total_entries > entries_per_page @@ -1103,6 +1169,9 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 "total_entries": total_entries, "feed_interval": feed_interval, "global_interval": global_interval, + "webhooks": webhooks, + "current_webhook_url": current_webhook_url, + "current_webhook_name": current_webhook_name, } return templates.TemplateResponse(request=request, name="feed.html", context=context) @@ -1159,6 +1228,9 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 "total_entries": total_entries, "feed_interval": feed_interval, "global_interval": global_interval, + "webhooks": webhooks, + "current_webhook_url": current_webhook_url, + "current_webhook_name": current_webhook_name, } return templates.TemplateResponse(request=request, name="feed.html", context=context) @@ -1300,20 +1372,30 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: Returns: WebhookInfo: The webhook username, avatar, guild id, etc. """ - our_hook: WebhookInfo = WebhookInfo(custom_name=hook_name, url=hook_url) + clean_hook_url: str = hook_url.strip() + our_hook: WebhookInfo = WebhookInfo(custom_name=hook_name, url=clean_hook_url) - if hook_url: - response: Response = httpx.get(hook_url) - if response.is_success: - webhook_json = json.loads(response.text) - our_hook.webhook_type = webhook_json["type"] or None - our_hook.webhook_id = webhook_json["id"] or None - our_hook.name = webhook_json["name"] or None - our_hook.avatar = webhook_json["avatar"] or None - our_hook.channel_id = webhook_json["channel_id"] or None - our_hook.guild_id = webhook_json["guild_id"] or None - our_hook.token = webhook_json["token"] or None - our_hook.avatar_mod = int(webhook_json["channel_id"] or 0) % 5 + # Keep /webhooks usable even if a malformed webhook URL was saved. + if not clean_hook_url or not is_url_valid(clean_hook_url): + logger.warning("Skipping webhook metadata fetch for invalid URL: %s", clean_hook_url) + return our_hook + + try: + response: Response = httpx.get(clean_hook_url, timeout=10.0) + except httpx.HTTPError as e: + logger.warning("Failed to fetch webhook metadata for %s: %s", clean_hook_url, e) + return our_hook + + if response.is_success: + webhook_json = json.loads(response.text) + our_hook.webhook_type = webhook_json["type"] or None + our_hook.webhook_id = webhook_json["id"] or None + our_hook.name = webhook_json["name"] or None + our_hook.avatar = webhook_json["avatar"] or None + our_hook.channel_id = webhook_json["channel_id"] or None + our_hook.guild_id = webhook_json["guild_id"] or None + our_hook.token = webhook_json["token"] or None + our_hook.avatar_mod = int(webhook_json["channel_id"] or 0) % 5 return our_hook @@ -1649,6 +1731,9 @@ def modify_webhook( webhooks = cast("list[dict[str, str]]", webhooks) old_hook_clean: str = old_hook.strip() new_hook_clean: str = new_hook.strip() + if not is_url_valid(new_hook_clean): + raise HTTPException(status_code=400, detail="Invalid webhook URL") + webhook_modified: bool = False for hook in webhooks: diff --git a/discord_rss_bot/templates/feed.html b/discord_rss_bot/templates/feed.html index 6cb7ad4..92e0e7b 100644 --- a/discord_rss_bot/templates/feed.html +++ b/discord_rss_bot/templates/feed.html @@ -5,7 +5,7 @@ {% block content %}
-
+
-
+
Rendered HTML content
diff --git a/discord_rss_bot/templates/nav.html b/discord_rss_bot/templates/nav.html index 7442554..2286744 100644 --- a/discord_rss_bot/templates/nav.html +++ b/discord_rss_bot/templates/nav.html @@ -12,10 +12,12 @@ Feeds - - + {% if has_webhooks() %} + + + {% endif %} diff --git a/tests/test_main.py b/tests/test_main.py index 7766500..d680a26 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -25,6 +25,7 @@ if TYPE_CHECKING: import pytest from httpx import Response from reader import Entry + from reader import Reader client: TestClient = TestClient(app) webhook_name: str = "Hello, I am a webhook!" @@ -88,6 +89,56 @@ def test_add_webhook() -> None: assert webhook_name in response.text, f"Webhook not found in /webhooks: {response.text}" +def test_add_webhook_rejects_invalid_url() -> None: + """Adding a webhook with a non-URL value should fail validation.""" + response: Response = client.post( + url="/add_webhook", + data={"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"}, + ) + + assert response.status_code == 400, f"Expected invalid webhook URL to be rejected: {response.text}" + assert "Invalid webhook URL" in response.text + + +def test_add_webhook_allows_valid_url_after_invalid_attempt() -> None: + """A rejected invalid webhook URL should not prevent a later valid add.""" + response: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + assert response.status_code == 200, f"Failed to delete webhook: {response.text}" + + response = client.post( + url="/add_webhook", + data={"webhook_name": "Invalid URL Hook", "webhook_url": "not-a-url"}, + ) + assert response.status_code == 400, f"Expected invalid webhook URL to be rejected: {response.text}" + assert "Invalid webhook URL" in response.text + + response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook after invalid attempt: {response.text}" + + response = client.get(url="/webhooks") + assert response.status_code == 200, f"Failed to get /webhooks: {response.text}" + assert webhook_name in response.text, f"Webhook not found in /webhooks: {response.text}" + + response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + assert response.status_code == 200, f"Failed to delete webhook: {response.text}" + + +def test_webhooks_page_handles_invalid_stored_webhook_url() -> None: + """/webhooks should render even if a malformed webhook URL is present in storage.""" + reader: Reader = get_reader_dependency() + malformed_webhook_name = "Malformed hook" + malformed_webhook_url = "definitely-not-a-url" + + reader.set_tag((), "webhooks", [{"name": malformed_webhook_name, "url": malformed_webhook_url}]) # pyright: ignore[reportArgumentType] + response: Response = client.get(url="/webhooks") + + assert response.status_code == 200, f"/webhooks should not crash for malformed URLs: {response.text}" + assert malformed_webhook_name in response.text + + def test_create_feed() -> None: """Test the /create_feed page.""" # Ensure webhook exists for this test regardless of test order. @@ -197,6 +248,28 @@ def test_add_page_shows_global_default_delivery_mode_hint() -> None: assert "text" in response.text +def test_navbar_add_feed_visible_only_when_webhooks_exist() -> None: + reader: Reader = get_reader_dependency() + reader.set_tag((), "webhooks", []) # pyright: ignore[reportArgumentType] + + response: Response = client.get(url="/") + assert response.status_code == 200, f"/ failed: {response.text}" + assert 'Add feed' not in response.text + + response = client.post( + url="/add_webhook", + data={"webhook_name": webhook_name, "webhook_url": webhook_url}, + ) + assert response.status_code == 200, f"Failed to add webhook: {response.text}" + + response = client.get(url="/") + assert response.status_code == 200, f"/ failed: {response.text}" + assert 'Add feed' in response.text + + cleanup_response: Response = client.post(url="/delete_webhook", data={"webhook_url": webhook_url}) + assert cleanup_response.status_code == 200, f"Failed to clean up webhook: {cleanup_response.text}" + + def test_c3kay_feed_delivery_mode_toggle_routes_update_stored_tags() -> None: reader = get_reader_dependency() c3kay_feed_url = "https://feeds.c3kay.de/hoyolab-ui-toggle-test.xml" @@ -536,6 +609,114 @@ def test_delete_webhook() -> None: assert webhook_name not in response3.text, f"Webhook found in /webhooks: {response3.text}" +def test_attach_feed_webhook_from_index() -> None: + """Feeds without attached webhook should be attachable from the index page.""" + original_webhook_name = "original-webhook" + original_webhook_url = "https://discord.com/api/webhooks/111/original" + replacement_webhook_name = "replacement-webhook" + replacement_webhook_url = "https://discord.com/api/webhooks/222/replacement" + + # Start clean. + client.post(url="/remove", data={"feed_url": feed_url}) + client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) + client.post(url="/delete_webhook", data={"webhook_url": replacement_webhook_url}) + + # Add a webhook and a feed attached to it. + response = client.post( + url="/add_webhook", + data={"webhook_name": original_webhook_name, "webhook_url": original_webhook_url}, + ) + assert response.status_code == 200, f"Failed to add original webhook: {response.text}" + + response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": original_webhook_name}) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + # Remove the original webhook so feed becomes "without attached webhook". + response = client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) + assert response.status_code == 200, f"Failed to delete original webhook: {response.text}" + + # Add a replacement webhook we can attach to. + response = client.post( + url="/add_webhook", + data={"webhook_name": replacement_webhook_name, "webhook_url": replacement_webhook_url}, + ) + assert response.status_code == 200, f"Failed to add replacement webhook: {response.text}" + + # The feed should now be listed in "Feeds without attached webhook" section. + response = client.get(url="/") + assert response.status_code == 200, f"Failed to get /: {response.text}" + assert "Feeds without attached webhook:" in response.text + assert "/attach_feed_webhook" in response.text + + # Attach the feed to the new webhook. + response = client.post( + url="/attach_feed_webhook", + data={"feed_url": feed_url, "webhook_dropdown": replacement_webhook_name, "redirect_to": "/"}, + ) + assert response.status_code == 200, f"Failed to attach feed to webhook: {response.text}" + + reader = get_reader_dependency() + assert reader.get_tag(feed_url, "webhook", "") == replacement_webhook_url + + # Cleanup. + client.post(url="/remove", data={"feed_url": feed_url}) + client.post(url="/delete_webhook", data={"webhook_url": replacement_webhook_url}) + + +def test_attach_feed_webhook_from_feed_page() -> None: + """Feed detail page should allow attaching/replacing webhook directly.""" + original_webhook_name = "feed-page-original-webhook" + original_webhook_url = "https://discord.com/api/webhooks/333/original" + replacement_webhook_name = "feed-page-replacement-webhook" + replacement_webhook_url = "https://discord.com/api/webhooks/444/replacement" + + # Start clean. + client.post(url="/remove", data={"feed_url": feed_url}) + client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) + client.post(url="/delete_webhook", data={"webhook_url": replacement_webhook_url}) + + # Create two webhooks and attach feed to original. + response = client.post( + url="/add_webhook", + data={"webhook_name": original_webhook_name, "webhook_url": original_webhook_url}, + ) + assert response.status_code == 200, f"Failed to add original webhook: {response.text}" + + response = client.post( + url="/add_webhook", + data={"webhook_name": replacement_webhook_name, "webhook_url": replacement_webhook_url}, + ) + assert response.status_code == 200, f"Failed to add replacement webhook: {response.text}" + + response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": original_webhook_name}) + assert response.status_code == 200, f"Failed to add feed: {response.text}" + + # Feed page should show the webhook form and current webhook label. + response = client.get(url="/feed", params={"feed_url": feed_url}) + assert response.status_code == 200, f"Failed to get /feed: {response.text}" + assert "Current webhook:" in response.text + assert "/attach_feed_webhook" in response.text + + # Reattach to replacement webhook via endpoint used by feed page form. + response = client.post( + url="/attach_feed_webhook", + data={ + "feed_url": feed_url, + "webhook_dropdown": replacement_webhook_name, + "redirect_to": f"/feed?feed_url={urllib.parse.quote(feed_url)}", + }, + ) + assert response.status_code == 200, f"Failed to reattach feed webhook: {response.text}" + + reader = get_reader_dependency() + assert reader.get_tag(feed_url, "webhook", "") == replacement_webhook_url + + # Cleanup. + client.post(url="/remove", data={"feed_url": feed_url}) + client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url}) + client.post(url="/delete_webhook", data={"webhook_url": replacement_webhook_url}) + + def test_update_feed_not_found() -> None: """Test updating a non-existent feed.""" # Generate a feed URL that does not exist