+ {% endif %}
{# Search #}
+ {% if is_show_more_entries_button_visible and last_entry %}
+
+ {% endif %}
+{% endblock content %}
diff --git a/discord_rss_bot/templates/webhooks.html b/discord_rss_bot/templates/webhooks.html
index 3f0934f..d37e390 100644
--- a/discord_rss_bot/templates/webhooks.html
+++ b/discord_rss_bot/templates/webhooks.html
@@ -1,55 +1,63 @@
{% extends "base.html" %}
{% block title %}
-| Webhooks
+ | Webhooks
{% endblock title %}
{% block content %}
-
- {% for hook in hooks_with_data %}
-
-
-
{{ hook.custom_name }}
-
-
-
-{% endblock content %}
+ {% endblock content %}
diff --git a/discord_rss_bot/templates/whitelist.html b/discord_rss_bot/templates/whitelist.html
index 5a958f6..61755e2 100644
--- a/discord_rss_bot/templates/whitelist.html
+++ b/discord_rss_bot/templates/whitelist.html
@@ -1,6 +1,6 @@
{% extends "base.html" %}
{% block title %}
-| Blacklist
+| Whitelist
{% endblock title %}
{% block content %}
@@ -42,6 +42,49 @@
Whitelist - Author
+
+
+
+
+
+ Regular expression patterns for advanced filtering. Each pattern should be on a new
+ line.
+
+ Patterns are case-insensitive.
+
+ Examples:
+
+
+^New Release:.*
+\b(update|version|patch)\s+\d+\.\d+
+.*\[(important|notice)\].*
+
+
+
+
+
+
Regex Whitelist - Title
+
{%- if regex_whitelist_title -%}{{ regex_whitelist_title }}{%- endif -%}
+
+
Regex Whitelist -
+ Summary
+
{%- if regex_whitelist_summary -%}{{ regex_whitelist_summary }}{%- endif -%}
+
+
Regex Whitelist -
+ Content
+
{%- if regex_whitelist_content -%}{{ regex_whitelist_content }}{%- endif -%}
+
+
Regex Whitelist - Author
+
{%- if regex_whitelist_author -%}{{ regex_whitelist_author }}{%- endif -%}
+
diff --git a/docker-compose.yml b/docker-compose.yml
index 837ed0b..6b92975 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,7 +10,7 @@ services:
# - /Docker/Bots/discord-rss-bot:/home/botuser/.local/share/discord_rss_bot/
- data:/home/botuser/.local/share/discord_rss_bot/
healthcheck:
- test: ["CMD", "python", "discord_rss_bot/healthcheck.py"]
+ test: [ "CMD", "uv", "run", "./discord_rss_bot/healthcheck.py" ]
interval: 1m
timeout: 10s
retries: 3
diff --git a/pyproject.toml b/pyproject.toml
index 4cda1f6..970faeb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ description = "RSS bot for Discord"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
- "apscheduler",
+ "apscheduler>=3.11.0",
"discord-webhook",
"fastapi",
"httpx",
@@ -17,54 +17,30 @@ dependencies = [
"python-multipart",
"reader",
"sentry-sdk[fastapi]",
+ "tldextract",
"uvicorn",
]
[dependency-groups]
-dev = ["pytest"]
-
-[tool.poetry]
-name = "discord-rss-bot"
-version = "1.0.0"
-description = "RSS bot for Discord"
-authors = ["Joakim Hellsén
"]
-
-[tool.poetry.dependencies]
-python = "^3.12"
-apscheduler = "*"
-discord-webhook = "*"
-fastapi = "*"
-httpx = "*"
-jinja2 = "*"
-lxml = "*"
-markdownify = "*"
-platformdirs = "*"
-python-dotenv = "*"
-python-multipart = "*"
-reader = "*"
-sentry-sdk = {version = "*", extras = ["fastapi"]}
-uvicorn = "*"
-
-[tool.poetry.group.dev.dependencies]
-pytest = "*"
+dev = ["djlint", "pytest", "pytest-randomly", "pytest-xdist"]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
-[tool.djlint]
-ignore = "D004,D018,J018,T001,J004"
-profile = "jinja"
-max_line_length = 120
-format_attribute_template_tags = true
-
[tool.ruff]
+
preview = true
+unsafe-fixes = true
+fix = true
line-length = 120
+
lint.select = ["ALL"]
+lint.unfixable = ["F841"] # Don't automatically remove unused variables
lint.pydocstyle.convention = "google"
lint.isort.required-imports = ["from __future__ import annotations"]
-lint.pycodestyle.ignore-overlong-task-comments = true
+lint.isort.force-single-line = true
+
lint.ignore = [
"ANN201", # Checks that public functions and methods have return type annotations.
@@ -86,6 +62,8 @@ lint.ignore = [
"PLR6301", # Checks for the presence of unused self parameter in methods definitions.
"RUF029", # Checks for functions declared async that do not await or otherwise use features requiring the function to be declared async.
"TD003", # Checks that a TODO comment is associated with a link to a relevant issue or ticket.
+ "PLR0913", # Checks for function definitions that include too many arguments.
+ "PLR0917", # Checks for function definitions that include too many positional arguments.
# Conflicting lint rules when using Ruff's formatter
# https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
@@ -108,15 +86,8 @@ lint.ignore = [
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["S101", "D103", "PLR2004"]
-[tool.ruff.lint.mccabe]
-max-complexity = 15 # Don't judge lol
-
[tool.pytest.ini_options]
-python_files = ["test_*.py"]
-log_cli = true
-log_cli_level = "DEBUG"
-log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
-log_cli_date_format = "%Y-%m-%d %H:%M:%S"
+addopts = "-n 5 --dist loadfile"
filterwarnings = [
"ignore::bs4.GuessedAtParserWarning",
"ignore:functools\\.partial will be a method descriptor in future Python versions; wrap it in staticmethod\\(\\) if you want to preserve the old behavior:FutureWarning",
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 112d13e..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-apscheduler
-discord-webhook
-fastapi
-httpx
-jinja2
-lxml
-markdownify
-platformdirs
-python-dotenv
-python-multipart
-reader
-sentry-sdk[fastapi]
-uvicorn
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..4aa791d
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,68 @@
+from __future__ import annotations
+
+import os
+import shutil
+import sys
+import tempfile
+import warnings
+from contextlib import suppress
+from pathlib import Path
+from typing import TYPE_CHECKING
+from typing import Any
+
+from bs4 import MarkupResemblesLocatorWarning
+
+if TYPE_CHECKING:
+ import pytest
+
+
+def pytest_addoption(parser: pytest.Parser) -> None:
+ """Register custom command-line options for optional integration tests."""
+ parser.addoption(
+ "--run-real-git-backup-tests",
+ action="store_true",
+ default=False,
+ help="Run tests that push git backup state to a real repository.",
+ )
+
+
+def pytest_sessionstart(session: pytest.Session) -> None:
+ """Isolate persistent app state per xdist worker to avoid cross-worker test interference."""
+ worker_id: str = os.environ.get("PYTEST_XDIST_WORKER", "gw0")
+ worker_data_dir: Path = Path(tempfile.gettempdir()) / "discord-rss-bot-tests" / worker_id
+
+ # Start each worker from a clean state.
+ shutil.rmtree(worker_data_dir, ignore_errors=True)
+ worker_data_dir.mkdir(parents=True, exist_ok=True)
+
+ os.environ["DISCORD_RSS_BOT_DATA_DIR"] = str(worker_data_dir)
+
+ # Tests call markdownify which may invoke BeautifulSoup on strings that look
+ # like URLs; that triggers MarkupResemblesLocatorWarning from bs4. Silence
+ # that warning during tests to avoid noisy output.
+ warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)
+
+ # If modules were imported before this hook (unlikely), force them to use
+ # the worker-specific location.
+ settings_module: Any = sys.modules.get("discord_rss_bot.settings")
+ if settings_module is not None:
+ settings_module.data_dir = str(worker_data_dir)
+ get_reader: Any = getattr(settings_module, "get_reader", None)
+ if get_reader is not None and hasattr(get_reader, "cache_clear"):
+ get_reader.cache_clear()
+
+ main_module: Any = sys.modules.get("discord_rss_bot.main")
+ if main_module is not None and settings_module is not None:
+ with suppress(Exception):
+ current_reader = getattr(main_module, "reader", None)
+ if current_reader is not None:
+ current_reader.close()
+ get_reader: Any = getattr(settings_module, "get_reader", None)
+ if callable(get_reader):
+ get_reader()
+
+
+def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
+ """Skip real git-repo push tests unless explicitly requested."""
+ if config.getoption("--run-real-git-backup-tests"):
+ return
diff --git a/tests/test_blacklist.py b/tests/test_blacklist.py
index 4f5a317..0c756ad 100644
--- a/tests/test_blacklist.py
+++ b/tests/test_blacklist.py
@@ -4,9 +4,13 @@ import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
-from reader import Entry, Feed, Reader, make_reader
+from reader import Entry
+from reader import Feed
+from reader import Reader
+from reader import make_reader
-from discord_rss_bot.filter.blacklist import entry_should_be_skipped, feed_has_blacklist_tags
+from discord_rss_bot.filter.blacklist import entry_should_be_skipped
+from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags
if TYPE_CHECKING:
from collections.abc import Iterable
@@ -34,11 +38,18 @@ def test_has_black_tags() -> None:
# Test feed without any blacklist tags
assert_msg: str = "Feed should not have any blacklist tags"
- assert feed_has_blacklist_tags(custom_reader=get_reader(), feed=feed) is False, assert_msg
+ assert feed_has_blacklist_tags(reader=get_reader(), feed=feed) is False, assert_msg
check_if_has_tag(reader, feed, "blacklist_title")
check_if_has_tag(reader, feed, "blacklist_summary")
check_if_has_tag(reader, feed, "blacklist_content")
+ check_if_has_tag(reader, feed, "blacklist_author")
+
+ # Test regex blacklist tags
+ check_if_has_tag(reader, feed, "regex_blacklist_title")
+ check_if_has_tag(reader, feed, "regex_blacklist_summary")
+ check_if_has_tag(reader, feed, "regex_blacklist_content")
+ check_if_has_tag(reader, feed, "regex_blacklist_author")
# Clean up
reader.delete_feed(feed_url)
@@ -47,11 +58,11 @@ def test_has_black_tags() -> None:
def check_if_has_tag(reader: Reader, feed: Feed, blacklist_name: str) -> None:
reader.set_tag(feed, blacklist_name, "a") # pyright: ignore[reportArgumentType]
assert_msg: str = f"Feed should have blacklist tags: {blacklist_name}"
- assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is True, assert_msg
+ assert feed_has_blacklist_tags(reader=reader, feed=feed) is True, assert_msg
asset_msg: str = f"Feed should not have any blacklist tags: {blacklist_name}"
reader.delete_tag(feed, blacklist_name)
- assert feed_has_blacklist_tags(custom_reader=reader, feed=feed) is False, asset_msg
+ assert feed_has_blacklist_tags(reader=reader, feed=feed) is False, asset_msg
def test_should_be_skipped() -> None:
@@ -74,6 +85,7 @@ def test_should_be_skipped() -> None:
# Test entry without any blacklists
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
+ # Test standard blacklist functionality
reader.set_tag(feed, "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
assert entry_should_be_skipped(reader, first_entry[0]) is True, f"Entry should be skipped: {first_entry[0]}"
reader.delete_tag(feed, "blacklist_title")
@@ -113,3 +125,81 @@ def test_should_be_skipped() -> None:
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
reader.delete_tag(feed, "blacklist_author")
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
+
+
+def test_regex_should_be_skipped() -> None:
+ """Test the regex filtering functionality for blacklist."""
+ reader: Reader = get_reader()
+
+ # Add feed and update entries
+ reader.add_feed(feed_url)
+ feed: Feed = reader.get_feed(feed_url)
+ reader.update_feeds()
+
+ # Get first entry
+ first_entry: list[Entry] = []
+ entries: Iterable[Entry] = reader.get_entries(feed=feed)
+ assert entries is not None, f"Entries should not be None: {entries}"
+ for entry in entries:
+ first_entry.append(entry)
+ break
+ assert len(first_entry) == 1, f"First entry should be added: {first_entry}"
+
+ # Test entry without any regex blacklists
+ assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
+
+ # Test regex blacklist for title
+ reader.set_tag(feed, "regex_blacklist_title", r"fvnnn\w+") # pyright: ignore[reportArgumentType]
+ assert entry_should_be_skipped(reader, first_entry[0]) is True, (
+ f"Entry should be skipped with regex title match: {first_entry[0]}"
+ )
+ reader.delete_tag(feed, "regex_blacklist_title")
+ assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
+
+ # Test regex blacklist for summary
+ reader.set_tag(feed, "regex_blacklist_summary", r"ffdnfdn\w+") # pyright: ignore[reportArgumentType]
+ assert entry_should_be_skipped(reader, first_entry[0]) is True, (
+ f"Entry should be skipped with regex summary match: {first_entry[0]}"
+ )
+ reader.delete_tag(feed, "regex_blacklist_summary")
+ assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
+
+ # Test regex blacklist for content
+ reader.set_tag(feed, "regex_blacklist_content", r"ffdnfdnfdn\w+") # pyright: ignore[reportArgumentType]
+ assert entry_should_be_skipped(reader, first_entry[0]) is True, (
+ f"Entry should be skipped with regex content match: {first_entry[0]}"
+ )
+ reader.delete_tag(feed, "regex_blacklist_content")
+ assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
+
+ # Test regex blacklist for author
+ reader.set_tag(feed, "regex_blacklist_author", r"TheLovinator\d*") # pyright: ignore[reportArgumentType]
+ assert entry_should_be_skipped(reader, first_entry[0]) is True, (
+ f"Entry should be skipped with regex author match: {first_entry[0]}"
+ )
+ reader.delete_tag(feed, "regex_blacklist_author")
+ assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
+
+ # Test invalid regex pattern (should not raise an exception)
+ reader.set_tag(feed, "regex_blacklist_title", r"[incomplete") # pyright: ignore[reportArgumentType]
+ assert entry_should_be_skipped(reader, first_entry[0]) is False, (
+ f"Entry should not be skipped with invalid regex: {first_entry[0]}"
+ )
+ reader.delete_tag(feed, "regex_blacklist_title")
+
+ # Test multiple regex patterns separated by commas
+ reader.set_tag(feed, "regex_blacklist_author", r"pattern1,TheLovinator\d*,pattern3") # pyright: ignore[reportArgumentType]
+ assert entry_should_be_skipped(reader, first_entry[0]) is True, (
+ f"Entry should be skipped with one matching pattern in list: {first_entry[0]}"
+ )
+ reader.delete_tag(feed, "regex_blacklist_author")
+ assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
+
+ # Test newline-separated regex patterns
+ newline_patterns = "pattern1\nTheLovinator\\d*\npattern3"
+ reader.set_tag(feed, "regex_blacklist_author", newline_patterns) # pyright: ignore[reportArgumentType]
+ assert entry_should_be_skipped(reader, first_entry[0]) is True, (
+ f"Entry should be skipped with newline-separated patterns: {first_entry[0]}"
+ )
+ reader.delete_tag(feed, "regex_blacklist_author")
+ assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
diff --git a/tests/test_custom_filter.py b/tests/test_custom_filter.py
index 94ce18e..9611698 100644
--- a/tests/test_custom_filter.py
+++ b/tests/test_custom_filter.py
@@ -5,7 +5,9 @@ import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
-from discord_rss_bot.custom_filters import encode_url, entry_is_blacklisted, entry_is_whitelisted
+from discord_rss_bot.custom_filters import encode_url
+from discord_rss_bot.custom_filters import entry_is_blacklisted
+from discord_rss_bot.custom_filters import entry_is_whitelisted
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
@@ -43,39 +45,39 @@ def test_entry_is_whitelisted() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
- custom_reader: Reader = get_reader(custom_location=str(custom_loc))
+ reader: Reader = get_reader(custom_location=str(custom_loc))
# Add a feed to the database.
- custom_reader.add_feed("https://lovinator.space/rss_test.xml")
- custom_reader.update_feed("https://lovinator.space/rss_test.xml")
+ reader.add_feed("https://lovinator.space/rss_test.xml")
+ reader.update_feed("https://lovinator.space/rss_test.xml")
# whitelist_title
- custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
- for entry in custom_reader.get_entries():
- if entry_is_whitelisted(entry) is True:
+ reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
+ for entry in reader.get_entries():
+ if entry_is_whitelisted(entry, reader=reader) is True:
assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}"
break
- custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title")
+ reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_title")
# whitelist_summary
- custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
- for entry in custom_reader.get_entries():
- if entry_is_whitelisted(entry) is True:
+ reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
+ for entry in reader.get_entries():
+ if entry_is_whitelisted(entry, reader=reader) is True:
assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}"
break
- custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary")
+ reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_summary")
# whitelist_content
- custom_reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
- for entry in custom_reader.get_entries():
- if entry_is_whitelisted(entry) is True:
+ reader.set_tag("https://lovinator.space/rss_test.xml", "whitelist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
+ for entry in reader.get_entries():
+ if entry_is_whitelisted(entry, reader=reader) is True:
assert_msg = f"Expected: ffdnfdnfdnfdnfdndfn
, Got: {entry.content[0].value}"
assert entry.content[0].value == "ffdnfdnfdnfdnfdndfn
", assert_msg
break
- custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content")
+ reader.delete_tag("https://lovinator.space/rss_test.xml", "whitelist_content")
# Close the reader, so we can delete the directory.
- custom_reader.close()
+ reader.close()
def test_entry_is_blacklisted() -> None:
@@ -85,36 +87,36 @@ def test_entry_is_blacklisted() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
- custom_reader: Reader = get_reader(custom_location=str(custom_loc))
+ reader: Reader = get_reader(custom_location=str(custom_loc))
# Add a feed to the database.
- custom_reader.add_feed("https://lovinator.space/rss_test.xml")
- custom_reader.update_feed("https://lovinator.space/rss_test.xml")
+ reader.add_feed("https://lovinator.space/rss_test.xml")
+ reader.update_feed("https://lovinator.space/rss_test.xml")
# blacklist_title
- custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
- for entry in custom_reader.get_entries():
- if entry_is_blacklisted(entry) is True:
+ reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
+ for entry in reader.get_entries():
+ if entry_is_blacklisted(entry, reader=reader) is True:
assert entry.title == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.title}"
break
- custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title")
+ reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_title")
# blacklist_summary
- custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
- for entry in custom_reader.get_entries():
- if entry_is_blacklisted(entry) is True:
+ reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_summary", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
+ for entry in reader.get_entries():
+ if entry_is_blacklisted(entry, reader=reader) is True:
assert entry.summary == "fvnnnfnfdnfdnfd", f"Expected: fvnnnfnfdnfdnfd, Got: {entry.summary}"
break
- custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary")
+ reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_summary")
# blacklist_content
- custom_reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
- for entry in custom_reader.get_entries():
- if entry_is_blacklisted(entry) is True:
+ reader.set_tag("https://lovinator.space/rss_test.xml", "blacklist_content", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
+ for entry in reader.get_entries():
+ if entry_is_blacklisted(entry, reader=reader) is True:
assert_msg = f"Expected: ffdnfdnfdnfdnfdndfn
, Got: {entry.content[0].value}"
assert entry.content[0].value == "ffdnfdnfdnfdnfdndfn
", assert_msg
break
- custom_reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content")
+ reader.delete_tag("https://lovinator.space/rss_test.xml", "blacklist_content")
# Close the reader, so we can delete the directory.
- custom_reader.close()
+ reader.close()
diff --git a/tests/test_custom_message.py b/tests/test_custom_message.py
new file mode 100644
index 0000000..4b23f45
--- /dev/null
+++ b/tests/test_custom_message.py
@@ -0,0 +1,140 @@
+from __future__ import annotations
+
+import typing
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import pytest
+
+from discord_rss_bot.custom_message import CustomEmbed
+from discord_rss_bot.custom_message import format_entry_html_for_discord
+from discord_rss_bot.custom_message import replace_tags_in_embed
+from discord_rss_bot.custom_message import replace_tags_in_text_message
+
+if typing.TYPE_CHECKING:
+ from reader import Entry
+
+# https://discord.com/developers/docs/reference#message-formatting
+TIMESTAMP_FORMATS: tuple[str, ...] = (
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+)
+
+
+def make_feed() -> SimpleNamespace:
+ return SimpleNamespace(
+ added=None,
+ author="Feed Author",
+ last_exception=None,
+ last_updated=None,
+ link="https://example.com/feed",
+ subtitle="",
+ title="Example Feed",
+ updated=None,
+ updates_enabled=True,
+ url="https://example.com/feed.xml",
+ user_title="",
+ version="atom10",
+ )
+
+
+def make_entry(summary: str) -> SimpleNamespace:
+ feed: SimpleNamespace = make_feed()
+ return SimpleNamespace(
+ added=None,
+ author="Entry Author",
+ content=[],
+ feed=feed,
+ feed_url=feed.url,
+ id="entry-1",
+ important=False,
+ link="https://example.com/entry-1",
+ published=None,
+ read=False,
+ read_modified=None,
+ summary=summary,
+ title="Entry Title",
+ updated=None,
+ )
+
+
+@pytest.mark.parametrize("timestamp_tag", TIMESTAMP_FORMATS)
+def test_format_entry_html_for_discord_preserves_timestamp_tags(timestamp_tag: str) -> None:
+ escaped_timestamp_tag: str = timestamp_tag.replace("<", "&lt;").replace(">", "&gt;")
+ html_summary: str = f"Starts: 2026-03-13 23:30 UTC ({escaped_timestamp_tag})
"
+
+ rendered: str = format_entry_html_for_discord(html_summary)
+
+ assert timestamp_tag in rendered
+ assert "DISCORDTIMESTAMPPLACEHOLDER" not in rendered
+
+
+def test_format_entry_html_for_discord_empty_text_returns_empty_string() -> None:
+ rendered: str = format_entry_html_for_discord("")
+ assert not rendered
+
+
+def test_format_entry_html_for_discord_cleans_markdownified_https_link_text() -> None:
+ html_summary: str = "[https://example.com](https://example.com)"
+
+ rendered: str = format_entry_html_for_discord(html_summary)
+
+ assert "[example.com](https://example.com)" in rendered
+ assert "[https://example.com]" not in rendered
+
+
+def test_format_entry_html_for_discord_does_not_preserve_invalid_timestamp_style() -> None:
+ invalid_timestamp: str = ""
+ html_summary: str = f"Invalid style ({invalid_timestamp.replace('<', '&lt;').replace('>', '&gt;')})
"
+
+ rendered: str = format_entry_html_for_discord(html_summary)
+
+ assert invalid_timestamp not in rendered
+
+
+@patch("discord_rss_bot.custom_message.get_custom_message")
+def test_replace_tags_in_text_message_preserves_timestamp_tags(
+ mock_get_custom_message: MagicMock,
+) -> None:
+ mock_reader = MagicMock()
+ mock_get_custom_message.return_value = "{{entry_summary}}"
+ summary_parts: list[str] = [
+ f"Format {index}: ({timestamp_tag.replace('<', '&lt;').replace('>', '&gt;')})
"
+ for index, timestamp_tag in enumerate(TIMESTAMP_FORMATS, start=1)
+ ]
+ entry_ns: SimpleNamespace = make_entry("".join(summary_parts))
+
+ entry: Entry = typing.cast("Entry", entry_ns)
+ rendered: str = replace_tags_in_text_message(entry, reader=mock_reader)
+
+ for timestamp_tag in TIMESTAMP_FORMATS:
+ assert timestamp_tag in rendered
+
+
+@patch("discord_rss_bot.custom_message.get_embed")
+def test_replace_tags_in_embed_preserves_timestamp_tags(
+ mock_get_embed: MagicMock,
+) -> None:
+ mock_reader = MagicMock()
+ mock_get_embed.return_value = CustomEmbed(description="{{entry_summary}}")
+ summary_parts: list[str] = [
+ f"Format {index}: ({timestamp_tag.replace('<', '&lt;').replace('>', '&gt;')})
"
+ for index, timestamp_tag in enumerate(TIMESTAMP_FORMATS, start=1)
+ ]
+ entry_ns: SimpleNamespace = make_entry("".join(summary_parts))
+
+ entry: Entry = typing.cast("Entry", entry_ns)
+
+ embed: CustomEmbed = replace_tags_in_embed(entry_ns.feed, entry, reader=mock_reader)
+
+ for timestamp_tag in TIMESTAMP_FORMATS:
+ assert timestamp_tag in embed.description
diff --git a/tests/test_feeds.py b/tests/test_feeds.py
index e6e1381..84e836c 100644
--- a/tests/test_feeds.py
+++ b/tests/test_feeds.py
@@ -4,12 +4,20 @@ import os
import tempfile
from pathlib import Path
from typing import LiteralString
+from unittest.mock import MagicMock
+from unittest.mock import patch
import pytest
-from reader import Feed, Reader, make_reader
+from reader import Feed
+from reader import Reader
+from reader import make_reader
-from discord_rss_bot.feeds import send_to_discord, truncate_webhook_message
-from discord_rss_bot.missing_tags import add_missing_tags
+from discord_rss_bot.feeds import extract_domain
+from discord_rss_bot.feeds import is_youtube_feed
+from discord_rss_bot.feeds import send_entry_to_discord
+from discord_rss_bot.feeds import send_to_discord
+from discord_rss_bot.feeds import should_send_embed_check
+from discord_rss_bot.feeds import truncate_webhook_message
def test_send_to_discord() -> None:
@@ -26,8 +34,6 @@ def test_send_to_discord() -> None:
# Add a feed to the reader.
reader.add_feed("https://www.reddit.com/r/Python/.rss")
- add_missing_tags(reader)
-
# Update the feed to get the entries.
reader.update_feeds()
@@ -49,7 +55,7 @@ def test_send_to_discord() -> None:
assert reader.get_tag(feed, "webhook") == webhook_url, f"The webhook URL should be '{webhook_url}'."
# Send the feed to Discord.
- send_to_discord(custom_reader=reader, feed=feed, do_once=True)
+ send_to_discord(reader=reader, feed=feed, do_once=True)
# Close the reader, so we can delete the directory.
reader.close()
@@ -85,3 +91,186 @@ def test_truncate_webhook_message_long_message():
# Test the end of the message
assert_msg = "The end of the truncated message should be '...' to indicate truncation."
assert truncated_message[-half_length:] == "A" * half_length, assert_msg
+
+
+def test_is_youtube_feed():
+ """Test the is_youtube_feed function."""
+ # YouTube feed URLs
+ assert is_youtube_feed("https://www.youtube.com/feeds/videos.xml?channel_id=123456") is True
+ assert is_youtube_feed("https://www.youtube.com/feeds/videos.xml?user=username") is True
+
+ # Non-YouTube feed URLs
+ assert is_youtube_feed("https://www.example.com/feed.xml") is False
+ assert is_youtube_feed("https://www.youtube.com/watch?v=123456") is False
+ assert is_youtube_feed("https://www.reddit.com/r/Python/.rss") is False
+
+
+@patch("discord_rss_bot.feeds.logger")
+def test_should_send_embed_check_youtube_feeds(mock_logger: MagicMock) -> None:
+ """Test should_send_embed_check returns False for YouTube feeds regardless of settings."""
+ # Create mocks
+ mock_reader = MagicMock()
+ mock_entry = MagicMock()
+
+ # Configure a YouTube feed
+ mock_entry.feed.url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
+
+ # Set reader to return True for should_send_embed (would normally create an embed)
+ mock_reader.get_tag.return_value = True
+
+ # Result should be False, overriding the feed settings
+ result = should_send_embed_check(mock_reader, mock_entry)
+ assert result is False, "YouTube feeds should never use embeds"
+
+ # Function should not even call get_tag for YouTube feeds
+ mock_reader.get_tag.assert_not_called()
+
+
+@patch("discord_rss_bot.feeds.logger")
+def test_should_send_embed_check_normal_feeds(mock_logger: MagicMock) -> None:
+ """Test should_send_embed_check returns feed settings for non-YouTube feeds."""
+ # Create mocks
+ mock_reader = MagicMock()
+ mock_entry = MagicMock()
+
+ # Configure a normal feed
+ mock_entry.feed.url = "https://www.example.com/feed.xml"
+
+ # Test with should_send_embed set to True
+ mock_reader.get_tag.return_value = True
+ result = should_send_embed_check(mock_reader, mock_entry)
+ assert result is True, "Normal feeds should use embeds when enabled"
+
+ # Test with should_send_embed set to False
+ mock_reader.get_tag.return_value = False
+ result = should_send_embed_check(mock_reader, mock_entry)
+ assert result is False, "Normal feeds should not use embeds when disabled"
+
+
+@patch("discord_rss_bot.feeds.get_reader")
+@patch("discord_rss_bot.feeds.get_custom_message")
+@patch("discord_rss_bot.feeds.replace_tags_in_text_message")
+@patch("discord_rss_bot.feeds.create_embed_webhook")
+@patch("discord_rss_bot.feeds.DiscordWebhook")
+@patch("discord_rss_bot.feeds.execute_webhook")
+def test_send_entry_to_discord_youtube_feed(
+ mock_execute_webhook: MagicMock,
+ mock_discord_webhook: MagicMock,
+ mock_create_embed: MagicMock,
+ mock_replace_tags: MagicMock,
+ mock_get_custom_message: MagicMock,
+ mock_get_reader: MagicMock,
+):
+ """Test send_entry_to_discord function with YouTube feeds."""
+ # Set up mocks
+ mock_reader = MagicMock()
+ mock_get_reader.return_value = mock_reader
+ mock_entry = MagicMock()
+ mock_feed = MagicMock()
+
+ # Configure a YouTube feed
+ mock_entry.feed = mock_feed
+ mock_entry.feed.url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
+ mock_entry.feed_url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
+
+ # Mock the tags
+ mock_reader.get_tag.side_effect = lambda feed, tag, default=None: { # noqa: ARG005
+ "webhook": "https://discord.com/api/webhooks/123/abc",
+ "should_send_embed": True, # This should be ignored for YouTube feeds
+ }.get(tag, default)
+
+ # Mock custom message
+ mock_get_custom_message.return_value = "Custom message"
+ mock_replace_tags.return_value = "Formatted message with {{entry_link}}"
+
+ # Mock webhook
+ mock_webhook = MagicMock()
+ mock_discord_webhook.return_value = mock_webhook
+
+ # Call the function
+ send_entry_to_discord(mock_entry, mock_reader)
+
+ # Assertions
+ mock_create_embed.assert_not_called()
+ mock_discord_webhook.assert_called_once()
+
+ # Check webhook was created with the right message
+ webhook_call_kwargs = mock_discord_webhook.call_args[1]
+ assert "content" in webhook_call_kwargs, "Webhook should have content"
+ assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc"
+
+ # Verify execute_webhook was called
+ mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry, reader=mock_reader)
+
+
+def test_extract_domain_youtube_feed() -> None:
+ """Test extract_domain for YouTube feeds."""
+ url: str = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
+ assert extract_domain(url) == "YouTube", "YouTube feeds should return 'YouTube' as the domain."
+
+
+def test_extract_domain_reddit_feed() -> None:
+ """Test extract_domain for Reddit feeds."""
+ url: str = "https://www.reddit.com/r/Python/.rss"
+ assert extract_domain(url) == "Reddit", "Reddit feeds should return 'Reddit' as the domain."
+
+
+def test_extract_domain_github_feed() -> None:
+ """Test extract_domain for GitHub feeds."""
+ url: str = "https://www.github.com/user/repo"
+ assert extract_domain(url) == "GitHub", "GitHub feeds should return 'GitHub' as the domain."
+
+
+def test_extract_domain_custom_domain() -> None:
+ """Test extract_domain for custom domains."""
+ url: str = "https://www.example.com/feed"
+ assert extract_domain(url) == "Example", "Custom domains should return the capitalized first part of the domain."
+
+
+def test_extract_domain_no_www_prefix() -> None:
+ """Test extract_domain removes 'www.' prefix."""
+ url: str = "https://www.example.com/feed"
+ assert extract_domain(url) == "Example", "The 'www.' prefix should be removed from the domain."
+
+
+def test_extract_domain_no_tld() -> None:
+ """Test extract_domain for domains without a TLD."""
+ url: str = "https://localhost/feed"
+ assert extract_domain(url) == "Localhost", "Domains without a TLD should return the capitalized domain."
+
+
+def test_extract_domain_invalid_url() -> None:
+ """Test extract_domain for invalid URLs."""
+ url: str = "not-a-valid-url"
+ assert extract_domain(url) == "Other", "Invalid URLs should return 'Other' as the domain."
+
+
+def test_extract_domain_empty_url() -> None:
+ """Test extract_domain for empty URLs."""
+ url: str = ""
+ assert extract_domain(url) == "Other", "Empty URLs should return 'Other' as the domain."
+
+
+def test_extract_domain_special_characters() -> None:
+ """Test extract_domain for URLs with special characters."""
+ url: str = "https://www.ex-ample.com/feed"
+ assert extract_domain(url) == "Ex-ample", "Domains with special characters should return the capitalized domain."
+
+
+@pytest.mark.parametrize(
+ argnames=("url", "expected"),
+ argvalues=[
+ ("https://blog.something.com", "Something"),
+ ("https://www.something.com", "Something"),
+ ("https://subdomain.example.co.uk", "Example"),
+ ("https://github.com/user/repo", "GitHub"),
+ ("https://youtube.com/feeds/videos.xml?channel_id=abc", "YouTube"),
+ ("https://reddit.com/r/python/.rss", "Reddit"),
+ ("", "Other"),
+ ("not a url", "Other"),
+ ("https://www.example.com", "Example"),
+ ("https://foo.bar.baz.com", "Baz"),
+ ],
+)
+def test_extract_domain(url: str, expected: str) -> None:
+ assert extract_domain(url) == expected
diff --git a/tests/test_git_backup.py b/tests/test_git_backup.py
new file mode 100644
index 0000000..183d178
--- /dev/null
+++ b/tests/test_git_backup.py
@@ -0,0 +1,475 @@
+from __future__ import annotations
+
+import contextlib
+import json
+import shutil
+import subprocess # noqa: S404
+from pathlib import Path
+from typing import TYPE_CHECKING
+from typing import Any
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import pytest
+from fastapi.testclient import TestClient
+
+from discord_rss_bot.git_backup import commit_state_change
+from discord_rss_bot.git_backup import export_state
+from discord_rss_bot.git_backup import get_backup_path
+from discord_rss_bot.git_backup import get_backup_remote
+from discord_rss_bot.git_backup import setup_backup_repo
+from discord_rss_bot.main import app
+
+if TYPE_CHECKING:
+ from pathlib import Path
+
+
+SKIP_IF_NO_GIT: pytest.MarkDecorator = pytest.mark.skipif(
+ shutil.which("git") is None,
+ reason="git executable not found",
+)
+
+
+def test_get_backup_path_unset(monkeypatch: pytest.MonkeyPatch) -> None:
+ """get_backup_path returns None when GIT_BACKUP_PATH is not set."""
+ monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
+ assert get_backup_path() is None
+
+
+def test_get_backup_path_set(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """get_backup_path returns a Path when GIT_BACKUP_PATH is set."""
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(tmp_path))
+ result: Path | None = get_backup_path()
+ assert result == tmp_path
+
+
+def test_get_backup_path_strips_whitespace(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """get_backup_path strips surrounding whitespace from the env var value."""
+ monkeypatch.setenv("GIT_BACKUP_PATH", f" {tmp_path} ")
+ result: Path | None = get_backup_path()
+ assert result == tmp_path
+
+
+def test_get_backup_remote_unset(monkeypatch: pytest.MonkeyPatch) -> None:
+ """get_backup_remote returns empty string when GIT_BACKUP_REMOTE is not set."""
+ monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
+ assert not get_backup_remote()
+
+
+def test_get_backup_remote_set(monkeypatch: pytest.MonkeyPatch) -> None:
+ """get_backup_remote returns the configured remote URL."""
+ monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/repo.git")
+ assert get_backup_remote() == "git@github.com:user/repo.git"
+
+
+@SKIP_IF_NO_GIT
+def test_setup_backup_repo_creates_git_repo(tmp_path: Path) -> None:
+ """setup_backup_repo initialises a git repo in a fresh directory."""
+ backup_path: Path = tmp_path / "backup"
+ result: bool = setup_backup_repo(backup_path)
+ assert result is True
+ assert (backup_path / ".git").exists()
+
+
+@SKIP_IF_NO_GIT
+def test_setup_backup_repo_idempotent(tmp_path: Path) -> None:
+ """setup_backup_repo does not fail when called on an existing repo."""
+ backup_path: Path = tmp_path / "backup"
+ assert setup_backup_repo(backup_path) is True
+ assert setup_backup_repo(backup_path) is True
+
+
+def test_setup_backup_repo_adds_origin_remote(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """setup_backup_repo adds remote 'origin' when GIT_BACKUP_REMOTE is set."""
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/private.git")
+
+ with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
+ # git config --local queries fail initially so setup writes defaults.
+ mock_run.side_effect = [
+ MagicMock(returncode=0), # git init
+ MagicMock(returncode=1), # config user.email read
+ MagicMock(returncode=0), # config user.email write
+ MagicMock(returncode=1), # config user.name read
+ MagicMock(returncode=0), # config user.name write
+ MagicMock(returncode=1), # remote get-url origin (missing)
+ MagicMock(returncode=0), # remote add origin
+ ]
+
+ assert setup_backup_repo(backup_path) is True
+
+ called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
+ assert ["remote", "add", "origin", "git@github.com:user/private.git"] in [
+ cmd[-4:] for cmd in called_commands if len(cmd) >= 4
+ ]
+
+
+def test_setup_backup_repo_updates_origin_remote(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """setup_backup_repo updates existing origin when URL differs."""
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/new-private.git")
+
+ with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
+ # Existing repo path: no git init call.
+ (backup_path / ".git").mkdir(parents=True)
+
+ mock_run.side_effect = [
+ MagicMock(returncode=0), # config user.email read
+ MagicMock(returncode=0), # config user.name read
+ MagicMock(returncode=0, stdout=b"git@github.com:user/old-private.git\n"), # remote get-url origin
+ MagicMock(returncode=0), # remote set-url origin
+ ]
+
+ assert setup_backup_repo(backup_path) is True
+
+ called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
+ assert ["remote", "set-url", "origin", "git@github.com:user/new-private.git"] in [
+ cmd[-4:] for cmd in called_commands if len(cmd) >= 4
+ ]
+
+
+def test_export_state_creates_state_json(tmp_path: Path) -> None:
+ """export_state writes a valid state.json to the backup directory."""
+ mock_reader = MagicMock()
+
+ # Feeds
+ feed1 = MagicMock()
+ feed1.url = "https://example.com/feed.rss"
+ mock_reader.get_feeds.return_value = [feed1]
+
+ # Tag values: webhook present, everything else absent (returns None)
+ def get_tag_side_effect(
+ feed_or_key: tuple | str,
+ tag: str | None = None,
+ default: str | None = None,
+ ) -> list[Any] | str | None:
+ if feed_or_key == () and tag is None:
+ # Called for global webhooks list
+ return []
+
+ if tag == "webhook":
+ return "https://discord.com/api/webhooks/123/abc"
+
+ return default
+
+ mock_reader.get_tag.side_effect = get_tag_side_effect
+
+ backup_path: Path = tmp_path / "backup"
+ backup_path.mkdir()
+ export_state(mock_reader, backup_path)
+
+ state_file: Path = backup_path / "state.json"
+ assert state_file.exists(), "state.json should be created by export_state"
+
+ data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8"))
+ assert "feeds" in data
+ assert "webhooks" in data
+ assert data["feeds"][0]["url"] == "https://example.com/feed.rss"
+ assert data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc"
+
+
+def test_export_state_omits_empty_tags(tmp_path: Path) -> None:
+ """export_state does not include tags with empty-string or None values."""
+ mock_reader = MagicMock()
+ feed1 = MagicMock()
+ feed1.url = "https://example.com/feed.rss"
+ mock_reader.get_feeds.return_value = [feed1]
+
+ def get_tag_side_effect(
+ feed_or_key: tuple | str,
+ tag: str | None = None,
+ default: str | None = None,
+ ) -> list[Any] | str | None:
+ if feed_or_key == ():
+ return []
+
+ # Return empty string for all tags
+ return default # default is None
+
+ mock_reader.get_tag.side_effect = get_tag_side_effect
+
+ backup_path: Path = tmp_path / "backup"
+ backup_path.mkdir()
+ export_state(mock_reader, backup_path)
+
+ data: dict[str, Any] = json.loads((backup_path / "state.json").read_text(encoding="utf-8"))
+
+ # Only "url" key should be present (no empty-value tags)
+ assert list(data["feeds"][0].keys()) == ["url"]
+
+
+def test_commit_state_change_noop_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
+ """commit_state_change does nothing when GIT_BACKUP_PATH is not set."""
+ monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
+ mock_reader = MagicMock()
+
+ # Should not raise and should not call reader methods for export
+ commit_state_change(mock_reader, "Add feed example.com/rss")
+ mock_reader.get_feeds.assert_not_called()
+
+
+@SKIP_IF_NO_GIT
+def test_commit_state_change_commits(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """commit_state_change creates a commit in the backup repo."""
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
+ monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
+
+ mock_reader = MagicMock()
+ mock_reader.get_feeds.return_value = []
+ mock_reader.get_tag.return_value = []
+
+ commit_state_change(mock_reader, "Add feed https://example.com/rss")
+
+ # Verify a commit was created in the backup repo
+ git_executable: str | None = shutil.which("git")
+
+ assert git_executable is not None, "git executable not found"
+ result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
+ [git_executable, "-C", str(backup_path), "log", "--oneline"],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ assert result.returncode == 0
+ assert "Add feed https://example.com/rss" in result.stdout
+
+
+@SKIP_IF_NO_GIT
+def test_commit_state_change_no_double_commit(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """commit_state_change does not create a commit when state has not changed."""
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
+ monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
+
+ mock_reader = MagicMock()
+ mock_reader.get_feeds.return_value = []
+ mock_reader.get_tag.return_value = []
+
+ commit_state_change(mock_reader, "First commit")
+ commit_state_change(mock_reader, "Should not appear")
+
+ git_executable: str | None = shutil.which("git")
+ assert git_executable is not None, "git executable not found"
+ result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
+ [git_executable, "-C", str(backup_path), "log", "--oneline"],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ assert result.returncode == 0
+ assert "First commit" in result.stdout
+ assert "Should not appear" not in result.stdout
+
+
+def test_commit_state_change_push_when_remote_set(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """commit_state_change calls git push when GIT_BACKUP_REMOTE is configured."""
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
+ monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/private.git")
+
+ mock_reader = MagicMock()
+ mock_reader.get_feeds.return_value = []
+ mock_reader.get_tag.return_value = []
+
+ with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
+ # Simulate "git diff --cached --quiet" exiting 1, i.e. staged changes exist
+ mock_run.return_value = MagicMock(returncode=1) # returncode=1 means staged changes exist
+ commit_state_change(mock_reader, "Add feed https://example.com/rss")
+
+ called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
+ push_calls: list[list[str]] = [cmd for cmd in called_commands if "push" in cmd]
+ assert push_calls, "git push should have been called when GIT_BACKUP_REMOTE is set"
+ assert any(cmd[-3:] == ["push", "origin", "HEAD"] for cmd in called_commands), (
+ "git push should target configured remote name 'origin'"
+ )
+
+
+def test_commit_state_change_no_push_when_remote_unset(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """commit_state_change does not call git push when GIT_BACKUP_REMOTE is not set."""
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
+ monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
+
+ mock_reader = MagicMock()
+ mock_reader.get_feeds.return_value = []
+ mock_reader.get_tag.return_value = []
+
+ with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
+ mock_run.return_value = MagicMock(returncode=1)
+ commit_state_change(mock_reader, "Add feed https://example.com/rss")
+
+ called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
+ push_calls: list[list[str]] = [cmd for cmd in called_commands if "push" in cmd]
+ assert not push_calls, "git push should NOT be called when GIT_BACKUP_REMOTE is not set"
+
+
+client: TestClient = TestClient(app)
+test_webhook_name: str = "Test Backup Webhook"
+test_webhook_url: str = "https://discord.com/api/webhooks/999999999/testbackupwebhook"
+test_feed_url: str = "https://lovinator.space/rss_test.xml"
+
+
+def setup_test_feed() -> None:
+ """Set up a test webhook and feed for endpoint tests."""
+ # Clean up existing test data
+ with contextlib.suppress(Exception):
+ client.post(url="/remove", data={"feed_url": test_feed_url})
+
+ with contextlib.suppress(Exception):
+ client.post(url="/delete_webhook", data={"webhook_url": test_webhook_url})
+
+ # Create webhook and feed
+ client.post(
+ url="/add_webhook",
+ data={"webhook_name": test_webhook_name, "webhook_url": test_webhook_url},
+ )
+ client.post(url="/add", data={"feed_url": test_feed_url, "webhook_dropdown": test_webhook_name})
+
+
+def test_post_embed_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """Posting to /embed should trigger a git backup with appropriate message."""
+ # Set up git backup
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
+ monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
+
+ setup_test_feed()
+
+ with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
+ response = client.post(
+ url="/embed",
+ data={
+ "feed_url": test_feed_url,
+ "title": "Custom Title",
+ "description": "Custom Description",
+ "color": "#FF5733",
+ },
+ )
+ assert response.status_code == 200, f"Failed to post embed: {response.text}"
+ mock_commit.assert_called_once()
+
+ # Verify the commit message contains the feed URL
+ call_args = mock_commit.call_args
+ assert call_args is not None
+ commit_message: str = call_args[0][1]
+ assert "Update embed settings" in commit_message
+ assert test_feed_url in commit_message
+
+
+def test_post_use_embed_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """Posting to /use_embed should trigger a git backup."""
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
+ monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
+
+ setup_test_feed()
+
+ with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
+ response = client.post(url="/use_embed", data={"feed_url": test_feed_url})
+ assert response.status_code == 200, f"Failed to enable embed: {response.text}"
+ mock_commit.assert_called_once()
+
+ # Verify the commit message
+ call_args = mock_commit.call_args
+ assert call_args is not None
+ commit_message: str = call_args[0][1]
+ assert "Enable embed mode" in commit_message
+ assert test_feed_url in commit_message
+
+
+def test_post_use_text_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """Posting to /use_text should trigger a git backup."""
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
+ monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
+
+ setup_test_feed()
+
+ with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
+ response = client.post(url="/use_text", data={"feed_url": test_feed_url})
+ assert response.status_code == 200, f"Failed to disable embed: {response.text}"
+ mock_commit.assert_called_once()
+
+ # Verify the commit message
+ call_args = mock_commit.call_args
+ assert call_args is not None
+ commit_message: str = call_args[0][1]
+ assert "Disable embed mode" in commit_message
+ assert test_feed_url in commit_message
+
+
+def test_post_custom_message_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """Posting to /custom should trigger a git backup."""
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
+ monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
+
+ setup_test_feed()
+
+ with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
+ response = client.post(
+ url="/custom",
+ data={
+ "feed_url": test_feed_url,
+ "custom_message": "Check out this entry: {entry.title}",
+ },
+ )
+ assert response.status_code == 200, f"Failed to set custom message: {response.text}"
+ mock_commit.assert_called_once()
+
+ # Verify the commit message
+ call_args = mock_commit.call_args
+ assert call_args is not None
+ commit_message: str = call_args[0][1]
+ assert "Update custom message" in commit_message
+ assert test_feed_url in commit_message
+
+
+@SKIP_IF_NO_GIT
+def test_embed_backup_end_to_end(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """End-to-end test: customizing embed creates a real commit in the backup repo."""
+ git_executable: str | None = shutil.which("git")
+ assert git_executable is not None, "git executable not found"
+
+ backup_path: Path = tmp_path / "backup"
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
+ monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
+
+ setup_test_feed()
+
+ # Post embed customization
+ response = client.post(
+ url="/embed",
+ data={
+ "feed_url": test_feed_url,
+ "title": "{entry.title}",
+ "description": "{entry.summary}",
+ "color": "#0099FF",
+ "image_url": "{entry.image}",
+ },
+ )
+ assert response.status_code == 200, f"Failed to customize embed: {response.text}"
+
+ # Verify a commit was created
+ result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
+ [git_executable, "-C", str(backup_path), "log", "--oneline"],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ assert result.returncode == 0, f"Failed to read git log: {result.stderr}"
+ assert "Update embed settings" in result.stdout, f"Commit not found in log: {result.stdout}"
+
+ # Verify state.json contains embed data
+ state_file: Path = backup_path / "state.json"
+ assert state_file.exists(), "state.json should exist in backup repo"
+ state_data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8"))
+
+ # Find our test feed in the state
+ test_feed_data = next((feed for feed in state_data["feeds"] if feed["url"] == test_feed_url), None)
+ assert test_feed_data is not None, f"Test feed not found in state.json: {state_data}"
+
+ # The embed settings are stored as a nested dict under custom_embed tag
+ # This verifies the embed customization was persisted
+ assert "webhook" in test_feed_data, "Feed should have webhook set"
diff --git a/tests/test_hoyolab_api.py b/tests/test_hoyolab_api.py
new file mode 100644
index 0000000..60c83ae
--- /dev/null
+++ b/tests/test_hoyolab_api.py
@@ -0,0 +1,39 @@
+from __future__ import annotations
+
+from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url
+
+
+class TestExtractPostIdFromHoyolabUrl:
+ def test_extract_post_id_from_article_url(self) -> None:
+ """Test extracting post ID from a direct article URL."""
+ test_cases: list[str] = [
+ "https://www.hoyolab.com/article/38588239",
+ "http://hoyolab.com/article/12345",
+ "https://www.hoyolab.com/article/987654321/comments",
+ ]
+
+ expected_ids: list[str] = ["38588239", "12345", "987654321"]
+
+ for url, expected_id in zip(test_cases, expected_ids, strict=True):
+ assert extract_post_id_from_hoyolab_url(url) == expected_id
+
+ def test_url_without_post_id(self) -> None:
+ """Test with a URL that doesn't have a post ID."""
+ test_cases: list[str] = [
+ "https://www.hoyolab.com/community",
+ ]
+
+ for url in test_cases:
+ assert extract_post_id_from_hoyolab_url(url) is None
+
+ def test_edge_cases(self) -> None:
+ """Test edge cases like None, empty string, and malformed URLs."""
+ test_cases: list[str | None] = [
+ None,
+ "",
+ "not_a_url",
+ "http:/", # Malformed URL
+ ]
+
+ for url in test_cases:
+ assert extract_post_id_from_hoyolab_url(url) is None # type: ignore[arg-type]
diff --git a/tests/test_main.py b/tests/test_main.py
index 59bd109..f6396eb 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -1,14 +1,29 @@
from __future__ import annotations
+import re
import urllib.parse
+from dataclasses import dataclass
+from dataclasses import field
+from datetime import UTC
+from datetime import datetime
from typing import TYPE_CHECKING
+from typing import cast
+from unittest.mock import MagicMock
+from unittest.mock import patch
from fastapi.testclient import TestClient
+import discord_rss_bot.main as main_module
from discord_rss_bot.main import app
+from discord_rss_bot.main import create_html_for_feed
+from discord_rss_bot.main import get_reader_dependency
if TYPE_CHECKING:
+ from pathlib import Path
+
+ import pytest
from httpx import Response
+ from reader import Entry
client: TestClient = TestClient(app)
webhook_name: str = "Hello, I am a webhook!"
@@ -45,7 +60,7 @@ def test_search() -> None:
# Check that the feed was added.
response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
- assert feed_url in response.text, f"Feed not found in /: {response.text}"
+ assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
# Search for an entry.
response: Response = client.get(url="/search/?query=a")
@@ -72,6 +87,14 @@ def test_add_webhook() -> None:
def test_create_feed() -> None:
"""Test the /create_feed page."""
+ # Ensure webhook exists for this test regardless of test order.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get(url="/")
if feed_url in feeds.text:
@@ -85,11 +108,19 @@ def test_create_feed() -> None:
# Check that the feed was added.
response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
- assert feed_url in response.text, f"Feed not found in /: {response.text}"
+ assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
def test_get() -> None:
"""Test the /create_feed page."""
+ # Ensure webhook exists for this test regardless of test order.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get("/")
if feed_url in feeds.text:
@@ -103,7 +134,7 @@ def test_get() -> None:
# Check that the feed was added.
response = client.get("/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
- assert feed_url in response.text, f"Feed not found in /: {response.text}"
+ assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
response: Response = client.get(url="/add")
assert response.status_code == 200, f"/add failed: {response.text}"
@@ -129,12 +160,23 @@ def test_get() -> None:
response: Response = client.get(url="/webhooks")
assert response.status_code == 200, f"/webhooks failed: {response.text}"
+ response = client.get(url="/webhook_entries", params={"webhook_url": webhook_url})
+ assert response.status_code == 200, f"/webhook_entries failed: {response.text}"
+
response: Response = client.get(url="/whitelist", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/whitelist failed: {response.text}"
def test_pause_feed() -> None:
"""Test the /pause_feed page."""
+ # Ensure webhook exists for this test regardless of test order.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get(url="/")
if feed_url in feeds.text:
@@ -143,6 +185,7 @@ def test_pause_feed() -> None:
# Add the feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Unpause the feed if it is paused.
feeds: Response = client.get(url="/")
@@ -157,11 +200,19 @@ def test_pause_feed() -> None:
# Check that the feed was paused.
response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
- assert feed_url in response.text, f"Feed not found in /: {response.text}"
+ assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
def test_unpause_feed() -> None:
"""Test the /unpause_feed page."""
+ # Ensure webhook exists for this test regardless of test order.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get("/")
if feed_url in feeds.text:
@@ -170,6 +221,7 @@ def test_unpause_feed() -> None:
# Add the feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Pause the feed if it is unpaused.
feeds: Response = client.get(url="/")
@@ -184,11 +236,19 @@ def test_unpause_feed() -> None:
# Check that the feed was unpaused.
response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
- assert feed_url in response.text, f"Feed not found in /: {response.text}"
+ assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
def test_remove_feed() -> None:
"""Test the /remove page."""
+ # Ensure webhook exists for this test regardless of test order.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
# Remove the feed if it already exists before we run the test.
feeds: Response = client.get(url="/")
if feed_url in feeds.text:
@@ -197,6 +257,7 @@ def test_remove_feed() -> None:
# Add the feed.
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Remove the feed.
response: Response = client.post(url="/remove", data={"feed_url": feed_url})
@@ -208,6 +269,186 @@ def test_remove_feed() -> None:
assert feed_url not in response.text, f"Feed found in /: {response.text}"
+def test_change_feed_url() -> None:
+ """Test changing a feed URL from the feed page endpoint."""
+ new_feed_url = "https://lovinator.space/rss_test_small.xml"
+
+ # Ensure test feeds do not already exist.
+ client.post(url="/remove", data={"feed_url": feed_url})
+ client.post(url="/remove", data={"feed_url": new_feed_url})
+
+ # Ensure webhook exists.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Add the original feed.
+ response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Change feed URL.
+ response = client.post(
+ url="/change_feed_url",
+ data={"old_feed_url": feed_url, "new_feed_url": new_feed_url},
+ )
+ assert response.status_code == 200, f"Failed to change feed URL: {response.text}"
+
+ # New feed should be accessible.
+ response = client.get(url="/feed", params={"feed_url": new_feed_url})
+ assert response.status_code == 200, f"New feed URL is not accessible: {response.text}"
+
+ # Old feed should no longer be accessible.
+ response = client.get(url="/feed", params={"feed_url": feed_url})
+ assert response.status_code == 404, "Old feed URL should no longer exist"
+
+ # Cleanup.
+ client.post(url="/remove", data={"feed_url": new_feed_url})
+
+
+def test_change_feed_url_marks_entries_as_read() -> None:
+ """After changing a feed URL all entries on the new feed should be marked read to prevent resending."""
+ new_feed_url = "https://lovinator.space/rss_test_small.xml"
+
+ # Ensure feeds do not already exist.
+ client.post(url="/remove", data={"feed_url": feed_url})
+ client.post(url="/remove", data={"feed_url": new_feed_url})
+
+ # Ensure webhook exists.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})
+
+ # Add the original feed.
+ response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Patch reader on the main module so we can observe calls.
+ mock_entry_a = MagicMock()
+ mock_entry_a.id = "entry-a"
+ mock_entry_b = MagicMock()
+ mock_entry_b.id = "entry-b"
+
+ real_reader = main_module.get_reader_dependency()
+
+ # Use a no-redirect client so the POST response is inspected directly; the
+ # redirect target (/feed?feed_url=…) would 404 because change_feed_url is mocked.
+ no_redirect_client = TestClient(app, follow_redirects=False)
+
+ with (
+ patch.object(real_reader, "get_entries", return_value=[mock_entry_a, mock_entry_b]) as mock_get_entries,
+ patch.object(real_reader, "set_entry_read") as mock_set_read,
+ patch.object(real_reader, "update_feed") as mock_update_feed,
+ patch.object(real_reader, "change_feed_url"),
+ ):
+ response = no_redirect_client.post(
+ url="/change_feed_url",
+ data={"old_feed_url": feed_url, "new_feed_url": new_feed_url},
+ )
+ assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
+
+ # update_feed should have been called with the new URL.
+ mock_update_feed.assert_called_once_with(new_feed_url)
+
+ # get_entries should have been called to fetch unread entries on the new URL.
+ mock_get_entries.assert_called_once_with(feed=new_feed_url, read=False)
+
+ # Every returned entry should have been marked as read.
+ assert mock_set_read.call_count == 2, f"Expected 2 set_entry_read calls, got {mock_set_read.call_count}"
+ mock_set_read.assert_any_call(mock_entry_a, True)
+ mock_set_read.assert_any_call(mock_entry_b, True)
+
+ # Cleanup.
+ client.post(url="/remove", data={"feed_url": feed_url})
+ client.post(url="/remove", data={"feed_url": new_feed_url})
+
+
+def test_change_feed_url_empty_old_url_returns_400() -> None:
+ """Submitting an empty old_feed_url should return HTTP 400."""
+ response: Response = client.post(
+ url="/change_feed_url",
+ data={"old_feed_url": " ", "new_feed_url": "https://example.com/feed.xml"},
+ )
+ assert response.status_code == 400, f"Expected 400 for empty old URL, got {response.status_code}"
+
+
+def test_change_feed_url_empty_new_url_returns_400() -> None:
+ """Submitting a blank new_feed_url should return HTTP 400."""
+ response: Response = client.post(
+ url="/change_feed_url",
+ data={"old_feed_url": feed_url, "new_feed_url": " "},
+ )
+ assert response.status_code == 400, f"Expected 400 for blank new URL, got {response.status_code}"
+
+
+def test_change_feed_url_nonexistent_old_url_returns_404() -> None:
+ """Trying to rename a feed that does not exist should return HTTP 404."""
+ non_existent = "https://does-not-exist.example.com/rss.xml"
+ # Make sure it really is absent.
+ client.post(url="/remove", data={"feed_url": non_existent})
+
+ response: Response = client.post(
+ url="/change_feed_url",
+ data={"old_feed_url": non_existent, "new_feed_url": "https://example.com/new.xml"},
+ )
+ assert response.status_code == 404, f"Expected 404 for non-existent feed, got {response.status_code}"
+
+
+def test_change_feed_url_new_url_already_exists_returns_409() -> None:
+ """Changing to a URL that is already tracked should return HTTP 409."""
+ second_feed_url = "https://lovinator.space/rss_test_small.xml"
+
+ # Ensure both feeds are absent.
+ client.post(url="/remove", data={"feed_url": feed_url})
+ client.post(url="/remove", data={"feed_url": second_feed_url})
+
+ # Ensure webhook exists.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})
+
+ # Add both feeds.
+ client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ client.post(url="/add", data={"feed_url": second_feed_url, "webhook_dropdown": webhook_name})
+
+ # Try to rename one to the other.
+ response: Response = client.post(
+ url="/change_feed_url",
+ data={"old_feed_url": feed_url, "new_feed_url": second_feed_url},
+ )
+ assert response.status_code == 409, f"Expected 409 when new URL already exists, got {response.status_code}"
+
+ # Cleanup.
+ client.post(url="/remove", data={"feed_url": feed_url})
+ client.post(url="/remove", data={"feed_url": second_feed_url})
+
+
+def test_change_feed_url_same_url_redirects_without_error() -> None:
+ """Changing a feed's URL to itself should redirect cleanly without any error."""
+ # Ensure webhook exists.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})
+
+ # Add the feed.
+ client.post(url="/remove", data={"feed_url": feed_url})
+ response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Submit the same URL as both old and new.
+ response = client.post(
+ url="/change_feed_url",
+ data={"old_feed_url": feed_url, "new_feed_url": feed_url},
+ )
+ assert response.status_code == 200, f"Expected 200 after redirect for same URL, got {response.status_code}"
+
+ # Feed should still be accessible.
+ response = client.get(url="/feed", params={"feed_url": feed_url})
+ assert response.status_code == 200, f"Feed should still exist after no-op URL change: {response.text}"
+
+ # Cleanup.
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+
def test_delete_webhook() -> None:
"""Test the /delete_webhook page."""
# Remove the feed if it already exists before we run the test.
@@ -229,3 +470,1152 @@ def test_delete_webhook() -> None:
response = client.get(url="/webhooks")
assert response.status_code == 200, f"Failed to get /webhooks: {response.text}"
assert webhook_name not in response.text, f"Webhook found in /webhooks: {response.text}"
+
+
+def test_update_feed_not_found() -> None:
+ """Test updating a non-existent feed."""
+ # Use a hard-coded feed URL that does not exist
+ nonexistent_feed_url = "https://nonexistent-feed.example.com/rss.xml"
+
+ # Try to update the non-existent feed
+ response: Response = client.get(url="/update", params={"feed_url": urllib.parse.quote(nonexistent_feed_url)})
+
+ # Check that it returns a 404 status code
+ assert response.status_code == 404, f"Expected 404 for non-existent feed, got: {response.status_code}"
+ assert "Feed not found" in response.text
+
+
+def test_post_entry_send_to_discord() -> None:
+ """Test that /post_entry sends an entry to Discord and redirects to the feed page.
+
+ Regression test for the bug where the injected reader was not passed to
+ send_entry_to_discord, meaning the dependency-injected reader was silently ignored.
+ """
+ # Ensure webhook and feed exist.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ client.post(url="/remove", data={"feed_url": feed_url})
+ response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Retrieve an entry from the feed to get a valid entry ID.
+ reader: main_module.Reader = main_module.get_reader_dependency()
+ entries: list[Entry] = list(reader.get_entries(feed=feed_url, limit=1))
+ assert entries, "Feed should have at least one entry to send"
+ entry_to_send: main_module.Entry = entries[0]
+ encoded_id: str = urllib.parse.quote(entry_to_send.id)
+
+ no_redirect_client = TestClient(app, follow_redirects=False)
+
+ # Patch execute_webhook so no real HTTP requests are made to Discord.
+ with patch("discord_rss_bot.feeds.execute_webhook") as mock_execute:
+ response = no_redirect_client.get(
+ url="/post_entry",
+ params={"entry_id": encoded_id, "feed_url": urllib.parse.quote(feed_url)},
+ )
+
+ assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}: {response.text}"
+ location: str = response.headers.get("location", "")
+ assert "feed?feed_url=" in location, f"Should redirect to feed page, got: {location}"
+ assert mock_execute.called, "execute_webhook should have been called to deliver the entry to Discord"
+
+ # Cleanup.
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+
+def test_post_entry_unknown_id_returns_404() -> None:
+ """Test that /post_entry returns 404 when the entry ID does not exist."""
+ response: Response = client.get(
+ url="/post_entry",
+ params={"entry_id": "https://nonexistent.example.com/entry-that-does-not-exist"},
+ )
+ assert response.status_code == 404, f"Expected 404 for unknown entry, got {response.status_code}"
+
+
+def test_post_entry_uses_feed_url_to_disambiguate_duplicate_ids() -> None:
+ """When IDs collide across feeds, /post_entry should pick the entry from provided feed_url."""
+
+ @dataclass(slots=True)
+ class DummyFeed:
+ url: str
+
+ @dataclass(slots=True)
+ class DummyEntry:
+ id: str
+ feed: DummyFeed
+ feed_url: str
+
+ feed_a = "https://example.com/feed-a.xml"
+ feed_b = "https://example.com/feed-b.xml"
+ shared_id = "https://example.com/shared-entry-id"
+
+ entry_a: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_a), feed_url=feed_a))
+ entry_b: Entry = cast("Entry", DummyEntry(id=shared_id, feed=DummyFeed(feed_b), feed_url=feed_b))
+
+ class StubReader:
+ def get_entries(self, feed: str | None = None) -> list[Entry]:
+ if feed == feed_a:
+ return [entry_a]
+ if feed == feed_b:
+ return [entry_b]
+ return [entry_a, entry_b]
+
+ selected_feed_urls: list[str] = []
+
+ def fake_send_entry_to_discord(entry: Entry, reader: object) -> None:
+ selected_feed_urls.append(entry.feed.url)
+
+ app.dependency_overrides[get_reader_dependency] = StubReader
+ no_redirect_client = TestClient(app, follow_redirects=False)
+
+ try:
+ with patch("discord_rss_bot.main.send_entry_to_discord", side_effect=fake_send_entry_to_discord):
+ response: Response = no_redirect_client.get(
+ url="/post_entry",
+ params={"entry_id": urllib.parse.quote(shared_id), "feed_url": urllib.parse.quote(feed_b)},
+ )
+
+ assert response.status_code == 303, f"Expected redirect after sending, got {response.status_code}"
+ assert selected_feed_urls == [feed_b], f"Expected feed-b entry, got: {selected_feed_urls}"
+
+ location = response.headers.get("location", "")
+ assert urllib.parse.quote(feed_b) in location, f"Expected redirect to feed-b page, got: {location}"
+ finally:
+ app.dependency_overrides = {}
+
+
+def test_navbar_backup_link_hidden_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
+ """Test that the backup link is not shown in the navbar when GIT_BACKUP_PATH is not set."""
+ # Ensure GIT_BACKUP_PATH is not set
+ monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
+
+ # Get the index page
+ response: Response = client.get(url="/")
+ assert response.status_code == 200, f"Failed to get /: {response.text}"
+
+ # Check that the backup button is not in the response
+ assert "Backup" not in response.text or 'action="/backup"' not in response.text, (
+ "Backup button should not be visible when GIT_BACKUP_PATH is not configured"
+ )
+
+
+def test_navbar_backup_link_visible_when_configured(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
+ """Test that the backup link is shown in the navbar when GIT_BACKUP_PATH is set."""
+ # Set GIT_BACKUP_PATH
+ monkeypatch.setenv("GIT_BACKUP_PATH", str(tmp_path))
+
+ # Get the index page
+ response: Response = client.get(url="/")
+ assert response.status_code == 200, f"Failed to get /: {response.text}"
+
+ # Check that the backup button is in the response
+ assert "Backup" in response.text, "Backup button text should be visible when GIT_BACKUP_PATH is configured"
+ assert 'action="/backup"' in response.text, "Backup form should be visible when GIT_BACKUP_PATH is configured"
+
+
+def test_backup_endpoint_returns_error_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
+ """Test that the backup endpoint returns an error when GIT_BACKUP_PATH is not set."""
+ # Ensure GIT_BACKUP_PATH is not set
+ monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
+
+ # Try to trigger a backup
+ response: Response = client.post(url="/backup")
+
+ # Should redirect to index with error message
+ assert response.status_code == 200, f"Failed to post /backup: {response.text}"
+ assert "Git backup is not configured" in response.text or "GIT_BACKUP_PATH" in response.text, (
+ "Error message about backup not being configured should be shown"
+ )
+
+
+def test_show_more_entries_button_visible_when_many_entries() -> None:
+ """Test that the 'Show more entries' button is visible when there are more than 20 entries."""
+ # Add the webhook first
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Remove the feed if it already exists
+ feeds: Response = client.get(url="/")
+ if feed_url in feeds.text:
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+ # Add the feed
+ response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Get the feed page
+ response: Response = client.get(url="/feed", params={"feed_url": feed_url})
+ assert response.status_code == 200, f"Failed to get /feed: {response.text}"
+
+ # Check if the feed has more than 20 entries by looking at the response
+ # The button should be visible if there are more than 20 entries
+ # We check for both the button text and the link structure
+ if "Show more entries" in response.text:
+ # Button is visible - verify it has the correct structure
+ assert "starting_after=" in response.text, "Show more entries button should contain starting_after parameter"
+ # The button should be a link to the feed page with pagination
+ assert (
+ f'href="/feed?feed_url={urllib.parse.quote(feed_url)}' in response.text
+ or f'href="/feed?feed_url={encoded_feed_url(feed_url)}' in response.text
+ ), "Show more entries button should link back to the feed page"
+
+
+def test_show_more_entries_button_not_visible_when_few_entries() -> None:
+ """Test that the 'Show more entries' button is not visible when there are 20 or fewer entries."""
+ # Ensure webhook exists for this test regardless of test order.
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Use a feed with very few entries
+ small_feed_url = "https://lovinator.space/rss_test_small.xml"
+
+ # Clean up if exists
+ client.post(url="/remove", data={"feed_url": small_feed_url})
+
+ # Add a small feed (this may not exist, so this test is conditional)
+ response: Response = client.post(url="/add", data={"feed_url": small_feed_url, "webhook_dropdown": webhook_name})
+
+ if response.status_code == 200:
+ # Get the feed page
+ response: Response = client.get(url="/feed", params={"feed_url": small_feed_url})
+ assert response.status_code == 200, f"Failed to get /feed: {response.text}"
+
+ # If the feed has 20 or fewer entries, the button should not be visible
+ # We check the total entry count in the page
+ if "0 entries" in response.text or " entries)" in response.text:
+ # Extract entry count and verify button visibility
+
+ match: re.Match[str] | None = re.search(r"\((\d+) entries\)", response.text)
+ if match:
+ entry_count = int(match.group(1))
+ if entry_count <= 20:
+ assert "Show more entries" not in response.text, (
+ f"Show more entries button should not be visible when there are {entry_count} entries"
+ )
+
+ # Clean up
+ client.post(url="/remove", data={"feed_url": small_feed_url})
+
+
+def test_show_more_entries_pagination_works() -> None:
+ """Test that pagination with starting_after parameter works correctly."""
+ # Add the webhook first
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Remove the feed if it already exists
+ feeds: Response = client.get(url="/")
+ if feed_url in feeds.text:
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+ # Add the feed
+ response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Get the first page
+ response: Response = client.get(url="/feed", params={"feed_url": feed_url})
+ assert response.status_code == 200, f"Failed to get /feed: {response.text}"
+
+ # Check if pagination is available
+ if "Show more entries" in response.text and "starting_after=" in response.text:
+ # Extract the starting_after parameter from the button link
+ match: re.Match[str] | None = re.search(r'starting_after=([^"&]+)', response.text)
+ if match:
+ starting_after_id: str = match.group(1)
+
+ # Request the second page
+ response: Response = client.get(
+ url="/feed",
+ params={"feed_url": feed_url, "starting_after": starting_after_id},
+ )
+ assert response.status_code == 200, f"Failed to get paginated feed: {response.text}"
+
+ # Verify we got a valid response (the page should contain entries)
+ assert "entries)" in response.text, "Paginated page should show entry count"
+
+
+def test_show_more_entries_button_context_variable() -> None:
+ """Test that the button visibility variable is correctly passed to the template context."""
+ # Add the webhook first
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Remove the feed if it already exists
+ feeds: Response = client.get(url="/")
+ if feed_url in feeds.text:
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+ # Add the feed
+ response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Get the feed page
+ response: Response = client.get(url="/feed", params={"feed_url": feed_url})
+ assert response.status_code == 200, f"Failed to get /feed: {response.text}"
+
+ # Extract the total entries count from the page
+ match: re.Match[str] | None = re.search(r"\((\d+) entries\)", response.text)
+ if match:
+ entry_count = int(match.group(1))
+
+ # If more than 20 entries, button should be visible
+ if entry_count > 20:
+ assert "Show more entries" in response.text, (
+ f"Button should be visible when there are {entry_count} entries (more than 20)"
+ )
+ # If 20 or fewer entries, button should not be visible
+ else:
+ assert "Show more entries" not in response.text, (
+ f"Button should not be visible when there are {entry_count} entries (20 or fewer)"
+ )
+
+
+def test_create_html_marks_entries_from_another_feed(monkeypatch: pytest.MonkeyPatch) -> None:
+ """Entries from another feed should be marked in /feed html output."""
+
+ @dataclass(slots=True)
+ class DummyContent:
+ value: str
+
+ @dataclass(slots=True)
+ class DummyFeed:
+ url: str
+
+ @dataclass(slots=True)
+ class DummyEntry:
+ feed: DummyFeed
+ id: str
+ original_feed_url: str | None = None
+ link: str = "https://example.com/post"
+ title: str = "Example title"
+ author: str = "Author"
+ summary: str = "Summary"
+ content: list[DummyContent] = field(default_factory=lambda: [DummyContent("Content")])
+ published: None = None
+
+ def __post_init__(self) -> None:
+ if self.original_feed_url is None:
+ self.original_feed_url = self.feed.url
+
+ selected_feed_url = "https://example.com/feed-a.xml"
+ same_feed_entry = DummyEntry(DummyFeed(selected_feed_url), "same")
+ # feed.url matches selected feed, but original_feed_url differs; marker should still show.
+ other_feed_entry = DummyEntry(
+ DummyFeed(selected_feed_url),
+ "other",
+ original_feed_url="https://example.com/feed-b.xml",
+ )
+
+ monkeypatch.setattr(
+ "discord_rss_bot.main.replace_tags_in_text_message",
+ lambda _entry, **_kwargs: "Rendered content",
+ )
+ monkeypatch.setattr("discord_rss_bot.main.entry_is_blacklisted", lambda _entry, **_kwargs: False)
+ monkeypatch.setattr("discord_rss_bot.main.entry_is_whitelisted", lambda _entry, **_kwargs: False)
+
+ same_feed_entry_typed: Entry = cast("Entry", same_feed_entry)
+ other_feed_entry_typed: Entry = cast("Entry", other_feed_entry)
+
+ html: str = create_html_for_feed(
+ reader=MagicMock(),
+ current_feed_url=selected_feed_url,
+ entries=[
+ same_feed_entry_typed,
+ other_feed_entry_typed,
+ ],
+ )
+
+ assert "From another feed: https://example.com/feed-b.xml" in html
+ assert "From another feed: https://example.com/feed-a.xml" not in html
+
+
+def test_webhook_entries_webhook_not_found() -> None:
+ """Test webhook_entries endpoint returns 404 when webhook doesn't exist."""
+ nonexistent_webhook_url = "https://discord.com/api/webhooks/999999/nonexistent"
+
+ response: Response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": nonexistent_webhook_url},
+ )
+
+ assert response.status_code == 404, f"Expected 404 for non-existent webhook, got: {response.status_code}"
+ assert "Webhook not found" in response.text
+
+
+def test_webhook_entries_no_feeds() -> None:
+ """Test webhook_entries endpoint displays message when webhook has no feeds."""
+ # Clean up any existing feeds first
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+ # Clean up and create a webhook
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Get webhook_entries without adding any feeds
+ response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": webhook_url},
+ )
+
+ assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
+ assert webhook_name in response.text, "Webhook name not found in response"
+ assert "No feeds found" in response.text or "Add feeds" in response.text, "Expected message about no feeds"
+
+
+def test_webhook_entries_no_feeds_still_shows_webhook_settings() -> None:
+ """The webhook detail view should show settings/actions even with no attached feeds."""
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": webhook_url},
+ )
+
+ assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
+ assert "Settings" in response.text, "Expected settings card on webhook detail view"
+ assert "Modify Webhook" in response.text, "Expected modify form on webhook detail view"
+ assert "Delete Webhook" in response.text, "Expected delete action on webhook detail view"
+ assert "Back to dashboard" in response.text, "Expected dashboard navigation link"
+ assert "All webhooks" in response.text, "Expected all webhooks navigation link"
+ assert f'name="old_hook" value="{webhook_url}"' in response.text, "Expected old_hook hidden input"
+ assert f'value="/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}"' in response.text, (
+ "Expected modify form to redirect back to the current webhook detail view"
+ )
+
+
+def test_webhook_entries_with_feeds_no_entries() -> None:
+ """Test webhook_entries endpoint when webhook has feeds but no entries yet."""
+ # Clean up and create fresh webhook
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Use a feed URL that exists but has no entries yet (e.g. a freshly added feed)
+ empty_feed_url = "https://lovinator.space/empty_feed.xml"
+ client.post(url="/remove", data={"feed_url": empty_feed_url})
+
+ # Add the feed
+ response = client.post(
+ url="/add",
+ data={"feed_url": empty_feed_url, "webhook_dropdown": webhook_name},
+ )
+
+ # Get webhook_entries
+ response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": webhook_url},
+ )
+
+ assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
+ assert webhook_name in response.text, "Webhook name not found in response"
+
+ # Clean up
+ client.post(url="/remove", data={"feed_url": empty_feed_url})
+
+
+def test_webhook_entries_with_entries() -> None:
+ """Test webhook_entries endpoint displays entries correctly."""
+ # Clean up and create webhook
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Remove and add the feed
+ client.post(url="/remove", data={"feed_url": feed_url})
+ response = client.post(
+ url="/add",
+ data={"feed_url": feed_url, "webhook_dropdown": webhook_name},
+ )
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Get webhook_entries
+ response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": webhook_url},
+ )
+
+ assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
+ assert webhook_name in response.text, "Webhook name not found in response"
+ # Should show entries (the feed has entries)
+ assert "total from" in response.text, "Expected to see entry count"
+ assert "Modify Webhook" in response.text, "Expected webhook settings to be visible"
+ assert "Attached feeds" in response.text, "Expected attached feeds section to be visible"
+
+
+def test_webhook_entries_shows_attached_feed_link() -> None:
+ """The webhook detail view should list attached feeds linking to their feed pages."""
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ client.post(url="/remove", data={"feed_url": feed_url})
+ response = client.post(
+ url="/add",
+ data={"feed_url": feed_url, "webhook_dropdown": webhook_name},
+ )
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": webhook_url},
+ )
+
+ assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
+ assert f"/feed?feed_url={urllib.parse.quote(feed_url)}" in response.text, (
+ "Expected attached feed to link to its feed detail page"
+ )
+ assert "Latest entries" in response.text, "Expected latest entries heading on webhook detail view"
+
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+
+def test_webhook_entries_multiple_feeds() -> None:
+ """Test webhook_entries endpoint shows feed count correctly."""
+ # Clean up and create webhook
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Remove and add feed
+ client.post(url="/remove", data={"feed_url": feed_url})
+ response = client.post(
+ url="/add",
+ data={"feed_url": feed_url, "webhook_dropdown": webhook_name},
+ )
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Get webhook_entries
+ response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": webhook_url},
+ )
+
+ assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
+ assert webhook_name in response.text, "Webhook name not found in response"
+ # Should show entries and feed count
+ assert "feed" in response.text.lower(), "Expected to see feed information"
+
+ # Clean up
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+
+def test_webhook_entries_sort_newest_and_non_null_published_first() -> None:
+ """Webhook entries should be sorted newest-first with published=None entries placed last."""
+
+ @dataclass(slots=True)
+ class DummyFeed:
+ url: str
+ title: str | None = None
+ updates_enabled: bool = True
+ last_exception: None = None
+
+ @dataclass(slots=True)
+ class DummyEntry:
+ id: str
+ feed: DummyFeed
+ published: datetime | None
+
+ dummy_feed = DummyFeed(url="https://example.com/feed.xml", title="Example Feed")
+
+ # Intentionally unsorted input with two dated entries and two undated entries.
+ unsorted_entries: list[Entry] = [
+ cast("Entry", DummyEntry(id="old", feed=dummy_feed, published=datetime(2024, 1, 1, tzinfo=UTC))),
+ cast("Entry", DummyEntry(id="none-1", feed=dummy_feed, published=None)),
+ cast("Entry", DummyEntry(id="new", feed=dummy_feed, published=datetime(2024, 2, 1, tzinfo=UTC))),
+ cast("Entry", DummyEntry(id="none-2", feed=dummy_feed, published=None)),
+ ]
+
+ class StubReader:
+ def get_tag(self, resource: object, key: str, default: object = None) -> object:
+ if resource == () and key == "webhooks":
+ return [{"name": webhook_name, "url": webhook_url}]
+ if key == "webhook" and isinstance(resource, str):
+ return webhook_url
+ return default
+
+ def get_feeds(self) -> list[DummyFeed]:
+ return [dummy_feed]
+
+ def get_entries(self, **_kwargs: object) -> list[Entry]:
+ return unsorted_entries
+
+ observed_order: list[str] = []
+
+ def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str:
+ del reader, current_feed_url
+ observed_order.extend(entry.id for entry in entries)
+ return ""
+
+ app.dependency_overrides[get_reader_dependency] = StubReader
+ try:
+ with (
+ patch(
+ "discord_rss_bot.main.get_data_from_hook_url",
+ return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url),
+ ),
+ patch("discord_rss_bot.main.create_html_for_feed", side_effect=capture_entries),
+ ):
+ response: Response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": webhook_url},
+ )
+
+ assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
+ assert observed_order == ["new", "old", "none-1", "none-2"], (
+ "Expected newest published entries first and published=None entries last"
+ )
+ finally:
+ app.dependency_overrides = {}
+
+
+def test_webhook_entries_pagination() -> None:
+ """Test webhook_entries endpoint pagination functionality."""
+ # Clean up and create webhook
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Remove and add the feed
+ client.post(url="/remove", data={"feed_url": feed_url})
+ response = client.post(
+ url="/add",
+ data={"feed_url": feed_url, "webhook_dropdown": webhook_name},
+ )
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Get first page of webhook_entries
+ response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": webhook_url},
+ )
+
+ assert response.status_code == 200, f"Failed to get /webhook_entries: {response.text}"
+
+ # Check if pagination button is shown when there are many entries
+ # The button should be visible if total_entries > 20 (entries_per_page)
+ if "Load More Entries" in response.text:
+ # Extract the starting_after parameter from the pagination form
+ # This is a simple check that pagination elements exist
+ assert 'name="starting_after"' in response.text, "Expected pagination form with starting_after parameter"
+
+ # Clean up
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+
+def test_webhook_entries_url_encoding() -> None:
+ """Test webhook_entries endpoint handles URL encoding correctly."""
+ # Clean up and create webhook
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ # Remove and add the feed
+ client.post(url="/remove", data={"feed_url": feed_url})
+ response = client.post(
+ url="/add",
+ data={"feed_url": feed_url, "webhook_dropdown": webhook_name},
+ )
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Get webhook_entries with URL-encoded webhook URL
+ encoded_webhook_url = urllib.parse.quote(webhook_url)
+ response = client.get(
+ url="/webhook_entries",
+ params={"webhook_url": encoded_webhook_url},
+ )
+
+ assert response.status_code == 200, f"Failed to get /webhook_entries with encoded URL: {response.text}"
+ assert webhook_name in response.text, "Webhook name not found in response"
+
+ # Clean up
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+
+def test_dashboard_webhook_name_links_to_webhook_detail() -> None:
+ """Webhook names on the dashboard should open the webhook detail view."""
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ client.post(url="/remove", data={"feed_url": feed_url})
+ response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ response = client.get(url="/")
+ assert response.status_code == 200, f"Failed to get /: {response.text}"
+
+ expected_link = f"/webhook_entries?webhook_url={urllib.parse.quote(webhook_url)}"
+ assert expected_link in response.text, "Expected dashboard webhook link to point to the webhook detail view"
+
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+
+def test_modify_webhook_redirects_back_to_webhook_detail() -> None:
+ """Webhook updates from the detail view should redirect back to that view with the new URL."""
+ original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
+ new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token"
+
+ client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
+ client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
+
+
+def test_modify_webhook_triggers_git_backup_commit() -> None:
+ """Modifying a webhook URL should record a state change for git backup."""
+ original_webhook_url = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
+ new_webhook_url = "https://discord.com/api/webhooks/1234567890/updated-token"
+
+ client.post(url="/delete_webhook", data={"webhook_url": original_webhook_url})
+ client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
+
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ no_redirect_client = TestClient(app, follow_redirects=False)
+ with patch("discord_rss_bot.main.commit_state_change") as mock_commit_state_change:
+ response = no_redirect_client.post(
+ url="/modify_webhook",
+ data={
+ "old_hook": original_webhook_url,
+ "new_hook": new_webhook_url,
+ "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}",
+ },
+ )
+
+ assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
+ assert mock_commit_state_change.call_count == 1, "Expected webhook modification to trigger git backup commit"
+
+ client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
+
+ response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": original_webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ no_redirect_client = TestClient(app, follow_redirects=False)
+ response = no_redirect_client.post(
+ url="/modify_webhook",
+ data={
+ "old_hook": original_webhook_url,
+ "new_hook": new_webhook_url,
+ "redirect_to": f"/webhook_entries?webhook_url={urllib.parse.quote(original_webhook_url)}",
+ },
+ )
+
+ assert response.status_code == 303, f"Expected 303 redirect, got {response.status_code}: {response.text}"
+ assert response.headers["location"] == (f"/webhook_entries?webhook_url={urllib.parse.quote(new_webhook_url)}"), (
+ f"Unexpected redirect location: {response.headers['location']}"
+ )
+
+ client.post(url="/delete_webhook", data={"webhook_url": new_webhook_url})
+
+
+def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None:
+ """Preview should list old->new feed URLs for webhook bulk replacement."""
+
+ @dataclass(slots=True)
+ class DummyFeed:
+ url: str
+ title: str | None = None
+ updates_enabled: bool = True
+ last_exception: None = None
+
+ class StubReader:
+ def __init__(self) -> None:
+ self._feeds: list[DummyFeed] = [
+ DummyFeed(url="https://old.example.com/rss/a.xml", title="A"),
+ DummyFeed(url="https://old.example.com/rss/b.xml", title="B"),
+ DummyFeed(url="https://unchanged.example.com/rss/c.xml", title="C"),
+ ]
+
+ def get_tag(self, resource: object, key: str, default: object = None) -> object:
+ if resource == () and key == "webhooks":
+ return [{"name": webhook_name, "url": webhook_url}]
+ if key == "webhook" and isinstance(resource, str):
+ if resource.startswith("https://old.example.com"):
+ return webhook_url
+ if resource.startswith("https://unchanged.example.com"):
+ return webhook_url
+ return default
+
+ def get_feeds(self) -> list[DummyFeed]:
+ return self._feeds
+
+ def get_entries(self, **_kwargs: object) -> list[Entry]:
+ return []
+
+ app.dependency_overrides[get_reader_dependency] = StubReader
+ try:
+ with (
+ patch(
+ "discord_rss_bot.main.get_data_from_hook_url",
+ return_value=main_module.WebhookInfo(custom_name=webhook_name, url=webhook_url),
+ ),
+ patch(
+ "discord_rss_bot.main.resolve_final_feed_url",
+ side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
+ ),
+ ):
+ response: Response = client.get(
+ url="/webhook_entries",
+ params={
+ "webhook_url": webhook_url,
+ "replace_from": "old.example.com",
+ "replace_to": "new.example.com",
+ "resolve_urls": "true",
+ },
+ )
+
+ assert response.status_code == 200, f"Failed to get preview: {response.text}"
+ assert "Mass update feed URLs" in response.text
+ assert "old.example.com/rss/a.xml" in response.text
+ assert "new.example.com/rss/a.xml" in response.text
+ assert "Will update" in response.text
+ assert "Matched: 2" in response.text
+ assert "Will update: 2" in response.text
+ finally:
+ app.dependency_overrides = {}
+
+
+def test_bulk_change_feed_urls_updates_matching_feeds() -> None:
+ """Mass updater should change all matching feed URLs for a webhook."""
+
+ @dataclass(slots=True)
+ class DummyFeed:
+ url: str
+
+ class StubReader:
+ def __init__(self) -> None:
+ self._feeds = [
+ DummyFeed(url="https://old.example.com/rss/a.xml"),
+ DummyFeed(url="https://old.example.com/rss/b.xml"),
+ DummyFeed(url="https://unchanged.example.com/rss/c.xml"),
+ ]
+ self.change_calls: list[tuple[str, str]] = []
+ self.updated_feeds: list[str] = []
+
+ def get_tag(self, resource: object, key: str, default: object = None) -> object:
+ if resource == () and key == "webhooks":
+ return [{"name": webhook_name, "url": webhook_url}]
+ if key == "webhook" and isinstance(resource, str):
+ return webhook_url
+ return default
+
+ def get_feeds(self) -> list[DummyFeed]:
+ return self._feeds
+
+ def change_feed_url(self, old_url: str, new_url: str) -> None:
+ self.change_calls.append((old_url, new_url))
+
+ def update_feed(self, feed_url: str) -> None:
+ self.updated_feeds.append(feed_url)
+
+ def get_entries(self, **_kwargs: object) -> list[Entry]:
+ return []
+
+ def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
+ return
+
+ stub_reader = StubReader()
+ app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
+ no_redirect_client = TestClient(app, follow_redirects=False)
+
+ try:
+ with patch(
+ "discord_rss_bot.main.resolve_final_feed_url",
+ side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
+ ):
+ response: Response = no_redirect_client.post(
+ url="/bulk_change_feed_urls",
+ data={
+ "webhook_url": webhook_url,
+ "replace_from": "old.example.com",
+ "replace_to": "new.example.com",
+ "resolve_urls": "true",
+ },
+ )
+
+ assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
+ assert "Updated%202%20feed%20URL%28s%29" in response.headers.get("location", "")
+ assert sorted(stub_reader.change_calls) == sorted([
+ ("https://old.example.com/rss/a.xml", "https://new.example.com/rss/a.xml"),
+ ("https://old.example.com/rss/b.xml", "https://new.example.com/rss/b.xml"),
+ ])
+ assert sorted(stub_reader.updated_feeds) == sorted([
+ "https://new.example.com/rss/a.xml",
+ "https://new.example.com/rss/b.xml",
+ ])
+ finally:
+ app.dependency_overrides = {}
+
+
+def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None:
+ """HTMX preview endpoint should render only the mass-update preview fragment."""
+
+ @dataclass(slots=True)
+ class DummyFeed:
+ url: str
+ title: str | None = None
+ updates_enabled: bool = True
+ last_exception: None = None
+
+ class StubReader:
+ def __init__(self) -> None:
+ self._feeds: list[DummyFeed] = [
+ DummyFeed(url="https://old.example.com/rss/a.xml", title="A"),
+ DummyFeed(url="https://old.example.com/rss/b.xml", title="B"),
+ ]
+
+ def get_tag(self, resource: object, key: str, default: object = None) -> object:
+ if key == "webhook" and isinstance(resource, str):
+ return webhook_url
+ return default
+
+ def get_feeds(self) -> list[DummyFeed]:
+ return self._feeds
+
+ app.dependency_overrides[get_reader_dependency] = StubReader
+ try:
+ with patch(
+ "discord_rss_bot.main.resolve_final_feed_url",
+ side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
+ ):
+ response: Response = client.get(
+ url="/webhook_entries_mass_update_preview",
+ params={
+ "webhook_url": webhook_url,
+ "replace_from": "old.example.com",
+ "replace_to": "new.example.com",
+ "resolve_urls": "true",
+ },
+ )
+
+ assert response.status_code == 200, f"Failed to get HTMX preview fragment: {response.text}"
+ assert "Will update: 2" in response.text
+ assert "<" in response.text  # TODO(review): exact fragment marker lost to extraction garbling — confirm against template
+ finally:
+ app.dependency_overrides = {}
+
+
+def test_bulk_change_feed_urls_force_update_overwrites_conflicting_targets() -> None: # noqa: C901
+ """Force update should overwrite conflicting target URLs instead of skipping them."""
+
+ @dataclass(slots=True)
+ class DummyFeed:
+ url: str
+
+ class StubReader:
+ def __init__(self) -> None:
+ self._feeds = [
+ DummyFeed(url="https://old.example.com/rss/a.xml"),
+ DummyFeed(url="https://new.example.com/rss/a.xml"),
+ ]
+ self.delete_calls: list[str] = []
+ self.change_calls: list[tuple[str, str]] = []
+
+ def get_tag(self, resource: object, key: str, default: object = None) -> object:
+ if resource == () and key == "webhooks":
+ return [{"name": webhook_name, "url": webhook_url}]
+ if key == "webhook" and isinstance(resource, str):
+ return webhook_url
+ return default
+
+ def get_feeds(self) -> list[DummyFeed]:
+ return self._feeds
+
+ def delete_feed(self, feed_url: str) -> None:
+ self.delete_calls.append(feed_url)
+
+ def change_feed_url(self, old_url: str, new_url: str) -> None:
+ self.change_calls.append((old_url, new_url))
+
+ def update_feed(self, _feed_url: str) -> None:
+ return
+
+ def get_entries(self, **_kwargs: object) -> list[Entry]:
+ return []
+
+ def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
+ return
+
+ stub_reader = StubReader()
+ app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
+ no_redirect_client = TestClient(app, follow_redirects=False)
+
+ try:
+ with patch(
+ "discord_rss_bot.main.resolve_final_feed_url",
+ side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
+ ):
+ response: Response = no_redirect_client.post(
+ url="/bulk_change_feed_urls",
+ data={
+ "webhook_url": webhook_url,
+ "replace_from": "old.example.com",
+ "replace_to": "new.example.com",
+ "resolve_urls": "true",
+ "force_update": "true",
+ },
+ )
+
+ assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
+ assert stub_reader.delete_calls == ["https://new.example.com/rss/a.xml"]
+ assert stub_reader.change_calls == [
+ (
+ "https://old.example.com/rss/a.xml",
+ "https://new.example.com/rss/a.xml",
+ ),
+ ]
+ assert "Force%20overwrote%201" in response.headers.get("location", "")
+ finally:
+ app.dependency_overrides = {}
+
+
+def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None:
+ """Force update should proceed even when URL resolution returns an error (e.g. HTTP 404)."""
+
+ @dataclass(slots=True)
+ class DummyFeed:
+ url: str
+
+ class StubReader:
+ def __init__(self) -> None:
+ self._feeds = [
+ DummyFeed(url="https://old.example.com/rss/a.xml"),
+ ]
+ self.change_calls: list[tuple[str, str]] = []
+
+ def get_tag(self, resource: object, key: str, default: object = None) -> object:
+ if resource == () and key == "webhooks":
+ return [{"name": webhook_name, "url": webhook_url}]
+ if key == "webhook" and isinstance(resource, str):
+ return webhook_url
+ return default
+
+ def get_feeds(self) -> list[DummyFeed]:
+ return self._feeds
+
+ def change_feed_url(self, old_url: str, new_url: str) -> None:
+ self.change_calls.append((old_url, new_url))
+
+ def update_feed(self, _feed_url: str) -> None:
+ return
+
+ def get_entries(self, **_kwargs: object) -> list[Entry]:
+ return []
+
+ def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
+ return
+
+ stub_reader = StubReader()
+ app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
+ no_redirect_client = TestClient(app, follow_redirects=False)
+
+ try:
+ with patch(
+ "discord_rss_bot.main.resolve_final_feed_url",
+ return_value=("https://new.example.com/rss/a.xml", "HTTP 404"),
+ ):
+ response: Response = no_redirect_client.post(
+ url="/bulk_change_feed_urls",
+ data={
+ "webhook_url": webhook_url,
+ "replace_from": "old.example.com",
+ "replace_to": "new.example.com",
+ "resolve_urls": "true",
+ "force_update": "true",
+ },
+ )
+
+ assert response.status_code == 303, f"Expected redirect, got {response.status_code}: {response.text}"
+ assert stub_reader.change_calls == [
+ (
+ "https://old.example.com/rss/a.xml",
+ "https://new.example.com/rss/a.xml",
+ ),
+ ]
+ location = response.headers.get("location", "")
+ assert "Updated%201%20feed%20URL%28s%29" in location
+ assert "Failed%200" in location
+ finally:
+ app.dependency_overrides = {}
+
+
+def test_reader_dependency_override_is_used() -> None:
+ """Reader should be injectable and overridable via FastAPI dependency overrides."""
+
+ class StubReader:
+ def get_tag(self, _resource: str, _key: str, default: str | None = None) -> str | None:
+ """Stub get_tag that always returns the default value.
+
+ Args:
+ _resource: Ignored.
+ _key: Ignored.
+ default: The value to return.
+
+ Returns:
+ The default value, simulating a missing tag.
+ """
+ return default
+
+ app.dependency_overrides[get_reader_dependency] = StubReader
+ try:
+ response: Response = client.get(url="/add")
+ assert response.status_code == 200, f"Expected /add to render with overridden reader: {response.text}"
+ finally:
+ app.dependency_overrides = {}
diff --git a/tests/test_search.py b/tests/test_search.py
index 7518963..77681cf 100644
--- a/tests/test_search.py
+++ b/tests/test_search.py
@@ -4,16 +4,18 @@ import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
-from reader import Feed, Reader, make_reader
+from reader import Feed
+from reader import Reader
+from reader import make_reader
-from discord_rss_bot.search import create_html_for_search_results
+from discord_rss_bot.search import create_search_context
if TYPE_CHECKING:
from collections.abc import Iterable
-def test_create_html_for_search_results() -> None:
- """Test create_html_for_search_results."""
+def test_create_search_context() -> None:
+ """Test create_search_context."""
# Create a reader.
with tempfile.TemporaryDirectory() as temp_dir:
# Create the temp directory.
@@ -43,10 +45,9 @@ def test_create_html_for_search_results() -> None:
reader.enable_search()
reader.update_search()
- # Create the HTML and check if it is not empty.
- search_html: str = create_html_for_search_results("a", reader)
- assert search_html is not None, f"The search HTML should not be None. Got: {search_html}"
- assert len(search_html) > 10, f"The search HTML should be longer than 10 characters. Got: {len(search_html)}"
+ # Create the search context.
+ context: dict = create_search_context("test", reader=reader)
+ assert context is not None, f"The context should not be None. Got: {context}"
# Close the reader, so we can delete the directory.
reader.close()
diff --git a/tests/test_settings.py b/tests/test_settings.py
index dd5b44e..bcab720 100644
--- a/tests/test_settings.py
+++ b/tests/test_settings.py
@@ -6,7 +6,9 @@ from pathlib import Path
from reader import Reader
-from discord_rss_bot.settings import data_dir, default_custom_message, get_reader
+from discord_rss_bot.settings import data_dir
+from discord_rss_bot.settings import default_custom_message
+from discord_rss_bot.settings import get_reader
def test_reader() -> None:
@@ -20,12 +22,12 @@ def test_reader() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
- custom_reader: Reader = get_reader(custom_location=str(custom_loc))
- assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(custom_reader)}'."
- assert isinstance(custom_reader, Reader), assert_msg
+ reader: Reader = get_reader(custom_location=str(custom_loc))
+ assert_msg = f"The custom reader should be an instance of Reader. But it was '{type(reader)}'."
+ assert isinstance(reader, Reader), assert_msg
# Close the reader, so we can delete the directory.
- custom_reader.close()
+ reader.close()
def test_data_dir() -> None:
@@ -47,16 +49,16 @@ def test_get_webhook_for_entry() -> None:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "custom_loc_db.sqlite")
- custom_reader: Reader = get_reader(custom_location=str(custom_loc))
+ reader: Reader = get_reader(custom_location=str(custom_loc))
# Add a feed to the database.
- custom_reader.add_feed("https://www.reddit.com/r/movies.rss")
- custom_reader.update_feed("https://www.reddit.com/r/movies.rss")
+ reader.add_feed("https://www.reddit.com/r/movies.rss")
+ reader.update_feed("https://www.reddit.com/r/movies.rss")
# Add a webhook to the database.
- custom_reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType]
- our_tag = custom_reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType]
+ reader.set_tag("https://www.reddit.com/r/movies.rss", "webhook", "https://example.com") # pyright: ignore[reportArgumentType]
+ our_tag = reader.get_tag("https://www.reddit.com/r/movies.rss", "webhook") # pyright: ignore[reportArgumentType]
assert our_tag == "https://example.com", f"The tag should be 'https://example.com'. But it was '{our_tag}'."
# Close the reader, so we can delete the directory.
- custom_reader.close()
+ reader.close()
diff --git a/tests/test_update_interval.py b/tests/test_update_interval.py
new file mode 100644
index 0000000..26c5421
--- /dev/null
+++ b/tests/test_update_interval.py
@@ -0,0 +1,101 @@
+from __future__ import annotations
+
+import urllib.parse
+from typing import TYPE_CHECKING
+
+from fastapi.testclient import TestClient
+
+from discord_rss_bot.main import app
+
+if TYPE_CHECKING:
+ from httpx import Response
+
+client: TestClient = TestClient(app)
+webhook_name: str = "Test Webhook for Update Interval"
+webhook_url: str = "https://discord.com/api/webhooks/1234567890/test_update_interval"
+feed_url: str = "https://lovinator.space/rss_test.xml"
+
+
+def test_global_update_interval() -> None:
+ """Test setting the global update interval."""
+ # Set global update interval to 30 minutes
+ response: Response = client.post("/set_global_update_interval", data={"interval_minutes": "30"})
+ assert response.status_code == 200, f"Failed to set global interval: {response.text}"
+
+ # Check that the settings page shows the new interval
+ response = client.get("/settings")
+ assert response.status_code == 200, f"Failed to get settings page: {response.text}"
+ assert "30" in response.text, "Global interval not updated on settings page"
+
+
+def test_per_feed_update_interval() -> None:
+ """Test setting per-feed update interval."""
+ # Clean up any existing feed/webhook
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+ # Add webhook and feed
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # Set feed-specific update interval to 15 minutes
+ response = client.post("/set_update_interval", data={"feed_url": feed_url, "interval_minutes": "15"})
+ assert response.status_code == 200, f"Failed to set feed interval: {response.text}"
+
+ # Check that the feed page shows the custom interval
+ encoded_url = urllib.parse.quote(feed_url)
+ response = client.get(f"/feed?feed_url={encoded_url}")
+ assert response.status_code == 200, f"Failed to get feed page: {response.text}"
+ assert "15" in response.text, "Feed interval not displayed on feed page"
+ assert "Custom" in response.text, "Custom badge not shown for feed-specific interval"
+
+
+def test_reset_feed_update_interval() -> None:
+ """Test resetting feed update interval to global default."""
+ # Ensure feed/webhook setup exists regardless of test order
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
+ client.post(url="/remove", data={"feed_url": feed_url})
+
+ response: Response = client.post(
+ url="/add_webhook",
+ data={"webhook_name": webhook_name, "webhook_url": webhook_url},
+ )
+ assert response.status_code == 200, f"Failed to add webhook: {response.text}"
+
+ response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
+ assert response.status_code == 200, f"Failed to add feed: {response.text}"
+
+ # First set a custom interval
+ response = client.post("/set_update_interval", data={"feed_url": feed_url, "interval_minutes": "15"})
+ assert response.status_code == 200, f"Failed to set feed interval: {response.text}"
+
+ # Reset to global default
+ response = client.post("/reset_update_interval", data={"feed_url": feed_url})
+ assert response.status_code == 200, f"Failed to reset feed interval: {response.text}"
+
+ # Check that the feed page shows global default
+ encoded_url = urllib.parse.quote(feed_url)
+ response = client.get(f"/feed?feed_url={encoded_url}")
+ assert response.status_code == 200, f"Failed to get feed page: {response.text}"
+ assert "Using global default" in response.text, "Global default badge not shown after reset"
+
+
+def test_update_interval_validation() -> None:
+ """Test that update interval validation works."""
+ # Try to set an interval below minimum (should be clamped to 1)
+ response: Response = client.post("/set_global_update_interval", data={"interval_minutes": "0"})
+ assert response.status_code == 200, f"Failed to handle minimum interval: {response.text}"
+
+ # Try to set an interval above maximum (should be clamped to 10080)
+ response = client.post("/set_global_update_interval", data={"interval_minutes": "20000"})
+ assert response.status_code == 200, f"Failed to handle maximum interval: {response.text}"
+
+ # Clean up
+ client.post(url="/remove", data={"feed_url": feed_url})
+ client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 0bccb6b..d4ee2ae 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+from discord_rss_bot.filter.utils import is_regex_match
from discord_rss_bot.filter.utils import is_word_in_text
@@ -14,3 +15,51 @@ def test_is_word_in_text() -> None:
assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false
assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false
assert is_word_in_text("word1,word2", "This is a sample text containing none of the words.") is False, msg_false
+
+
+def test_is_regex_match() -> None:
+ msg_true = "Should return True"
+ msg_false = "Should return False"
+
+ # Test basic regex patterns
+ assert is_regex_match(r"word\d+", "This text contains word123") is True, msg_true
+ assert is_regex_match(r"^Hello", "Hello world") is True, msg_true
+ assert is_regex_match(r"world$", "Hello world") is True, msg_true
+
+ # Test case insensitivity
+ assert is_regex_match(r"hello", "This text contains HELLO") is True, msg_true
+
+ # Test comma-separated patterns
+ assert is_regex_match(r"pattern1,pattern2", "This contains pattern2") is True, msg_true
+ assert is_regex_match(r"pattern1, pattern2", "This contains pattern1") is True, msg_true
+
+ # Test regex that shouldn't match
+ assert is_regex_match(r"^start", "This doesn't start with the pattern") is False, msg_false
+ assert is_regex_match(r"end$", "This doesn't end with the pattern") is False, msg_false
+
+ # Test with empty input
+ assert is_regex_match("", "Some text") is False, msg_false
+ assert is_regex_match("pattern", "") is False, msg_false
+
+ # Test with invalid regex (should not raise an exception and return False)
+ assert is_regex_match(r"[incomplete", "Some text") is False, msg_false
+
+ # Test with multiple patterns where one is invalid
+ assert is_regex_match(r"valid, [invalid, \w+", "Contains word") is True, msg_true
+
+ # Test newline-separated patterns
+ newline_patterns = "pattern1\n^start\ncontains\\d+"
+ assert is_regex_match(newline_patterns, "This contains123 text") is True, msg_true
+ assert is_regex_match(newline_patterns, "start of line") is True, msg_true
+ assert is_regex_match(newline_patterns, "pattern1 is here") is True, msg_true
+ assert is_regex_match(newline_patterns, "None of these match") is False, msg_false
+
+ # Test mixed newline and comma patterns (for backward compatibility)
+ mixed_patterns = "pattern1\npattern2,pattern3\npattern4"
+ assert is_regex_match(mixed_patterns, "Contains pattern3") is True, msg_true
+ assert is_regex_match(mixed_patterns, "Contains pattern4") is True, msg_true
+
+ # Test with empty lines and spaces
+ whitespace_patterns = "\\s+\n \n\npattern\n\n"
+ assert is_regex_match(whitespace_patterns, "text with spaces") is True, msg_true
+ assert is_regex_match(whitespace_patterns, "text with pattern") is True, msg_true
diff --git a/tests/test_whitelist.py b/tests/test_whitelist.py
index cf39aa0..6e911fe 100644
--- a/tests/test_whitelist.py
+++ b/tests/test_whitelist.py
@@ -4,9 +4,13 @@ import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
-from reader import Entry, Feed, Reader, make_reader
+from reader import Entry
+from reader import Feed
+from reader import Reader
+from reader import make_reader
-from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
+from discord_rss_bot.filter.whitelist import has_white_tags
+from discord_rss_bot.filter.whitelist import should_be_sent
if TYPE_CHECKING:
from collections.abc import Iterable
@@ -33,11 +37,18 @@ def test_has_white_tags() -> None:
reader.update_feeds()
# Test feed without any whitelist tags
- assert has_white_tags(custom_reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags"
+ assert has_white_tags(reader=get_reader(), feed=feed) is False, "Feed should not have any whitelist tags"
check_if_has_tag(reader, feed, "whitelist_title")
check_if_has_tag(reader, feed, "whitelist_summary")
check_if_has_tag(reader, feed, "whitelist_content")
+ check_if_has_tag(reader, feed, "whitelist_author")
+
+ # Test regex whitelist tags
+ check_if_has_tag(reader, feed, "regex_whitelist_title")
+ check_if_has_tag(reader, feed, "regex_whitelist_summary")
+ check_if_has_tag(reader, feed, "regex_whitelist_content")
+ check_if_has_tag(reader, feed, "regex_whitelist_author")
# Clean up
reader.delete_feed(feed_url)
@@ -45,9 +56,9 @@ def test_has_white_tags() -> None:
def check_if_has_tag(reader: Reader, feed: Feed, whitelist_name: str) -> None:
reader.set_tag(feed, whitelist_name, "a") # pyright: ignore[reportArgumentType]
- assert has_white_tags(custom_reader=reader, feed=feed) is True, "Feed should have whitelist tags"
+ assert has_white_tags(reader=reader, feed=feed) is True, "Feed should have whitelist tags"
reader.delete_tag(feed, whitelist_name)
- assert has_white_tags(custom_reader=reader, feed=feed) is False, "Feed should not have any whitelist tags"
+ assert has_white_tags(reader=reader, feed=feed) is False, "Feed should not have any whitelist tags"
def test_should_be_sent() -> None:
@@ -109,3 +120,67 @@ def test_should_be_sent() -> None:
assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
reader.delete_tag(feed, "whitelist_author")
assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
+
+
+def test_regex_should_be_sent() -> None:
+ """Test the regex filtering functionality for whitelist."""
+ reader: Reader = get_reader()
+
+ # Add feed and update entries
+ reader.add_feed(feed_url)
+ feed: Feed = reader.get_feed(feed_url)
+ reader.update_feeds()
+
+ # Get first entry
+ first_entry: list[Entry] = []
+ entries: Iterable[Entry] = reader.get_entries(feed=feed)
+ assert entries is not None, "Entries should not be None"
+ for entry in entries:
+ first_entry.append(entry)
+ break
+ assert len(first_entry) == 1, "First entry should be added"
+
+ # Test entry without any regex whitelists
+ assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
+
+ # Test regex whitelist for title
+ reader.set_tag(feed, "regex_whitelist_title", r"fvnnn\w+") # pyright: ignore[reportArgumentType]
+ assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex title match"
+ reader.delete_tag(feed, "regex_whitelist_title")
+ assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
+
+ # Test regex whitelist for summary
+ reader.set_tag(feed, "regex_whitelist_summary", r"ffdnfdn\w+") # pyright: ignore[reportArgumentType]
+ assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex summary match"
+ reader.delete_tag(feed, "regex_whitelist_summary")
+ assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
+
+ # Test regex whitelist for content
+ reader.set_tag(feed, "regex_whitelist_content", r"ffdnfdnfdn\w+") # pyright: ignore[reportArgumentType]
+ assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex content match"
+ reader.delete_tag(feed, "regex_whitelist_content")
+ assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
+
+ # Test regex whitelist for author
+ reader.set_tag(feed, "regex_whitelist_author", r"TheLovinator\d*") # pyright: ignore[reportArgumentType]
+ assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with regex author match"
+ reader.delete_tag(feed, "regex_whitelist_author")
+ assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
+
+ # Test invalid regex pattern (should not raise an exception)
+ reader.set_tag(feed, "regex_whitelist_title", r"[incomplete") # pyright: ignore[reportArgumentType]
+ assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent with invalid regex"
+ reader.delete_tag(feed, "regex_whitelist_title")
+
+ # Test multiple regex patterns separated by commas
+ reader.set_tag(feed, "regex_whitelist_author", r"pattern1,TheLovinator\d*,pattern3") # pyright: ignore[reportArgumentType]
+ assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with one matching pattern in list"
+ reader.delete_tag(feed, "regex_whitelist_author")
+ assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
+
+ # Test newline-separated regex patterns
+ newline_patterns = "pattern1\nTheLovinator\\d*\npattern3"
+ reader.set_tag(feed, "regex_whitelist_author", newline_patterns) # pyright: ignore[reportArgumentType]
+ assert should_be_sent(reader, first_entry[0]) is True, "Entry should be sent with newline-separated patterns"
+ reader.delete_tag(feed, "regex_whitelist_author")
+ assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"