Compare commits
2 commits
b82ec01570
...
459df727a9
| Author | SHA1 | Date | |
|---|---|---|---|
|
459df727a9 |
|||
|
9183d7ddec |
7 changed files with 425 additions and 39 deletions
|
|
@ -79,6 +79,21 @@ type SentWebhookRecord = dict[str, JsonValue]
|
||||||
type UpdateCallback = Callable[[], UpdatedFeed | None]
|
type UpdateCallback = Callable[[], UpdatedFeed | None]
|
||||||
|
|
||||||
|
|
||||||
|
class FeedUpdateError(HTTPException):
|
||||||
|
"""Raised when the initial update for a newly added feed fails."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
status_code: int,
|
||||||
|
detail: str,
|
||||||
|
autodiscover_links: object | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize an initial-update error and preserve advertised feed links."""
|
||||||
|
super().__init__(status_code=status_code, detail=detail)
|
||||||
|
self.autodiscover_links = autodiscover_links
|
||||||
|
|
||||||
|
|
||||||
class JsonResponseLike(Protocol):
|
class JsonResponseLike(Protocol):
|
||||||
"""Response interface needed for Discord webhook JSON parsing."""
|
"""Response interface needed for Discord webhook JSON parsing."""
|
||||||
|
|
||||||
|
|
@ -1890,6 +1905,22 @@ def truncate_webhook_message(webhook_message: str) -> str:
|
||||||
return webhook_message
|
return webhook_message
|
||||||
|
|
||||||
|
|
||||||
|
def get_raw_autodiscover_links(reader: Reader, feed_url: str) -> object | None:
|
||||||
|
"""Return advertised feed links stored after a failed initial update."""
|
||||||
|
try:
|
||||||
|
return reader.get_tag(feed_url, ".reader.autodiscover", None)
|
||||||
|
except ReaderError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def remove_invalid_new_feed(reader: Reader, feed_url: str) -> None:
|
||||||
|
"""Remove a feed inserted before its initial update failed."""
|
||||||
|
try:
|
||||||
|
reader.delete_feed(feed_url)
|
||||||
|
except ReaderError:
|
||||||
|
logger.exception("Failed to remove invalid feed after initial update: %s", feed_url)
|
||||||
|
|
||||||
|
|
||||||
def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # noqa: C901, PLR0912
|
def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # noqa: C901, PLR0912
|
||||||
"""Add a new feed, update it and mark every entry as read.
|
"""Add a new feed, update it and mark every entry as read.
|
||||||
|
|
||||||
|
|
@ -1899,10 +1930,12 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
||||||
webhook_dropdown: The webhook we should send entries to.
|
webhook_dropdown: The webhook we should send entries to.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
|
FeedUpdateError: If the initial feed update fails.
|
||||||
HTTPException: If webhook_dropdown does not equal a webhook or default_custom_message not found.
|
HTTPException: If webhook_dropdown does not equal a webhook or default_custom_message not found.
|
||||||
"""
|
"""
|
||||||
clean_feed_url: str = feed_url.strip()
|
clean_feed_url: str = feed_url.strip()
|
||||||
webhook_url: str = ""
|
webhook_url: str = ""
|
||||||
|
feed_was_added: bool = False
|
||||||
if hooks := reader.get_tag((), "webhooks", []):
|
if hooks := reader.get_tag((), "webhooks", []):
|
||||||
# Get the webhook URL from the dropdown.
|
# Get the webhook URL from the dropdown.
|
||||||
for hook in hooks:
|
for hook in hooks:
|
||||||
|
|
@ -1919,6 +1952,7 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reader.add_feed(clean_feed_url)
|
reader.add_feed(clean_feed_url)
|
||||||
|
feed_was_added = True
|
||||||
except FeedExistsError:
|
except FeedExistsError:
|
||||||
# Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new.
|
# Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new.
|
||||||
if not reader.get_tag(clean_feed_url, "webhook", ""):
|
if not reader.get_tag(clean_feed_url, "webhook", ""):
|
||||||
|
|
@ -1929,7 +1963,16 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
||||||
try:
|
try:
|
||||||
reader.update_feed(clean_feed_url)
|
reader.update_feed(clean_feed_url)
|
||||||
except ReaderError as e:
|
except ReaderError as e:
|
||||||
raise HTTPException(status_code=404, detail=f"Error updating feed: {e}") from e
|
autodiscover_links = get_raw_autodiscover_links(reader, clean_feed_url)
|
||||||
|
|
||||||
|
if feed_was_added:
|
||||||
|
remove_invalid_new_feed(reader, clean_feed_url)
|
||||||
|
|
||||||
|
raise FeedUpdateError(
|
||||||
|
status_code=404,
|
||||||
|
detail=f"Error updating feed: {e}",
|
||||||
|
autodiscover_links=autodiscover_links,
|
||||||
|
) from e
|
||||||
|
|
||||||
# Mark every entry as read, so we don't send all the old entries to Discord.
|
# Mark every entry as read, so we don't send all the old entries to Discord.
|
||||||
entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url, read=False)
|
entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url, read=False)
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,7 @@ from discord_rss_bot.custom_message import get_embed
|
||||||
from discord_rss_bot.custom_message import get_first_image
|
from discord_rss_bot.custom_message import get_first_image
|
||||||
from discord_rss_bot.custom_message import replace_tags_in_text_message
|
from discord_rss_bot.custom_message import replace_tags_in_text_message
|
||||||
from discord_rss_bot.custom_message import save_embed
|
from discord_rss_bot.custom_message import save_embed
|
||||||
|
from discord_rss_bot.feeds import FeedUpdateError
|
||||||
from discord_rss_bot.feeds import SentWebhookRecord
|
from discord_rss_bot.feeds import SentWebhookRecord
|
||||||
from discord_rss_bot.feeds import coerce_media_gallery_image_limit
|
from discord_rss_bot.feeds import coerce_media_gallery_image_limit
|
||||||
from discord_rss_bot.feeds import create_feed
|
from discord_rss_bot.feeds import create_feed
|
||||||
|
|
@ -122,6 +123,12 @@ class FilterPreviewContext(TypedDict):
|
||||||
preview_helper_text: str
|
preview_helper_text: str
|
||||||
|
|
||||||
|
|
||||||
|
class AutodiscoverLink(TypedDict):
|
||||||
|
href: str
|
||||||
|
type: str | None
|
||||||
|
title: str | None
|
||||||
|
|
||||||
|
|
||||||
LOGGING_CONFIG = {
|
LOGGING_CONFIG = {
|
||||||
"version": 1,
|
"version": 1,
|
||||||
"disable_existing_loggers": False,
|
"disable_existing_loggers": False,
|
||||||
|
|
@ -262,6 +269,63 @@ templates.env.globals["get_backup_path"] = get_backup_path # pyright: ignore[re
|
||||||
templates.env.globals["has_webhooks"] = has_webhooks # pyright: ignore[reportArgumentType]
|
templates.env.globals["has_webhooks"] = has_webhooks # pyright: ignore[reportArgumentType]
|
||||||
|
|
||||||
|
|
||||||
|
def get_global_delivery_mode(reader: Reader) -> str:
|
||||||
|
"""Return the normalized default delivery mode for new feeds.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
reader: The Reader instance.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The configured delivery mode, falling back to embed.
|
||||||
|
"""
|
||||||
|
global_delivery_mode: str = str(reader.get_tag((), "delivery_mode", "embed")).strip().lower()
|
||||||
|
return global_delivery_mode if global_delivery_mode in {"embed", "text"} else "embed"
|
||||||
|
|
||||||
|
|
||||||
|
def get_autodiscover_links(
|
||||||
|
reader: Reader,
|
||||||
|
feed_url: str,
|
||||||
|
stored_links: object | None = None,
|
||||||
|
) -> list[AutodiscoverLink]:
|
||||||
|
"""Return valid autodiscovered links stored for a failed feed update.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
reader: The Reader instance.
|
||||||
|
feed_url: The URL that failed to parse as a feed.
|
||||||
|
stored_links: Optional advertised links preserved before deleting an invalid feed.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Valid discovered feed link dictionaries.
|
||||||
|
"""
|
||||||
|
if stored_links is None:
|
||||||
|
try:
|
||||||
|
stored_links = reader.get_tag(feed_url, ".reader.autodiscover", [])
|
||||||
|
except ReaderError:
|
||||||
|
return []
|
||||||
|
|
||||||
|
if not isinstance(stored_links, list):
|
||||||
|
return []
|
||||||
|
|
||||||
|
links: list[AutodiscoverLink] = []
|
||||||
|
for stored_link in stored_links:
|
||||||
|
if not isinstance(stored_link, dict):
|
||||||
|
continue
|
||||||
|
|
||||||
|
href = stored_link.get("href")
|
||||||
|
if not isinstance(href, str) or not href:
|
||||||
|
continue
|
||||||
|
|
||||||
|
link_type = stored_link.get("type")
|
||||||
|
title = stored_link.get("title")
|
||||||
|
links.append({
|
||||||
|
"href": href,
|
||||||
|
"type": link_type if isinstance(link_type, str) else None,
|
||||||
|
"title": title if isinstance(title, str) else None,
|
||||||
|
})
|
||||||
|
|
||||||
|
return links
|
||||||
|
|
||||||
|
|
||||||
@app.post("/add_webhook")
|
@app.post("/add_webhook")
|
||||||
async def post_add_webhook(
|
async def post_add_webhook(
|
||||||
webhook_name: Annotated[str, Form()],
|
webhook_name: Annotated[str, Form()],
|
||||||
|
|
@ -357,24 +421,51 @@ async def post_delete_webhook(
|
||||||
return RedirectResponse(url="/", status_code=303)
|
return RedirectResponse(url="/", status_code=303)
|
||||||
|
|
||||||
|
|
||||||
@app.post("/add")
|
@app.post("/add", response_model=None)
|
||||||
async def post_create_feed(
|
async def post_create_feed(
|
||||||
|
request: Request,
|
||||||
feed_url: Annotated[str, Form()],
|
feed_url: Annotated[str, Form()],
|
||||||
webhook_dropdown: Annotated[str, Form()],
|
webhook_dropdown: Annotated[str, Form()],
|
||||||
reader: Annotated[Reader, Depends(get_reader_dependency)],
|
reader: Annotated[Reader, Depends(get_reader_dependency)],
|
||||||
) -> RedirectResponse:
|
) -> RedirectResponse | HTMLResponse:
|
||||||
"""Add a feed to the database.
|
"""Add a feed to the database.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
request: The request object.
|
||||||
feed_url: The feed to add.
|
feed_url: The feed to add.
|
||||||
webhook_dropdown: The webhook to use.
|
webhook_dropdown: The webhook to use.
|
||||||
reader: The Reader instance.
|
reader: The Reader instance.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
RedirectResponse: Redirect to the feed page.
|
RedirectResponse: Redirect to the feed page.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FeedUpdateError: If updating the feed fails without discovered feed links.
|
||||||
"""
|
"""
|
||||||
clean_feed_url: str = feed_url.strip()
|
clean_feed_url: str = feed_url.strip()
|
||||||
create_feed(reader, feed_url, webhook_dropdown)
|
try:
|
||||||
|
create_feed(reader, feed_url, webhook_dropdown)
|
||||||
|
except FeedUpdateError as exception:
|
||||||
|
autodiscover_links = get_autodiscover_links(reader, clean_feed_url, exception.autodiscover_links)
|
||||||
|
if not autodiscover_links:
|
||||||
|
raise
|
||||||
|
|
||||||
|
context = {
|
||||||
|
"request": request,
|
||||||
|
"webhooks": reader.get_tag((), "webhooks", []),
|
||||||
|
"global_delivery_mode": get_global_delivery_mode(reader),
|
||||||
|
"feed_url": clean_feed_url,
|
||||||
|
"selected_webhook": webhook_dropdown,
|
||||||
|
"messages": exception.detail,
|
||||||
|
"autodiscover_links": autodiscover_links,
|
||||||
|
}
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
request=request,
|
||||||
|
name="add.html",
|
||||||
|
context=context,
|
||||||
|
status_code=exception.status_code,
|
||||||
|
)
|
||||||
|
|
||||||
commit_state_change(reader, f"Add feed {clean_feed_url}")
|
commit_state_change(reader, f"Add feed {clean_feed_url}")
|
||||||
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
|
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
|
||||||
|
|
||||||
|
|
@ -1609,14 +1700,10 @@ def get_add(
|
||||||
Returns:
|
Returns:
|
||||||
HTMLResponse: The add feed page.
|
HTMLResponse: The add feed page.
|
||||||
"""
|
"""
|
||||||
global_delivery_mode: str = str(reader.get_tag((), "delivery_mode", "embed")).strip().lower()
|
|
||||||
if global_delivery_mode not in {"embed", "text"}:
|
|
||||||
global_delivery_mode = "embed"
|
|
||||||
|
|
||||||
context = {
|
context = {
|
||||||
"request": request,
|
"request": request,
|
||||||
"webhooks": reader.get_tag((), "webhooks", []),
|
"webhooks": reader.get_tag((), "webhooks", []),
|
||||||
"global_delivery_mode": global_delivery_mode,
|
"global_delivery_mode": get_global_delivery_mode(reader),
|
||||||
}
|
}
|
||||||
return templates.TemplateResponse(request=request, name="add.html", context=context)
|
return templates.TemplateResponse(request=request, name="add.html", context=context)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
||||||
import os
|
import os
|
||||||
import typing
|
import typing
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
|
from importlib.util import find_spec
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from platformdirs import user_data_dir
|
from platformdirs import user_data_dir
|
||||||
|
|
@ -31,6 +32,31 @@ default_custom_embed: dict[str, str] = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def has_plugin(plugin_name: str) -> bool:
|
||||||
|
"""Return whether the installed reader version provides a built-in plugin.
|
||||||
|
|
||||||
|
We started using .autodiscover, but that is from Reader version 3.25.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return find_spec(f"reader.plugins.{plugin_name.removeprefix('.')}") is not None
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def make_app_reader(db_location: Path) -> Reader:
|
||||||
|
"""Create a reader with plugins supported by the installed reader version.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The configured reader.
|
||||||
|
"""
|
||||||
|
plugins_we_want = (".ua_fallback", ".autodiscover")
|
||||||
|
plugins: list[str] = [name for name in plugins_we_want if has_plugin(name)]
|
||||||
|
|
||||||
|
if plugins:
|
||||||
|
return make_reader(url=str(db_location), plugins=plugins)
|
||||||
|
return make_reader(url=str(db_location))
|
||||||
|
|
||||||
|
|
||||||
@lru_cache(maxsize=1)
|
@lru_cache(maxsize=1)
|
||||||
def get_reader(custom_location: Path | None = None) -> Reader:
|
def get_reader(custom_location: Path | None = None) -> Reader:
|
||||||
"""Get the reader.
|
"""Get the reader.
|
||||||
|
|
@ -42,7 +68,7 @@ def get_reader(custom_location: Path | None = None) -> Reader:
|
||||||
The reader.
|
The reader.
|
||||||
"""
|
"""
|
||||||
db_location: Path = custom_location or Path(data_dir) / "db.sqlite"
|
db_location: Path = custom_location or Path(data_dir) / "db.sqlite"
|
||||||
reader: Reader = make_reader(url=str(db_location))
|
reader: Reader = make_app_reader(db_location)
|
||||||
|
|
||||||
# https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig
|
# https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig
|
||||||
# Set the default update interval to 15 minutes if not already configured
|
# Set the default update interval to 15 minutes if not already configured
|
||||||
|
|
|
||||||
|
|
@ -8,11 +8,6 @@
|
||||||
{% block content %}
|
{% block content %}
|
||||||
<div class="p-2 border border-dark">
|
<div class="p-2 border border-dark">
|
||||||
<form action="/add" method="post">
|
<form action="/add" method="post">
|
||||||
<div class="mb-3 text-muted">
|
|
||||||
New feeds currently default to
|
|
||||||
<strong>{{ global_delivery_mode }}</strong>
|
|
||||||
delivery mode.
|
|
||||||
</div>
|
|
||||||
<!-- Feed URL -->
|
<!-- Feed URL -->
|
||||||
<div class="row pb-2">
|
<div class="row pb-2">
|
||||||
<label for="feed_url" class="col-sm-2 col-form-label">Feed URL</label>
|
<label for="feed_url" class="col-sm-2 col-form-label">Feed URL</label>
|
||||||
|
|
@ -20,7 +15,8 @@
|
||||||
<input name="feed_url"
|
<input name="feed_url"
|
||||||
type="text"
|
type="text"
|
||||||
class="form-control bg-dark border-dark text-muted"
|
class="form-control bg-dark border-dark text-muted"
|
||||||
id="feed_url" />
|
id="feed_url"
|
||||||
|
value="{{ feed_url }}" />
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<!-- Webhook dropdown -->
|
<!-- Webhook dropdown -->
|
||||||
|
|
@ -30,8 +26,11 @@
|
||||||
<select class="col-auto form-select bg-dark border-dark text-muted"
|
<select class="col-auto form-select bg-dark border-dark text-muted"
|
||||||
id="webhook_dropdown"
|
id="webhook_dropdown"
|
||||||
name="webhook_dropdown">
|
name="webhook_dropdown">
|
||||||
<option selected>Choose webhook...</option>
|
<option {% if not selected_webhook %}selected{% endif %}>Choose webhook...</option>
|
||||||
{% for hook in webhooks %}<option value="{{ hook.name }}">{{- hook.name -}}</option>{% endfor %}
|
{% for hook in webhooks %}
|
||||||
|
<option value="{{ hook.name }}"
|
||||||
|
{% if hook.name == selected_webhook %}selected{% endif %}>{{ hook.name }}</option>
|
||||||
|
{% endfor %}
|
||||||
</select>
|
</select>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
@ -41,4 +40,28 @@
|
||||||
</div>
|
</div>
|
||||||
</form>
|
</form>
|
||||||
</div>
|
</div>
|
||||||
|
{% if autodiscover_links %}
|
||||||
|
<section class="card border border-dark shadow-sm text-light rounded-0 mt-3">
|
||||||
|
<div class="card-body p-3 p-md-4">
|
||||||
|
<h2 class="h6 text-uppercase text-muted mb-2">Discovered feed links</h2>
|
||||||
|
<p class="text-muted mb-3">The submitted URL is not a feed. Choose one of the feeds advertised by that page.</p>
|
||||||
|
<div class="d-flex flex-column gap-2">
|
||||||
|
{% for link in autodiscover_links %}
|
||||||
|
<form action="/add"
|
||||||
|
method="post"
|
||||||
|
class="d-flex flex-column flex-md-row align-items-md-center gap-3 p-3 border border-dark rounded-0">
|
||||||
|
<input type="hidden" name="feed_url" value="{{ link.href }}" />
|
||||||
|
<input type="hidden" name="webhook_dropdown" value="{{ selected_webhook }}" />
|
||||||
|
<div class="flex-grow-1 feed-page__content">
|
||||||
|
{% if link.title %}<div class="text-light fw-semibold">{{ link.title }}</div>{% endif %}
|
||||||
|
<code class="d-block text-muted feed-page__wrap">{{ link.href }}</code>
|
||||||
|
{% if link.type %}<div class="small text-muted mt-1">{{ link.type }}</div>{% endif %}
|
||||||
|
</div>
|
||||||
|
<button class="btn btn-outline-light btn-sm ms-md-auto" type="submit">Add feed</button>
|
||||||
|
</form>
|
||||||
|
{% endfor %}
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
{% endif %}
|
||||||
{% endblock content %}
|
{% endblock content %}
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ from unittest.mock import patch
|
||||||
import pytest
|
import pytest
|
||||||
from reader import EntryNotFoundError
|
from reader import EntryNotFoundError
|
||||||
from reader import Feed
|
from reader import Feed
|
||||||
|
from reader import FeedExistsError
|
||||||
from reader import FeedNotFoundError
|
from reader import FeedNotFoundError
|
||||||
from reader import Reader
|
from reader import Reader
|
||||||
from reader import StorageError
|
from reader import StorageError
|
||||||
|
|
@ -448,6 +449,38 @@ def test_create_feed_falls_back_to_embed_when_global_delivery_mode_is_invalid()
|
||||||
reader.set_tag.assert_any_call("https://example.com/feed.xml", "should_send_embed", True)
|
reader.set_tag.assert_any_call("https://example.com/feed.xml", "should_send_embed", True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_feed_removes_new_feed_when_initial_update_fails() -> None:
|
||||||
|
feed_url = "https://example.com/not-a-feed"
|
||||||
|
autodiscover_links = [{"href": "https://example.com/feed.xml", "type": "application/rss+xml"}]
|
||||||
|
reader = MagicMock()
|
||||||
|
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
|
||||||
|
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
|
||||||
|
".reader.autodiscover": autodiscover_links,
|
||||||
|
}.get(key, default)
|
||||||
|
reader.update_feed.side_effect = StorageError("invalid feed")
|
||||||
|
|
||||||
|
with pytest.raises(feeds.FeedUpdateError) as exc_info:
|
||||||
|
create_feed(reader, feed_url, "Main")
|
||||||
|
|
||||||
|
reader.delete_feed.assert_called_once_with(feed_url)
|
||||||
|
assert exc_info.value.autodiscover_links == autodiscover_links
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_feed_does_not_remove_existing_feed_when_update_fails() -> None:
|
||||||
|
feed_url = "https://example.com/existing-feed.xml"
|
||||||
|
reader = MagicMock()
|
||||||
|
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
|
||||||
|
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
|
||||||
|
}.get(key, default)
|
||||||
|
reader.add_feed.side_effect = FeedExistsError(feed_url)
|
||||||
|
reader.update_feed.side_effect = StorageError("temporary failure")
|
||||||
|
|
||||||
|
with pytest.raises(feeds.FeedUpdateError):
|
||||||
|
create_feed(reader, feed_url, "Main")
|
||||||
|
|
||||||
|
reader.delete_feed.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
@patch("discord_rss_bot.feeds.capture_full_page_screenshot")
|
@patch("discord_rss_bot.feeds.capture_full_page_screenshot")
|
||||||
@patch("discord_rss_bot.feeds.DiscordWebhook")
|
@patch("discord_rss_bot.feeds.DiscordWebhook")
|
||||||
def test_create_screenshot_webhook_adds_image_file(
|
def test_create_screenshot_webhook_adds_image_file(
|
||||||
|
|
|
||||||
|
|
@ -188,6 +188,54 @@ def test_create_feed() -> None:
|
||||||
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
|
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_feed_suggests_autodiscovered_links() -> None:
|
||||||
|
"""A page URL that fails to parse should render its advertised feed links."""
|
||||||
|
submitted_url = "https://example.com/blog"
|
||||||
|
discovered_url = "https://example.com/rss.xml"
|
||||||
|
stub_reader = MagicMock()
|
||||||
|
|
||||||
|
def get_tag(resource: str | tuple[()], key: str, default: TestTagValue = None) -> TestTagValue:
|
||||||
|
if resource == () and key == "webhooks":
|
||||||
|
return [{"name": webhook_name, "url": webhook_url}]
|
||||||
|
if resource == () and key == "delivery_mode":
|
||||||
|
return "embed"
|
||||||
|
return default
|
||||||
|
|
||||||
|
stub_reader.get_tag.side_effect = get_tag
|
||||||
|
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
|
||||||
|
|
||||||
|
try:
|
||||||
|
with patch.object(
|
||||||
|
main_module,
|
||||||
|
"create_feed",
|
||||||
|
side_effect=feeds.FeedUpdateError(
|
||||||
|
status_code=404,
|
||||||
|
detail="Error updating feed",
|
||||||
|
autodiscover_links=[
|
||||||
|
{
|
||||||
|
"href": discovered_url,
|
||||||
|
"title": "Example feed",
|
||||||
|
"type": "application/rss+xml",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
),
|
||||||
|
):
|
||||||
|
response: Response = client.post(
|
||||||
|
url="/add",
|
||||||
|
data={"feed_url": submitted_url, "webhook_dropdown": webhook_name},
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides = {}
|
||||||
|
|
||||||
|
assert response.status_code == 404
|
||||||
|
assert "Discovered feed links" in response.text
|
||||||
|
assert "Example feed" in response.text
|
||||||
|
assert discovered_url in response.text
|
||||||
|
assert "application/rss+xml" in response.text
|
||||||
|
assert f'value="{submitted_url}"' in response.text
|
||||||
|
assert f'value="{webhook_name}"' in response.text
|
||||||
|
|
||||||
|
|
||||||
def test_get() -> None:
|
def test_get() -> None:
|
||||||
"""Test the /create_feed page."""
|
"""Test the /create_feed page."""
|
||||||
# Ensure webhook exists for this test regardless of test order.
|
# Ensure webhook exists for this test regardless of test order.
|
||||||
|
|
@ -572,7 +620,6 @@ def test_add_page_shows_global_default_delivery_mode_hint() -> None:
|
||||||
|
|
||||||
response = client.get(url="/add")
|
response = client.get(url="/add")
|
||||||
assert response.status_code == 200, f"/add failed: {response.text}"
|
assert response.status_code == 200, f"/add failed: {response.text}"
|
||||||
assert "New feeds currently default to" in response.text
|
|
||||||
assert "text" in response.text
|
assert "text" in response.text
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -995,31 +1042,23 @@ def test_change_feed_url_nonexistent_old_url_returns_404() -> None:
|
||||||
|
|
||||||
def test_change_feed_url_new_url_already_exists_returns_409() -> None:
|
def test_change_feed_url_new_url_already_exists_returns_409() -> None:
|
||||||
"""Changing to a URL that is already tracked should return HTTP 409."""
|
"""Changing to a URL that is already tracked should return HTTP 409."""
|
||||||
second_feed_url = "https://lovinator.space/rss_test_small.xml"
|
second_feed_url = "https://example.com/existing.xml"
|
||||||
|
|
||||||
# Ensure both feeds are absent.
|
class StubReader:
|
||||||
client.post(url="/remove", data={"feed_url": feed_url})
|
def change_feed_url(self, _old_url: str, new_url: str) -> None:
|
||||||
client.post(url="/remove", data={"feed_url": second_feed_url})
|
raise feeds.FeedExistsError(new_url)
|
||||||
|
|
||||||
# Ensure webhook exists.
|
app.dependency_overrides[get_reader_dependency] = StubReader
|
||||||
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
|
try:
|
||||||
client.post(url="/add_webhook", data={"webhook_name": webhook_name, "webhook_url": webhook_url})
|
response: Response = client.post(
|
||||||
|
url="/change_feed_url",
|
||||||
|
data={"old_feed_url": feed_url, "new_feed_url": second_feed_url},
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides = {}
|
||||||
|
|
||||||
# Add both feeds.
|
|
||||||
client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
|
|
||||||
client.post(url="/add", data={"feed_url": second_feed_url, "webhook_dropdown": webhook_name})
|
|
||||||
|
|
||||||
# Try to rename one to the other.
|
|
||||||
response: Response = client.post(
|
|
||||||
url="/change_feed_url",
|
|
||||||
data={"old_feed_url": feed_url, "new_feed_url": second_feed_url},
|
|
||||||
)
|
|
||||||
assert response.status_code == 409, f"Expected 409 when new URL already exists, got {response.status_code}"
|
assert response.status_code == 409, f"Expected 409 when new URL already exists, got {response.status_code}"
|
||||||
|
|
||||||
# Cleanup.
|
|
||||||
client.post(url="/remove", data={"feed_url": feed_url})
|
|
||||||
client.post(url="/remove", data={"feed_url": second_feed_url})
|
|
||||||
|
|
||||||
|
|
||||||
def test_change_feed_url_same_url_redirects_without_error() -> None:
|
def test_change_feed_url_same_url_redirects_without_error() -> None:
|
||||||
"""Changing a feed's URL to itself should redirect cleanly without any error."""
|
"""Changing a feed's URL to itself should redirect cleanly without any error."""
|
||||||
|
|
|
||||||
|
|
@ -2,13 +2,62 @@ from __future__ import annotations
|
||||||
|
|
||||||
import pathlib
|
import pathlib
|
||||||
import tempfile
|
import tempfile
|
||||||
|
from contextlib import closing
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from http.server import BaseHTTPRequestHandler
|
||||||
|
from http.server import ThreadingHTTPServer
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from threading import Thread
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from reader import ParseError
|
||||||
from reader import Reader
|
from reader import Reader
|
||||||
|
|
||||||
|
import discord_rss_bot.settings as settings_module
|
||||||
from discord_rss_bot.settings import data_dir
|
from discord_rss_bot.settings import data_dir
|
||||||
from discord_rss_bot.settings import default_custom_message
|
from discord_rss_bot.settings import default_custom_message
|
||||||
from discord_rss_bot.settings import get_reader
|
from discord_rss_bot.settings import get_reader
|
||||||
|
from discord_rss_bot.settings import has_plugin
|
||||||
|
from discord_rss_bot.settings import make_app_reader
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from collections.abc import Iterator
|
||||||
|
|
||||||
|
|
||||||
|
class _AutodiscoverHandler(BaseHTTPRequestHandler):
|
||||||
|
"""Serve an HTML page that advertises an RSS feed."""
|
||||||
|
|
||||||
|
def do_GET(self) -> None:
|
||||||
|
"""Return HTML instead of a feed so reader attempts autodiscovery."""
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "text/html")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(
|
||||||
|
b'<html><head><link rel="alternate" href="/rss.xml" '
|
||||||
|
b'type="application/rss+xml" title="Example"></head></html>'
|
||||||
|
)
|
||||||
|
|
||||||
|
def log_message(self, _format: str, *_args: object) -> None:
|
||||||
|
"""Suppress HTTP request logging during tests."""
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def _serve_autodiscover_html() -> Iterator[str]:
|
||||||
|
"""Serve an HTML page URL while the context is active.
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
The URL of the HTML page.
|
||||||
|
"""
|
||||||
|
with ThreadingHTTPServer(("127.0.0.1", 0), _AutodiscoverHandler) as server:
|
||||||
|
server_thread = Thread(target=server.serve_forever, daemon=True)
|
||||||
|
server_thread.start()
|
||||||
|
try:
|
||||||
|
yield f"http://127.0.0.1:{server.server_port}/"
|
||||||
|
finally:
|
||||||
|
server.shutdown()
|
||||||
|
server_thread.join()
|
||||||
|
|
||||||
|
|
||||||
def test_reader() -> None:
|
def test_reader() -> None:
|
||||||
|
|
@ -141,3 +190,89 @@ def test_get_reader_preserves_existing_global_delivery_mode() -> None:
|
||||||
|
|
||||||
second_reader.close()
|
second_reader.close()
|
||||||
get_reader.cache_clear()
|
get_reader.cache_clear()
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_reader_enables_autodiscover_plugin() -> None:
|
||||||
|
"""get_reader should store advertised feed links when HTML parsing fails."""
|
||||||
|
get_reader.cache_clear()
|
||||||
|
|
||||||
|
try:
|
||||||
|
with (
|
||||||
|
tempfile.TemporaryDirectory() as temp_dir,
|
||||||
|
_serve_autodiscover_html() as feed_url,
|
||||||
|
closing(get_reader(custom_location=Path(temp_dir, "autodiscover_db.sqlite"))) as reader,
|
||||||
|
):
|
||||||
|
reader.add_feed(feed_url)
|
||||||
|
|
||||||
|
with pytest.raises(ParseError):
|
||||||
|
reader.update_feed(feed_url)
|
||||||
|
|
||||||
|
assert reader.get_tag(feed_url, ".reader.autodiscover", None) == [
|
||||||
|
{
|
||||||
|
"href": f"{feed_url}rss.xml",
|
||||||
|
"type": "application/rss+xml",
|
||||||
|
"title": "Example",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
finally:
|
||||||
|
get_reader.cache_clear()
|
||||||
|
|
||||||
|
|
||||||
|
def test_make_app_reader_enables_supported_builtin_plugins(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""Supported reader versions should load both built-in plugins explicitly."""
|
||||||
|
reader = object()
|
||||||
|
make_reader = MagicMock(return_value=reader)
|
||||||
|
monkeypatch.setattr(settings_module, "has_plugin", lambda _plugin_name: True)
|
||||||
|
monkeypatch.setattr(settings_module, "make_reader", make_reader)
|
||||||
|
|
||||||
|
assert make_app_reader(Path("db.sqlite")) is reader
|
||||||
|
make_reader.assert_called_once_with(
|
||||||
|
url="db.sqlite",
|
||||||
|
plugins=[".ua_fallback", ".autodiscover"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("available_plugin", "expected_plugin"),
|
||||||
|
[
|
||||||
|
(".ua_fallback", ".ua_fallback"),
|
||||||
|
(".autodiscover", ".autodiscover"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_make_app_reader_loads_only_available_builtin_plugin(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
available_plugin: str,
|
||||||
|
expected_plugin: str,
|
||||||
|
) -> None:
|
||||||
|
"""Reader construction should not receive unavailable built-in plugins."""
|
||||||
|
reader = object()
|
||||||
|
make_reader = MagicMock(return_value=reader)
|
||||||
|
monkeypatch.setattr(settings_module, "has_plugin", lambda plugin_name: plugin_name == available_plugin)
|
||||||
|
monkeypatch.setattr(settings_module, "make_reader", make_reader)
|
||||||
|
|
||||||
|
assert make_app_reader(Path("db.sqlite")) is reader
|
||||||
|
make_reader.assert_called_once_with(url="db.sqlite", plugins=[expected_plugin])
|
||||||
|
|
||||||
|
|
||||||
|
def test_make_app_reader_preserves_defaults_without_builtin_plugins(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""Reader versions without built-in plugins should start with their defaults."""
|
||||||
|
reader = object()
|
||||||
|
make_reader = MagicMock(return_value=reader)
|
||||||
|
monkeypatch.setattr(settings_module, "has_plugin", lambda _plugin_name: False)
|
||||||
|
monkeypatch.setattr(settings_module, "make_reader", make_reader)
|
||||||
|
|
||||||
|
assert make_app_reader(Path("db.sqlite")) is reader
|
||||||
|
make_reader.assert_called_once_with(url="db.sqlite")
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_plugin_handles_reader_versions_without_plugins_package(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
"""Older reader versions without a plugins package should be supported."""
|
||||||
|
|
||||||
|
def find_spec(_name: str) -> None:
|
||||||
|
raise ModuleNotFoundError
|
||||||
|
|
||||||
|
monkeypatch.setattr(settings_module, "find_spec", find_spec)
|
||||||
|
|
||||||
|
assert has_plugin(".autodiscover") is False
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue