Compare commits
2 commits
b82ec01570
...
459df727a9
| Author | SHA1 | Date | |
|---|---|---|---|
|
459df727a9 |
|||
|
9183d7ddec |
7 changed files with 425 additions and 39 deletions
|
|
@ -79,6 +79,21 @@ type SentWebhookRecord = dict[str, JsonValue]
|
|||
type UpdateCallback = Callable[[], UpdatedFeed | None]
|
||||
|
||||
|
||||
class FeedUpdateError(HTTPException):
|
||||
"""Raised when the initial update for a newly added feed fails."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
status_code: int,
|
||||
detail: str,
|
||||
autodiscover_links: object | None = None,
|
||||
) -> None:
|
||||
"""Initialize an initial-update error and preserve advertised feed links."""
|
||||
super().__init__(status_code=status_code, detail=detail)
|
||||
self.autodiscover_links = autodiscover_links
|
||||
|
||||
|
||||
class JsonResponseLike(Protocol):
|
||||
"""Response interface needed for Discord webhook JSON parsing."""
|
||||
|
||||
|
|
@ -1890,6 +1905,22 @@ def truncate_webhook_message(webhook_message: str) -> str:
|
|||
return webhook_message
|
||||
|
||||
|
||||
def get_raw_autodiscover_links(reader: Reader, feed_url: str) -> object | None:
|
||||
"""Return advertised feed links stored after a failed initial update."""
|
||||
try:
|
||||
return reader.get_tag(feed_url, ".reader.autodiscover", None)
|
||||
except ReaderError:
|
||||
return None
|
||||
|
||||
|
||||
def remove_invalid_new_feed(reader: Reader, feed_url: str) -> None:
|
||||
"""Remove a feed inserted before its initial update failed."""
|
||||
try:
|
||||
reader.delete_feed(feed_url)
|
||||
except ReaderError:
|
||||
logger.exception("Failed to remove invalid feed after initial update: %s", feed_url)
|
||||
|
||||
|
||||
def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # noqa: C901, PLR0912
|
||||
"""Add a new feed, update it and mark every entry as read.
|
||||
|
||||
|
|
@ -1899,10 +1930,12 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
|||
webhook_dropdown: The webhook we should send entries to.
|
||||
|
||||
Raises:
|
||||
FeedUpdateError: If the initial feed update fails.
|
||||
HTTPException: If webhook_dropdown does not equal a webhook or default_custom_message not found.
|
||||
"""
|
||||
clean_feed_url: str = feed_url.strip()
|
||||
webhook_url: str = ""
|
||||
feed_was_added: bool = False
|
||||
if hooks := reader.get_tag((), "webhooks", []):
|
||||
# Get the webhook URL from the dropdown.
|
||||
for hook in hooks:
|
||||
|
|
@ -1919,6 +1952,7 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
|||
|
||||
try:
|
||||
reader.add_feed(clean_feed_url)
|
||||
feed_was_added = True
|
||||
except FeedExistsError:
|
||||
# Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new.
|
||||
if not reader.get_tag(clean_feed_url, "webhook", ""):
|
||||
|
|
@ -1929,7 +1963,16 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
|||
try:
|
||||
reader.update_feed(clean_feed_url)
|
||||
except ReaderError as e:
|
||||
raise HTTPException(status_code=404, detail=f"Error updating feed: {e}") from e
|
||||
autodiscover_links = get_raw_autodiscover_links(reader, clean_feed_url)
|
||||
|
||||
if feed_was_added:
|
||||
remove_invalid_new_feed(reader, clean_feed_url)
|
||||
|
||||
raise FeedUpdateError(
|
||||
status_code=404,
|
||||
detail=f"Error updating feed: {e}",
|
||||
autodiscover_links=autodiscover_links,
|
||||
) from e
|
||||
|
||||
# Mark every entry as read, so we don't send all the old entries to Discord.
|
||||
entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url, read=False)
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ from discord_rss_bot.custom_message import get_embed
|
|||
from discord_rss_bot.custom_message import get_first_image
|
||||
from discord_rss_bot.custom_message import replace_tags_in_text_message
|
||||
from discord_rss_bot.custom_message import save_embed
|
||||
from discord_rss_bot.feeds import FeedUpdateError
|
||||
from discord_rss_bot.feeds import SentWebhookRecord
|
||||
from discord_rss_bot.feeds import coerce_media_gallery_image_limit
|
||||
from discord_rss_bot.feeds import create_feed
|
||||
|
|
@ -122,6 +123,12 @@ class FilterPreviewContext(TypedDict):
|
|||
preview_helper_text: str
|
||||
|
||||
|
||||
class AutodiscoverLink(TypedDict):
|
||||
href: str
|
||||
type: str | None
|
||||
title: str | None
|
||||
|
||||
|
||||
LOGGING_CONFIG = {
|
||||
"version": 1,
|
||||
"disable_existing_loggers": False,
|
||||
|
|
@ -262,6 +269,63 @@ templates.env.globals["get_backup_path"] = get_backup_path # pyright: ignore[re
|
|||
templates.env.globals["has_webhooks"] = has_webhooks # pyright: ignore[reportArgumentType]
|
||||
|
||||
|
||||
def get_global_delivery_mode(reader: Reader) -> str:
|
||||
"""Return the normalized default delivery mode for new feeds.
|
||||
|
||||
Args:
|
||||
reader: The Reader instance.
|
||||
|
||||
Returns:
|
||||
The configured delivery mode, falling back to embed.
|
||||
"""
|
||||
global_delivery_mode: str = str(reader.get_tag((), "delivery_mode", "embed")).strip().lower()
|
||||
return global_delivery_mode if global_delivery_mode in {"embed", "text"} else "embed"
|
||||
|
||||
|
||||
def get_autodiscover_links(
|
||||
reader: Reader,
|
||||
feed_url: str,
|
||||
stored_links: object | None = None,
|
||||
) -> list[AutodiscoverLink]:
|
||||
"""Return valid autodiscovered links stored for a failed feed update.
|
||||
|
||||
Args:
|
||||
reader: The Reader instance.
|
||||
feed_url: The URL that failed to parse as a feed.
|
||||
stored_links: Optional advertised links preserved before deleting an invalid feed.
|
||||
|
||||
Returns:
|
||||
Valid discovered feed link dictionaries.
|
||||
"""
|
||||
if stored_links is None:
|
||||
try:
|
||||
stored_links = reader.get_tag(feed_url, ".reader.autodiscover", [])
|
||||
except ReaderError:
|
||||
return []
|
||||
|
||||
if not isinstance(stored_links, list):
|
||||
return []
|
||||
|
||||
links: list[AutodiscoverLink] = []
|
||||
for stored_link in stored_links:
|
||||
if not isinstance(stored_link, dict):
|
||||
continue
|
||||
|
||||
href = stored_link.get("href")
|
||||
if not isinstance(href, str) or not href:
|
||||
continue
|
||||
|
||||
link_type = stored_link.get("type")
|
||||
title = stored_link.get("title")
|
||||
links.append({
|
||||
"href": href,
|
||||
"type": link_type if isinstance(link_type, str) else None,
|
||||
"title": title if isinstance(title, str) else None,
|
||||
})
|
||||
|
||||
return links
|
||||
|
||||
|
||||
@app.post("/add_webhook")
|
||||
async def post_add_webhook(
|
||||
webhook_name: Annotated[str, Form()],
|
||||
|
|
@ -357,24 +421,51 @@ async def post_delete_webhook(
|
|||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
|
||||
@app.post("/add")
|
||||
@app.post("/add", response_model=None)
|
||||
async def post_create_feed(
|
||||
request: Request,
|
||||
feed_url: Annotated[str, Form()],
|
||||
webhook_dropdown: Annotated[str, Form()],
|
||||
reader: Annotated[Reader, Depends(get_reader_dependency)],
|
||||
) -> RedirectResponse:
|
||||
) -> RedirectResponse | HTMLResponse:
|
||||
"""Add a feed to the database.
|
||||
|
||||
Args:
|
||||
request: The request object.
|
||||
feed_url: The feed to add.
|
||||
webhook_dropdown: The webhook to use.
|
||||
reader: The Reader instance.
|
||||
|
||||
Returns:
|
||||
RedirectResponse: Redirect to the feed page.
|
||||
|
||||
Raises:
|
||||
FeedUpdateError: If updating the feed fails without discovered feed links.
|
||||
"""
|
||||
clean_feed_url: str = feed_url.strip()
|
||||
create_feed(reader, feed_url, webhook_dropdown)
|
||||
try:
|
||||
create_feed(reader, feed_url, webhook_dropdown)
|
||||
except FeedUpdateError as exception:
|
||||
autodiscover_links = get_autodiscover_links(reader, clean_feed_url, exception.autodiscover_links)
|
||||
if not autodiscover_links:
|
||||
raise
|
||||
|
||||
context = {
|
||||
"request": request,
|
||||
"webhooks": reader.get_tag((), "webhooks", []),
|
||||
"global_delivery_mode": get_global_delivery_mode(reader),
|
||||
"feed_url": clean_feed_url,
|
||||
"selected_webhook": webhook_dropdown,
|
||||
"messages": exception.detail,
|
||||
"autodiscover_links": autodiscover_links,
|
||||
}
|
||||
return templates.TemplateResponse(
|
||||
request=request,
|
||||
name="add.html",
|
||||
context=context,
|
||||
status_code=exception.status_code,
|
||||
)
|
||||
|
||||
commit_state_change(reader, f"Add feed {clean_feed_url}")
|
||||
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
|
||||
|
||||
|
|
@ -1609,14 +1700,10 @@ def get_add(
|
|||
Returns:
|
||||
HTMLResponse: The add feed page.
|
||||
"""
|
||||
global_delivery_mode: str = str(reader.get_tag((), "delivery_mode", "embed")).strip().lower()
|
||||
if global_delivery_mode not in {"embed", "text"}:
|
||||
global_delivery_mode = "embed"
|
||||
|
||||
context = {
|
||||
"request": request,
|
||||
"webhooks": reader.get_tag((), "webhooks", []),
|
||||
"global_delivery_mode": global_delivery_mode,
|
||||
"global_delivery_mode": get_global_delivery_mode(reader),
|
||||
}
|
||||
return templates.TemplateResponse(request=request, name="add.html", context=context)
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
|||
import os
|
||||
import typing
|
||||
from functools import lru_cache
|
||||
from importlib.util import find_spec
|
||||
from pathlib import Path
|
||||
|
||||
from platformdirs import user_data_dir
|
||||
|
|
@ -31,6 +32,31 @@ default_custom_embed: dict[str, str] = {
|
|||
}
|
||||
|
||||
|
||||
def has_plugin(plugin_name: str) -> bool:
|
||||
"""Return whether the installed reader version provides a built-in plugin.
|
||||
|
||||
We started using .autodiscover, but that is from Reader version 3.25.
|
||||
"""
|
||||
try:
|
||||
return find_spec(f"reader.plugins.{plugin_name.removeprefix('.')}") is not None
|
||||
except ModuleNotFoundError:
|
||||
return False
|
||||
|
||||
|
||||
def make_app_reader(db_location: Path) -> Reader:
|
||||
"""Create a reader with plugins supported by the installed reader version.
|
||||
|
||||
Returns:
|
||||
The configured reader.
|
||||
"""
|
||||
plugins_we_want = (".ua_fallback", ".autodiscover")
|
||||
plugins: list[str] = [name for name in plugins_we_want if has_plugin(name)]
|
||||
|
||||
if plugins:
|
||||
return make_reader(url=str(db_location), plugins=plugins)
|
||||
return make_reader(url=str(db_location))
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_reader(custom_location: Path | None = None) -> Reader:
|
||||
"""Get the reader.
|
||||
|
|
@ -42,7 +68,7 @@ def get_reader(custom_location: Path | None = None) -> Reader:
|
|||
The reader.
|
||||
"""
|
||||
db_location: Path = custom_location or Path(data_dir) / "db.sqlite"
|
||||
reader: Reader = make_reader(url=str(db_location))
|
||||
reader: Reader = make_app_reader(db_location)
|
||||
|
||||
# https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig
|
||||
# Set the default update interval to 15 minutes if not already configured
|
||||
|
|
|
|||
|
|
@ -8,11 +8,6 @@
|
|||
{% block content %}
|
||||
<div class="p-2 border border-dark">
|
||||
<form action="/add" method="post">
|
||||
<div class="mb-3 text-muted">
|
||||
New feeds currently default to
|
||||
<strong>{{ global_delivery_mode }}</strong>
|
||||
delivery mode.
|
||||
</div>
|
||||
<!-- Feed URL -->
|
||||
<div class="row pb-2">
|
||||
<label for="feed_url" class="col-sm-2 col-form-label">Feed URL</label>
|
||||
|
|
@ -20,7 +15,8 @@
|
|||
<input name="feed_url"
|
||||
type="text"
|
||||
class="form-control bg-dark border-dark text-muted"
|
||||
id="feed_url" />
|
||||
id="feed_url"
|
||||
value="{{ feed_url }}" />
|
||||
</div>
|
||||
</div>
|
||||
<!-- Webhook dropdown -->
|
||||
|
|
@ -30,8 +26,11 @@
|
|||
<select class="col-auto form-select bg-dark border-dark text-muted"
|
||||
id="webhook_dropdown"
|
||||
name="webhook_dropdown">
|
||||
<option selected>Choose webhook...</option>
|
||||
{% for hook in webhooks %}<option value="{{ hook.name }}">{{- hook.name -}}</option>{% endfor %}
|
||||
<option {% if not selected_webhook %}selected{% endif %}>Choose webhook...</option>
|
||||
{% for hook in webhooks %}
|
||||
<option value="{{ hook.name }}"
|
||||
{% if hook.name == selected_webhook %}selected{% endif %}>{{ hook.name }}</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
</div>
|
||||
</div>
|
||||
|
|
@ -41,4 +40,28 @@
|
|||
</div>
|
||||
</form>
|
||||
</div>
|
||||
{% if autodiscover_links %}
|
||||
<section class="card border border-dark shadow-sm text-light rounded-0 mt-3">
|
||||
<div class="card-body p-3 p-md-4">
|
||||
<h2 class="h6 text-uppercase text-muted mb-2">Discovered feed links</h2>
|
||||
<p class="text-muted mb-3">The submitted URL is not a feed. Choose one of the feeds advertised by that page.</p>
|
||||
<div class="d-flex flex-column gap-2">
|
||||
{% for link in autodiscover_links %}
|
||||
<form action="/add"
|
||||
method="post"
|
||||
class="d-flex flex-column flex-md-row align-items-md-center gap-3 p-3 border border-dark rounded-0">
|
||||
<input type="hidden" name="feed_url" value="{{ link.href }}" />
|
||||
<input type="hidden" name="webhook_dropdown" value="{{ selected_webhook }}" />
|
||||
<div class="flex-grow-1 feed-page__content">
|
||||
{% if link.title %}<div class="text-light fw-semibold">{{ link.title }}</div>{% endif %}
|
||||
<code class="d-block text-muted feed-page__wrap">{{ link.href }}</code>
|
||||
{% if link.type %}<div class="small text-muted mt-1">{{ link.type }}</div>{% endif %}
|
||||
</div>
|
||||
<button class="btn btn-outline-light btn-sm ms-md-auto" type="submit">Add feed</button>
|
||||
</form>
|
||||
{% endfor %}
|
||||
</div>
|
||||
</div>
|
||||
</section>
|
||||
{% endif %}
|
||||
{% endblock content %}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ from unittest.mock import patch
|
|||
import pytest
|
||||
from reader import EntryNotFoundError
|
||||
from reader import Feed
|
||||
from reader import FeedExistsError
|
||||
from reader import FeedNotFoundError
|
||||
from reader import Reader
|
||||
from reader import StorageError
|
||||
|
|
@ -448,6 +449,38 @@ def test_create_feed_falls_back_to_embed_when_global_delivery_mode_is_invalid()
|
|||
reader.set_tag.assert_any_call("https://example.com/feed.xml", "should_send_embed", True)
|
||||
|
||||
|
||||
def test_create_feed_removes_new_feed_when_initial_update_fails() -> None:
|
||||
feed_url = "https://example.com/not-a-feed"
|
||||
autodiscover_links = [{"href": "https://example.com/feed.xml", "type": "application/rss+xml"}]
|
||||
reader = MagicMock()
|
||||
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
|
||||
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
|
||||
".reader.autodiscover": autodiscover_links,
|
||||
}.get(key, default)
|
||||
reader.update_feed.side_effect = StorageError("invalid feed")
|
||||
|
||||
with pytest.raises(feeds.FeedUpdateError) as exc_info:
|
||||
create_feed(reader, feed_url, "Main")
|
||||
|
||||
reader.delete_feed.assert_called_once_with(feed_url)
|
||||
assert exc_info.value.autodiscover_links == autodiscover_links
|
||||
|
||||
|
||||
def test_create_feed_does_not_remove_existing_feed_when_update_fails() -> None:
|
||||
feed_url = "https://example.com/existing-feed.xml"
|
||||
reader = MagicMock()
|
||||
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
|
||||
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
|
||||
}.get(key, default)
|
||||
reader.add_feed.side_effect = FeedExistsError(feed_url)
|
||||
reader.update_feed.side_effect = StorageError("temporary failure")
|
||||
|
||||
with pytest.raises(feeds.FeedUpdateError):
|
||||
create_feed(reader, feed_url, "Main")
|
||||
|
||||
reader.delete_feed.assert_not_called()
|
||||
|
||||
|
||||
@patch("discord_rss_bot.feeds.capture_full_page_screenshot")
|
||||
@patch("discord_rss_bot.feeds.DiscordWebhook")
|
||||
def test_create_screenshot_webhook_adds_image_file(
|
||||
|
|
|
|||
|
|
@ -188,6 +188,54 @@ def test_create_feed() -> None:
|
|||
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
|
||||
|
||||
|
||||
def test_create_feed_suggests_autodiscovered_links() -> None:
|
||||
"""A page URL that fails to parse should render its advertised feed links."""
|
||||
submitted_url = "https://example.com/blog"
|
||||
discovered_url = "https://example.com/rss.xml"
|
||||
stub_reader = MagicMock()
|
||||
|
||||
def get_tag(resource: str | tuple[()], key: str, default: TestTagValue = None) -> TestTagValue:
|
||||
if resource == () and key == "webhooks":
|
||||
return [{"name": webhook_name, "url": webhook_url}]
|
||||
if resource == () and key == "delivery_mode":
|
||||
return "embed"
|
||||
return default
|
||||
|
||||
stub_reader.get_tag.side_effect = get_tag
|
||||
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
|
||||
|
||||
try:
|
||||
with patch.object(
|
||||
main_module,
|
||||
"create_feed",
|
||||
side_effect=feeds.FeedUpdateError(
|
||||
status_code=404,
|
||||
detail="Error updating feed",
|
||||
autodiscover_links=[
|
||||
{
|
||||
"href": discovered_url,
|
||||
"title": "Example feed",
|
||||
"type": "application/rss+xml",
|
||||
}
|
||||
],
|
||||
),
|
||||
):
|
||||
response: Response = client.post(
|
||||
url="/add",
|
||||
data={"feed_url": submitted_url, "webhook_dropdown": webhook_name},
|
||||
)
|
||||
finally:
|
||||
app.dependency_overrides = {}
|
||||
|
||||
assert response.status_code == 404
|
||||
assert "Discovered feed links" in response.text
|
||||
assert "Example feed" in response.text
|
||||
assert discovered_url in response.text
|
||||
assert "application/rss+xml" in response.text
|
||||
assert f'value="{submitted_url}"' in response.text
|
||||
assert f'value="{webhook_name}"' in response.text
|
||||
|
||||
|
||||
def test_get() -> None:
|
||||
"""Test the /create_feed page."""
|
||||
# Ensure webhook exists for this test regardless of test order.
|
||||
|
|
@ -572,7 +620,6 @@ def test_add_page_shows_global_default_delivery_mode_hint() -> None:
|
|||
|
||||
response = client.get(url="/add")
|
||||
assert response.status_code == 200, f"/add failed: {response.text}"
|
||||
assert "New feeds currently default to" in response.text
|
||||
assert "text" in response.text
|
||||
|
||||
|
||||
|
|
@ -995,31 +1042,23 @@ def test_change_feed_url_nonexistent_old_url_returns_404() -> None:
|
|||
|
||||
def test_change_feed_url_new_url_already_exists_returns_409() -> None:
|
||||
"""Changing to a URL that is already tracked should return HTTP 409."""
|
||||
second_feed_url = "https://lovinator.space/rss_test_small.xml"
|
||||
second_feed_url = "https://example.com/existing.xml"
|
||||
|
||||
# Ensure both feeds are absent.
|
||||
client.post(url="/remove", data={"feed_url": feed_url})
|
||||
client.post(url="/remove", data={"feed_url": second_feed_url})
|
||||
class StubReader:
|
||||
def change_feed_url(self, _old_url: str, new_url: str) -> None:
|
||||
raise feeds.FeedExistsError(new_url)
|
||||
|
||||
# Ensure webhook exists.
|
||||
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
|
||||
client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})
|
||||
app.dependency_overrides[get_reader_dependency] = StubReader
|
||||
try:
|
||||
response: Response = client.post(
|
||||
url="/change_feed_url",
|
||||
data={"old_feed_url": feed_url, "new_feed_url": second_feed_url},
|
||||
)
|
||||
finally:
|
||||
app.dependency_overrides = {}
|
||||
|
||||
# Add both feeds.
|
||||
client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
|
||||
client.post(url="/add", data={"feed_url": second_feed_url, "webhook_dropdown": webhook_name})
|
||||
|
||||
# Try to rename one to the other.
|
||||
response: Response = client.post(
|
||||
url="/change_feed_url",
|
||||
data={"old_feed_url": feed_url, "new_feed_url": second_feed_url},
|
||||
)
|
||||
assert response.status_code == 409, f"Expected 409 when new URL already exists, got {response.status_code}"
|
||||
|
||||
# Cleanup.
|
||||
client.post(url="/remove", data={"feed_url": feed_url})
|
||||
client.post(url="/remove", data={"feed_url": second_feed_url})
|
||||
|
||||
|
||||
def test_change_feed_url_same_url_redirects_without_error() -> None:
|
||||
"""Changing a feed's URL to itself should redirect cleanly without any error."""
|
||||
|
|
|
|||
|
|
@ -2,13 +2,62 @@ from __future__ import annotations
|
|||
|
||||
import pathlib
|
||||
import tempfile
|
||||
from contextlib import closing
|
||||
from contextlib import contextmanager
|
||||
from http.server import BaseHTTPRequestHandler
|
||||
from http.server import ThreadingHTTPServer
|
||||
from pathlib import Path
|
||||
from threading import Thread
|
||||
from typing import TYPE_CHECKING
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from reader import ParseError
|
||||
from reader import Reader
|
||||
|
||||
import discord_rss_bot.settings as settings_module
|
||||
from discord_rss_bot.settings import data_dir
|
||||
from discord_rss_bot.settings import default_custom_message
|
||||
from discord_rss_bot.settings import get_reader
|
||||
from discord_rss_bot.settings import has_plugin
|
||||
from discord_rss_bot.settings import make_app_reader
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterator
|
||||
|
||||
|
||||
class _AutodiscoverHandler(BaseHTTPRequestHandler):
|
||||
"""Serve an HTML page that advertises an RSS feed."""
|
||||
|
||||
def do_GET(self) -> None:
|
||||
"""Return HTML instead of a feed so reader attempts autodiscovery."""
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "text/html")
|
||||
self.end_headers()
|
||||
self.wfile.write(
|
||||
b'<html><head><link rel="alternate" href="/rss.xml" '
|
||||
b'type="application/rss+xml" title="Example"></head></html>'
|
||||
)
|
||||
|
||||
def log_message(self, _format: str, *_args: object) -> None:
|
||||
"""Suppress HTTP request logging during tests."""
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _serve_autodiscover_html() -> Iterator[str]:
|
||||
"""Serve an HTML page URL while the context is active.
|
||||
|
||||
Yields:
|
||||
The URL of the HTML page.
|
||||
"""
|
||||
with ThreadingHTTPServer(("127.0.0.1", 0), _AutodiscoverHandler) as server:
|
||||
server_thread = Thread(target=server.serve_forever, daemon=True)
|
||||
server_thread.start()
|
||||
try:
|
||||
yield f"http://127.0.0.1:{server.server_port}/"
|
||||
finally:
|
||||
server.shutdown()
|
||||
server_thread.join()
|
||||
|
||||
|
||||
def test_reader() -> None:
|
||||
|
|
@ -141,3 +190,89 @@ def test_get_reader_preserves_existing_global_delivery_mode() -> None:
|
|||
|
||||
second_reader.close()
|
||||
get_reader.cache_clear()
|
||||
|
||||
|
||||
def test_get_reader_enables_autodiscover_plugin() -> None:
|
||||
"""get_reader should store advertised feed links when HTML parsing fails."""
|
||||
get_reader.cache_clear()
|
||||
|
||||
try:
|
||||
with (
|
||||
tempfile.TemporaryDirectory() as temp_dir,
|
||||
_serve_autodiscover_html() as feed_url,
|
||||
closing(get_reader(custom_location=Path(temp_dir, "autodiscover_db.sqlite"))) as reader,
|
||||
):
|
||||
reader.add_feed(feed_url)
|
||||
|
||||
with pytest.raises(ParseError):
|
||||
reader.update_feed(feed_url)
|
||||
|
||||
assert reader.get_tag(feed_url, ".reader.autodiscover", None) == [
|
||||
{
|
||||
"href": f"{feed_url}rss.xml",
|
||||
"type": "application/rss+xml",
|
||||
"title": "Example",
|
||||
}
|
||||
]
|
||||
finally:
|
||||
get_reader.cache_clear()
|
||||
|
||||
|
||||
def test_make_app_reader_enables_supported_builtin_plugins(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Supported reader versions should load both built-in plugins explicitly."""
|
||||
reader = object()
|
||||
make_reader = MagicMock(return_value=reader)
|
||||
monkeypatch.setattr(settings_module, "has_plugin", lambda _plugin_name: True)
|
||||
monkeypatch.setattr(settings_module, "make_reader", make_reader)
|
||||
|
||||
assert make_app_reader(Path("db.sqlite")) is reader
|
||||
make_reader.assert_called_once_with(
|
||||
url="db.sqlite",
|
||||
plugins=[".ua_fallback", ".autodiscover"],
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("available_plugin", "expected_plugin"),
|
||||
[
|
||||
(".ua_fallback", ".ua_fallback"),
|
||||
(".autodiscover", ".autodiscover"),
|
||||
],
|
||||
)
|
||||
def test_make_app_reader_loads_only_available_builtin_plugin(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
available_plugin: str,
|
||||
expected_plugin: str,
|
||||
) -> None:
|
||||
"""Reader construction should not receive unavailable built-in plugins."""
|
||||
reader = object()
|
||||
make_reader = MagicMock(return_value=reader)
|
||||
monkeypatch.setattr(settings_module, "has_plugin", lambda plugin_name: plugin_name == available_plugin)
|
||||
monkeypatch.setattr(settings_module, "make_reader", make_reader)
|
||||
|
||||
assert make_app_reader(Path("db.sqlite")) is reader
|
||||
make_reader.assert_called_once_with(url="db.sqlite", plugins=[expected_plugin])
|
||||
|
||||
|
||||
def test_make_app_reader_preserves_defaults_without_builtin_plugins(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Reader versions without built-in plugins should start with their defaults."""
|
||||
reader = object()
|
||||
make_reader = MagicMock(return_value=reader)
|
||||
monkeypatch.setattr(settings_module, "has_plugin", lambda _plugin_name: False)
|
||||
monkeypatch.setattr(settings_module, "make_reader", make_reader)
|
||||
|
||||
assert make_app_reader(Path("db.sqlite")) is reader
|
||||
make_reader.assert_called_once_with(url="db.sqlite")
|
||||
|
||||
|
||||
def test_has_plugin_handles_reader_versions_without_plugins_package(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""Older reader versions without a plugins package should be supported."""
|
||||
|
||||
def find_spec(_name: str) -> None:
|
||||
raise ModuleNotFoundError
|
||||
|
||||
monkeypatch.setattr(settings_module, "find_spec", find_spec)
|
||||
|
||||
assert has_plugin(".autodiscover") is False
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue