Edit sent Discord webhooks if entry values updates
All checks were successful
Test and build Docker image / docker (push) Successful in 1m48s

This commit is contained in:
Joakim Hellsén 2026-05-09 04:41:50 +02:00
commit 36d55566fc
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
13 changed files with 1313 additions and 141 deletions

View file

@ -14,6 +14,9 @@ from markdownify import markdownify
from discord_rss_bot.is_url_valid import is_url_valid
if TYPE_CHECKING:
from collections.abc import Sequence
from reader import Content
from reader import Entry
from reader import Feed
from reader import Reader
@ -193,7 +196,7 @@ def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str:
return custom_message.replace("\\n", "\n")
def get_first_image(summary: str | None, content: str | None) -> str: # noqa: C901
def get_first_image(summary: str | None, content: str | Sequence[Content] | None) -> str: # noqa: C901
"""Get image from summary or content.
Args:
@ -204,7 +207,7 @@ def get_first_image(summary: str | None, content: str | None) -> str: # noqa: C
The first image
"""
def extract_string(data: str | list | tuple | None) -> str | None:
def extract_string(data: str | list | tuple | Sequence[Content] | None) -> str | None:
if not data:
return None
if isinstance(data, str):

View file

@ -3,18 +3,23 @@ from __future__ import annotations
import asyncio
import concurrent.futures
import datetime
import hashlib
import json
import logging
import os
import pprint
import re
from collections.abc import Callable
from contextlib import suppress
from typing import TYPE_CHECKING
from typing import Any
from typing import Literal
from typing import Protocol
from typing import cast
from urllib.parse import ParseResult
from urllib.parse import parse_qs
from urllib.parse import urlparse
import httpx
import tldextract
from discord_webhook import DiscordEmbed
from discord_webhook import DiscordWebhook
@ -32,6 +37,9 @@ from reader import FeedNotFoundError
from reader import Reader
from reader import ReaderError
from reader import StorageError
from reader.types import EntryUpdateStatus
from reader.types import UpdatedFeed
from requests import RequestException
from discord_rss_bot.custom_message import CustomEmbed
from discord_rss_bot.custom_message import get_custom_message
@ -50,6 +58,7 @@ from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
from collections.abc import Iterable
from reader._types import EntryData
from requests import Response
logger: logging.Logger = logging.getLogger(__name__)
@ -57,9 +66,53 @@ logger: logging.Logger = logging.getLogger(__name__)
type DeliveryMode = Literal["embed", "text", "screenshot"]
type ScreenshotLayout = Literal["desktop", "mobile"]
type ScreenshotFileType = Literal["png", "jpeg"]
type JsonValue = bool | int | float | str | list[JsonValue] | dict[str, JsonValue] | None
type JsonObject = dict[str, JsonValue]
type SentWebhookRecord = dict[str, JsonValue]
type UpdateCallback = Callable[[], UpdatedFeed | None]
class JsonResponseLike(Protocol):
"""Response interface needed for Discord webhook JSON parsing."""
@property
def status_code(self) -> int:
"""HTTP status code."""
...
@property
def text(self) -> str:
"""Response body decoded as text."""
...
@property
def content(self) -> bytes:
"""Raw response body."""
...
def json(self) -> JsonValue:
"""Decode response body as JSON.
Returns:
JsonValue: Decoded JSON response body.
"""
...
MAX_DISCORD_UPLOAD_BYTES: int = 8 * 1024 * 1024
JPEG_QUALITY_STEPS: tuple[int, ...] = (85, 70, 55, 40)
SENT_WEBHOOKS_TAG: str = "sent_webhooks"
SAVE_SENT_WEBHOOKS_TAG: str = "save_sent_webhooks"
MESSAGE_PAYLOAD_KEYS: tuple[str, ...] = (
"allowed_mentions",
"attachments",
"avatar_url",
"content",
"embeds",
"flags",
"tts",
"username",
)
def extract_domain(url: str) -> str: # noqa: PLR0911
@ -137,30 +190,12 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None:
delivery_mode,
)
# Hoyolab/c3kay feeds use a custom embed only when embed mode is selected.
if delivery_mode == "embed" and is_c3kay_feed(entry.feed.url):
entry_link: str | None = entry.link
if entry_link:
post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
if post_id:
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry, reader=reader)
return None
logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
entry.feed.url,
)
else:
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
if delivery_mode == "embed":
webhook: DiscordWebhook = create_embed_webhook(webhook_url, entry, reader=reader)
elif delivery_mode == "screenshot":
webhook = create_screenshot_webhook(webhook_url, entry, reader=reader)
else:
webhook = create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=False)
webhook, _delivery_mode = create_webhook_for_entry(
webhook_url,
entry,
reader,
use_default_message_on_empty=False,
)
execute_webhook(webhook, entry, reader=reader)
return None
@ -244,6 +279,502 @@ def get_screenshot_layout(reader: Reader, feed: Feed) -> ScreenshotLayout:
return "desktop"
def feed_saves_sent_webhooks(reader: Reader, feed: Feed | str) -> bool:
"""Return whether sent Discord webhook messages should be stored for a feed.
Missing tags default to enabled so existing feeds start tracking editable Discord messages.
"""
feed_url: str = feed.url if isinstance(feed, Feed) else str(feed)
try:
value = cast("JsonValue", reader.get_tag(feed, SAVE_SENT_WEBHOOKS_TAG, True))
except ReaderError:
logger.exception("Error getting %s tag for feed: %s", SAVE_SENT_WEBHOOKS_TAG, feed_url)
return True
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() not in {"0", "false", "no", "off", "disabled"}
if value is None:
return True
return bool(value)
def get_sent_webhook_records(reader: Reader) -> list[SentWebhookRecord]:
"""Get stored sent webhook records from the global reader tag.
Returns:
list[SentWebhookRecord]: Saved sent webhook records.
"""
raw_records = cast("JsonValue", reader.get_tag((), SENT_WEBHOOKS_TAG, []))
if not isinstance(raw_records, list):
return []
records: list[SentWebhookRecord] = [
cast("SentWebhookRecord", dict(raw_record)) for raw_record in raw_records if isinstance(raw_record, dict)
]
return records
def save_sent_webhook_records(reader: Reader, records: list[SentWebhookRecord]) -> None:
"""Save sent webhook records to the global reader tag."""
reader.set_tag((), SENT_WEBHOOKS_TAG, records) # pyright: ignore[reportArgumentType]
def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject:
"""Return the Discord message payload used to compare saved messages.
The discord-webhook object also includes client/runtime fields in `json`; only fields that affect the Discord
message itself are persisted. Empty `content`, `embeds`, and `attachments` are kept so message edits can clear
stale content when a feed changes delivery mode.
Returns:
JsonObject: Normalized Discord message payload.
"""
raw_payload = cast("JsonValue", webhook.json)
if not isinstance(raw_payload, dict):
return {"content": "", "embeds": [], "attachments": []}
payload: JsonObject = {}
webhook_payload = cast("JsonObject", raw_payload)
for key in MESSAGE_PAYLOAD_KEYS:
if key in webhook_payload:
payload[key] = webhook_payload[key]
payload.setdefault("content", "")
payload.setdefault("embeds", [])
payload.setdefault("attachments", [])
return cast("JsonObject", json.loads(json.dumps(payload, default=str)))
def hash_webhook_payload(payload: JsonObject) -> str:
"""Hash a normalized Discord message payload.
Returns:
str: SHA-256 hash of the payload.
"""
normalized_payload: str = json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str)
return hashlib.sha256(normalized_payload.encode()).hexdigest()
def json_value_to_int(value: JsonValue, default: int = 0) -> int:
"""Convert a simple JSON scalar to int.
Returns:
int: Converted integer, or default when the value is not scalar-convertible.
"""
if isinstance(value, bool):
return int(value)
if isinstance(value, int | float | str):
try:
return int(value)
except ValueError:
return default
return default
def get_response_json(response: JsonResponseLike) -> JsonObject:
"""Best-effort JSON extraction for requests/httpx response objects.
Returns:
JsonObject: Decoded JSON object, or an empty dict.
"""
try:
response_json = response.json()
except (AttributeError, TypeError, ValueError):
response_text: str = response.text
if not response_text:
response_content: bytes = response.content
if isinstance(response_content, bytes):
response_text = response_content.decode("utf-8", errors="ignore")
if not response_text:
return {}
try:
response_json = json.loads(response_text)
except json.JSONDecodeError:
return {}
return cast("JsonObject", dict(response_json)) if isinstance(response_json, dict) else {}
def get_discord_message_id_from_response(response_json: JsonObject, webhook: DiscordWebhook) -> str:
"""Get the Discord message id from a decoded webhook response.
Returns:
str: Discord message id, or an empty string.
"""
message_id: JsonValue = response_json.get("id")
if isinstance(message_id, str) and message_id:
return message_id
webhook_id: str | None = webhook.id if isinstance(webhook.id, str) else None
return webhook_id if isinstance(webhook_id, str) else ""
def get_discord_message_id(response: JsonResponseLike, webhook: DiscordWebhook) -> str:
"""Get the Discord message id returned by a webhook send/edit response.
Returns:
str: Discord message id, or an empty string.
"""
return get_discord_message_id_from_response(get_response_json(response), webhook)
def get_entry_timestamp(value: datetime.datetime | None) -> str:
"""Return an ISO timestamp when an entry datetime-like value is present.
Returns:
str: ISO timestamp, or an empty string.
"""
if value is not None:
return value.isoformat()
return ""
def upsert_sent_webhook_record(
reader: Reader,
entry: Entry,
webhook_url: str,
webhook: DiscordWebhook,
response: JsonResponseLike,
payload: JsonObject,
) -> None:
"""Store the Discord message id and rendered payload for a successfully sent entry."""
if not feed_saves_sent_webhooks(reader, entry.feed):
return
response_json: JsonObject = get_response_json(response)
message_id: str = get_discord_message_id_from_response(response_json, webhook)
if not message_id:
logger.debug("Discord response did not include a message id for entry %s; not storing webhook.", entry.id)
return
now: str = datetime.datetime.now(tz=datetime.UTC).isoformat()
payload_hash: str = hash_webhook_payload(payload)
delivery_mode: DeliveryMode = get_entry_delivery_mode(reader, entry)
record: SentWebhookRecord = {
"feed_url": entry.feed.url,
"feed_title": entry.feed.title or "",
"entry_id": entry.id,
"entry_title": entry.title or "",
"entry_link": entry.link or "",
"entry_updated": get_entry_timestamp(entry.updated),
"webhook_url": webhook_url,
"message_id": message_id,
"delivery_mode": delivery_mode,
"payload": payload,
"payload_hash": payload_hash,
"discord_response": response_json,
"response_text": response.text[:5000],
"first_sent_at": now,
"last_sent_at": now,
"last_updated_at": now,
"last_status_code": response.status_code,
"last_error": "",
"update_count": 0,
}
records: list[SentWebhookRecord] = get_sent_webhook_records(reader)
for index, existing_record in enumerate(records):
if (
existing_record.get("feed_url") == entry.feed.url
and existing_record.get("entry_id") == entry.id
and existing_record.get("webhook_url") == webhook_url
):
record["first_sent_at"] = existing_record.get("first_sent_at") or now
record["update_count"] = json_value_to_int(existing_record.get("update_count"))
records[index] = record
save_sent_webhook_records(reader, records)
return
records.append(record)
save_sent_webhook_records(reader, records)
def split_webhook_url_for_message_endpoint(webhook_url: str) -> tuple[str, str | None]:
"""Split a webhook URL into the base webhook endpoint and optional thread id.
Returns:
tuple[str, str | None]: Clean webhook URL and optional thread id.
"""
parsed_url = urlparse(webhook_url)
query = parse_qs(parsed_url.query)
thread_id_values: list[str] = query.get("thread_id", [])
thread_id: str | None = thread_id_values[0].strip() if thread_id_values else None
if not thread_id:
thread_id = None
clean_url: str = parsed_url._replace(query="", fragment="").geturl().rstrip("/")
return clean_url, thread_id
def edit_sent_webhook_message(
webhook_url: str,
message_id: str,
webhook: DiscordWebhook,
payload: JsonObject,
) -> Response | httpx.Response:
"""Edit an already-sent Discord webhook message.
Returns:
Response | httpx.Response: Discord API response.
"""
clean_webhook_url, thread_id = split_webhook_url_for_message_endpoint(webhook_url)
if getattr(webhook, "files", None):
webhook.url = clean_webhook_url
webhook.id = message_id
if thread_id:
webhook.thread_id = thread_id
return webhook.edit()
params: dict[str, str] = {"wait": "true"}
if thread_id:
params["thread_id"] = thread_id
timeout: int | float = cast("int | float", getattr(webhook, "timeout", None) or 30.0)
return httpx.patch(
f"{clean_webhook_url}/messages/{message_id}",
json=payload,
params=params,
timeout=timeout,
)
def create_webhook_for_entry(
webhook_url: str,
entry: Entry,
reader: Reader,
*,
use_default_message_on_empty: bool,
) -> tuple[DiscordWebhook, DeliveryMode]:
"""Create the Discord webhook payload for the entry's effective delivery mode.
Returns:
tuple[DiscordWebhook, DeliveryMode]: Rendered webhook object and delivery mode.
"""
delivery_mode: DeliveryMode = get_entry_delivery_mode(reader, entry)
if delivery_mode == "embed" and is_c3kay_feed(entry.feed.url):
entry_link: str | None = entry.link
if entry_link:
post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
if post_id:
post_data = fetch_hoyolab_post(post_id)
if post_data:
return create_hoyolab_webhook(webhook_url, entry, post_data), delivery_mode
logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
entry.feed.url,
)
else:
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
if delivery_mode == "embed":
return create_embed_webhook(webhook_url, entry, reader=reader), delivery_mode
if delivery_mode == "screenshot":
return create_screenshot_webhook(webhook_url, entry, reader=reader), delivery_mode
return (
create_text_webhook(
webhook_url,
entry,
reader=reader,
use_default_message_on_empty=use_default_message_on_empty,
),
delivery_mode,
)
def collect_modified_entries_during_update(reader: Reader, update_callback: UpdateCallback) -> list[tuple[str, str]]:
"""Run a reader update call and collect entries whose stored content was modified.
Returns:
list[tuple[str, str]]: Modified entry `(feed_url, entry_id)` pairs.
"""
modified_entries: list[tuple[str, str]] = []
hooks = reader.after_entry_update_hooks
def collect_modified_entry(_reader: Reader, entry: EntryData, status: EntryUpdateStatus) -> None:
status_value: str = getattr(status, "value", str(status))
if status == EntryUpdateStatus.MODIFIED or status_value == EntryUpdateStatus.MODIFIED.value:
modified_entries.append((entry.feed_url, entry.id))
hooks.append(collect_modified_entry)
try:
update_callback()
finally:
with suppress(ValueError):
hooks.remove(collect_modified_entry)
return list(dict.fromkeys(modified_entries))
def update_feeds_and_collect_modified_entries(
reader: Reader,
*,
scheduled: bool,
workers: int,
) -> list[tuple[str, str]]:
"""Update feeds and return reader entries whose stored content was modified.
Returns:
list[tuple[str, str]]: Modified entry `(feed_url, entry_id)` pairs.
"""
return collect_modified_entries_during_update(
reader,
lambda: reader.update_feeds(scheduled=scheduled, workers=workers),
)
def update_feed_and_collect_modified_entries(reader: Reader, feed: Feed | str) -> list[tuple[str, str]]:
"""Update one feed and return reader entries whose stored content was modified.
Returns:
list[tuple[str, str]]: Modified entry `(feed_url, entry_id)` pairs.
"""
return collect_modified_entries_during_update(reader, lambda: reader.update_feed(feed))
def update_sent_webhook_record_for_entry(
reader: Reader,
entry: Entry,
record: SentWebhookRecord,
) -> tuple[SentWebhookRecord, bool, bool]:
"""Edit one saved Discord webhook message record for an updated entry.
Returns:
tuple[SentWebhookRecord, bool, bool]: Updated record, whether it changed, and whether Discord was edited.
"""
webhook_url_value: JsonValue = record.get("webhook_url")
message_id_value: JsonValue = record.get("message_id")
if (
not isinstance(webhook_url_value, str)
or not isinstance(message_id_value, str)
or not webhook_url_value
or not message_id_value
):
return record, False, False
webhook, delivery_mode = create_webhook_for_entry(
webhook_url_value,
entry,
reader,
use_default_message_on_empty=True,
)
payload: JsonObject = get_webhook_message_payload(webhook)
payload_hash: str = hash_webhook_payload(payload)
if payload_hash == record.get("payload_hash"):
return record, False, False
now: str = datetime.datetime.now(tz=datetime.UTC).isoformat()
try:
response = edit_sent_webhook_message(webhook_url_value, message_id_value, webhook, payload)
except (AssertionError, RequestException, httpx.HTTPError, OSError, ValueError) as e:
logger.exception("Failed to edit Discord webhook message %s for entry %s", message_id_value, entry.id)
return (
{
**record,
"last_update_attempt_at": now,
"last_error": str(e),
},
True,
False,
)
status_code: int = response.status_code
response_json: JsonObject = get_response_json(response)
if status_code in {200, 204}:
return (
{
**record,
"feed_title": entry.feed.title or "",
"entry_title": entry.title or "",
"entry_link": entry.link or "",
"entry_updated": get_entry_timestamp(entry.updated),
"delivery_mode": delivery_mode,
"payload": payload,
"payload_hash": payload_hash,
"discord_response": response_json,
"response_text": response.text[:5000],
"last_updated_at": now,
"last_status_code": status_code,
"last_error": "",
"update_count": json_value_to_int(record.get("update_count")) + 1,
},
True,
True,
)
return (
{
**record,
"last_update_attempt_at": now,
"last_status_code": status_code,
"discord_response": response_json,
"response_text": response.text[:5000],
"last_error": response.text[:500],
},
True,
False,
)
def update_sent_webhooks_for_modified_entries( # noqa: C901
reader: Reader,
modified_entries: Iterable[tuple[str, str]],
) -> int:
"""Edit saved Discord webhook messages for modified reader entries.
Returns:
int: Number of Discord messages successfully edited.
"""
modified_entry_keys: set[tuple[str, str]] = set(modified_entries)
if not modified_entry_keys:
return 0
records: list[SentWebhookRecord] = get_sent_webhook_records(reader)
if not records:
return 0
records_changed: bool = False
updated_count: int = 0
for feed_url, entry_id in modified_entry_keys:
matching_record_indexes: list[int] = [
index
for index, record in enumerate(records)
if record.get("feed_url") == feed_url and record.get("entry_id") == entry_id
]
if not matching_record_indexes:
continue
try:
entry: Entry = reader.get_entry((feed_url, entry_id))
except (FeedNotFoundError, EntryNotFoundError):
logger.exception("Saved webhook entry no longer exists: %s %s", feed_url, entry_id)
continue
if not feed_saves_sent_webhooks(reader, entry.feed):
continue
for record_index in matching_record_indexes:
updated_record, record_changed, message_was_edited = update_sent_webhook_record_for_entry(
reader,
entry,
records[record_index],
)
if record_changed:
records[record_index] = updated_record
records_changed = True
if message_was_edited:
updated_count += 1
if records_changed:
save_sent_webhook_records(reader, records)
return updated_count
def create_text_webhook(
webhook_url: str,
entry: Entry,
@ -496,7 +1027,7 @@ def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Read
content=quest_url,
rate_limit_retry=True,
)
execute_webhook(webhook, entry, reader=reader)
execute_webhook(webhook, entry, reader=reader, save_sent_webhook=False)
# Iterate through the content of the entry
for content in entry.content:
@ -661,7 +1192,7 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None:
logger.exception("Error setting entry to read: %s", entry.id)
def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912
def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None:
"""Send entries to Discord.
If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time.
@ -675,11 +1206,16 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d
# Get the default reader if we didn't get a custom one.
effective_reader: Reader = get_reader() if reader is None else reader
# Check for new entries for every feed.
effective_reader.update_feeds(
# Check for new and modified entries for every feed.
modified_entries: list[tuple[str, str]] = update_feeds_and_collect_modified_entries(
effective_reader,
scheduled=True,
workers=os.cpu_count() or 1,
)
try:
update_sent_webhooks_for_modified_entries(effective_reader, modified_entries)
except (AssertionError, ReaderError, RequestException, httpx.HTTPError, OSError, ValueError):
logger.exception("Failed to update saved Discord webhooks for modified feed entries.")
# Loop through the unread entries.
entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False)
@ -695,42 +1231,17 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d
logger.info("No webhook URL found for feed: %s", entry.feed.url)
continue
delivery_mode: DeliveryMode = get_entry_delivery_mode(effective_reader, entry)
if delivery_mode == "embed":
webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader)
elif delivery_mode == "screenshot":
webhook = create_screenshot_webhook(webhook_url, entry, reader=effective_reader)
else:
webhook = create_text_webhook(
webhook_url,
entry,
reader=effective_reader,
use_default_message_on_empty=True,
)
decision = get_entry_filter_decision_from_reader(effective_reader, entry)
if not decision.should_send:
logger.info("Entry was skipped: %s (%s)", entry.id, decision.reason)
continue
# Use a custom webhook for Hoyolab feeds.
if is_c3kay_feed(entry.feed.url):
entry_link: str | None = entry.link
if entry_link:
post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
if post_id:
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry, reader=effective_reader)
return
logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
entry.feed.url,
)
else:
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
webhook, _delivery_mode = create_webhook_for_entry(
webhook_url,
entry,
effective_reader,
use_default_message_on_empty=True,
)
# Send the entry to Discord because the combined blacklist/whitelist decision allowed it.
execute_webhook(webhook, entry, reader=effective_reader)
@ -741,14 +1252,20 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d
break
def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None:
def execute_webhook(
webhook: DiscordWebhook,
entry: Entry,
reader: Reader,
*,
save_sent_webhook: bool = True,
) -> None:
"""Execute the webhook.
Args:
webhook (DiscordWebhook): The webhook to execute.
entry (Entry): The entry to send to Discord.
reader (Reader): The Reader instance to use for checking feed status.
save_sent_webhook: Whether to save the sent Discord message metadata for future edits.
"""
# If the feed has been paused or deleted, we will not send the entry to Discord.
entry_feed: Feed = entry.feed
@ -762,6 +1279,7 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> No
logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url)
return
payload: JsonObject = get_webhook_message_payload(webhook)
response: Response = webhook.execute()
logger.debug("Discord webhook response for entry %s: status=%s", entry.id, response.status_code)
if response.status_code not in {200, 204}:
@ -772,6 +1290,10 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> No
logger.error(msg)
else:
logger.info("Sent entry to Discord: %s", entry.id)
if save_sent_webhook:
webhook_url: str = get_webhook_url(reader, entry)
if webhook_url:
upsert_sent_webhook_record(reader, entry, webhook_url, webhook, response, payload)
def is_youtube_feed(feed_url: str) -> bool:
@ -868,6 +1390,9 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
# This is the webhook that will be used to send the feed to Discord.
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
# Store sent Discord message ids by default so modified feed entries can edit the original webhook message.
reader.set_tag(clean_feed_url, SAVE_SENT_WEBHOOKS_TAG, True) # pyright: ignore[reportArgumentType]
# This is the default message that will be sent to Discord.
reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType]

View file

@ -28,7 +28,6 @@ import shutil
import subprocess # noqa: S404
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
if TYPE_CHECKING:
from reader import Reader
@ -37,11 +36,9 @@ logger: logging.Logger = logging.getLogger(__name__)
GIT_EXECUTABLE: str = shutil.which("git") or "git"
type TAG_VALUE = (
dict[str, str | int | float | bool | dict[str, Any] | list[Any] | None]
| list[str | int | float | bool | dict[str, Any] | list[Any] | None]
| None
)
type JsonValue = bool | int | float | str | list[JsonValue] | dict[str, JsonValue] | None
type JsonObject = dict[str, JsonValue]
type TagValue = JsonValue
# Tags that are exported per-feed (empty values are omitted).
_FEED_TAGS: tuple[str, ...] = (
@ -164,24 +161,24 @@ def export_state(reader: Reader, backup_path: Path) -> None:
reader: The :class:`reader.Reader` instance to read state from.
backup_path: Destination directory for the exported ``state.json``.
"""
feeds_state: list[dict] = []
feeds_state: list[JsonObject] = []
for feed in reader.get_feeds():
feed_data: dict = {"url": feed.url}
feed_data: JsonObject = {"url": feed.url}
for tag in _FEED_TAGS:
try:
value: TAG_VALUE = reader.get_tag(feed, tag, None)
value: TagValue = reader.get_tag(feed, tag, None)
if value is not None and value != "": # noqa: PLC1901
feed_data[tag] = value
except Exception:
logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url)
feeds_state.append(feed_data)
webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list(
webhooks: list[JsonValue] = list(
reader.get_tag((), "webhooks", []),
)
# Export global update interval if set
global_update_interval: dict[str, Any] | None = None
global_update_interval: JsonObject | None = None
global_update_config = reader.get_tag((), ".reader.update", None)
if isinstance(global_update_config, dict):
global_update_interval = global_update_config
@ -193,7 +190,7 @@ def export_state(reader: Reader, backup_path: Path) -> None:
if clean_layout in {"desktop", "mobile"}:
global_screenshot_layout = clean_layout
state: dict = {"feeds": feeds_state, "webhooks": webhooks}
state: JsonObject = {"feeds": feeds_state, "webhooks": webhooks}
if global_update_interval is not None:
state["global_update_interval"] = global_update_interval
if global_screenshot_layout is not None:

View file

@ -5,7 +5,7 @@ import json
import logging
import re
from typing import TYPE_CHECKING
from typing import Any
from typing import cast
import requests
from discord_webhook import DiscordEmbed
@ -17,6 +17,9 @@ if TYPE_CHECKING:
logger: logging.Logger = logging.getLogger(__name__)
type JsonValue = bool | int | float | str | list[JsonValue] | dict[str, JsonValue] | None
type JsonObject = dict[str, JsonValue]
def is_c3kay_feed(feed_url: str) -> bool:
"""Check if the feed is from c3kay.de.
@ -50,14 +53,23 @@ def extract_post_id_from_hoyolab_url(url: str) -> str | None:
return None
def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None:
def as_json_object(value: JsonValue) -> JsonObject:
"""Return value as a JSON object when possible.
Returns:
JsonObject: The input dict or an empty dict.
"""
return cast("JsonObject", value) if isinstance(value, dict) else {}
def fetch_hoyolab_post(post_id: str) -> JsonObject | None:
"""Fetch post data from the Hoyolab API.
Args:
post_id: The post ID to fetch.
Returns:
dict[str, Any] | None: The post data if successful, None otherwise.
JsonObject | None: The post data if successful, None otherwise.
"""
if not post_id:
return None
@ -68,9 +80,11 @@ def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None:
response: requests.Response = requests.get(url, timeout=10)
if response.status_code == http_ok:
data: dict[str, Any] = response.json()
if data.get("retcode") == 0 and "data" in data and "post" in data["data"]:
return data["data"]["post"]
data = cast("JsonObject", response.json())
data_payload: JsonObject = as_json_object(data.get("data"))
post_payload: JsonObject = as_json_object(data_payload.get("post"))
if data.get("retcode") == 0 and post_payload:
return post_payload
logger.warning("Failed to fetch Hoyolab post %s: %s", post_id, response.text)
except (requests.RequestException, ValueError):
@ -79,7 +93,7 @@ def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None:
return None
def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, Any]) -> DiscordWebhook: # noqa: C901, PLR0912, PLR0914, PLR0915
def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: JsonObject) -> DiscordWebhook: # noqa: C901, PLR0912, PLR0914, PLR0915
"""Create a webhook with data from the Hoyolab API.
Args:
@ -94,22 +108,23 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str,
webhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True)
# Extract relevant data from the post
post: dict[str, Any] = post_data.get("post", {})
subject: str = post.get("subject", "")
content: str = post.get("content", "{}")
post: JsonObject = as_json_object(post_data.get("post"))
subject: str = str(post.get("subject", ""))
content: str = str(post.get("content", "{}"))
logger.debug("Post subject: %s", subject)
logger.debug("Post content: %s", content)
content_data: dict[str, str] = {}
content_data: JsonObject = {}
with contextlib.suppress(json.JSONDecodeError, ValueError):
content_data = json.loads(content)
loaded_content = cast("JsonValue", json.loads(content))
content_data = as_json_object(loaded_content)
logger.debug("Content data: %s", content_data)
description: str = content_data.get("describe", "")
description: str = str(content_data.get("describe", ""))
if not description:
description = post.get("desc", "")
description = str(post.get("desc", ""))
# Create the embed
discord_embed = DiscordEmbed()
@ -119,7 +134,12 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str,
discord_embed.set_url(entry_link)
# Get post.image_list
image_list: list[dict[str, Any]] = post_data.get("image_list", [])
image_list_value: JsonValue = post_data.get("image_list", [])
image_list: list[JsonObject] = (
[cast("JsonObject", item) for item in image_list_value if isinstance(item, dict)]
if isinstance(image_list_value, list)
else []
)
if image_list:
image_url: str = str(image_list[0].get("url", ""))
image_height: int = int(image_list[0].get("height", 1080))
@ -128,7 +148,7 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str,
logger.debug("Image URL: %s, Height: %s, Width: %s", image_url, image_height, image_width)
discord_embed.set_image(url=image_url, height=image_height, width=image_width)
video: dict[str, str | int | bool] = post_data.get("video", {})
video: JsonObject = as_json_object(post_data.get("video"))
if video and video.get("url"):
video_url: str = str(video.get("url", ""))
logger.debug("Video URL: %s", video_url)
@ -140,20 +160,20 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str,
filename=f"{entry.id}.mp4",
)
game = post_data.get("game", {})
game: JsonObject = as_json_object(post_data.get("game"))
if game and game.get("color"):
game_color = str(game.get("color", ""))
discord_embed.set_color(game_color.removeprefix("#"))
user: dict[str, str | int | bool] = post_data.get("user", {})
user: JsonObject = as_json_object(post_data.get("user"))
author_name: str = str(user.get("nickname", ""))
avatar_url: str = str(user.get("avatar_url", ""))
if author_name:
webhook.avatar_url = avatar_url
webhook.username = author_name
classification = post_data.get("classification", {})
classification: JsonObject = as_json_object(post_data.get("classification"))
if classification and classification.get("name"):
footer = str(classification.get("name", ""))
@ -162,13 +182,19 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str,
webhook.add_embed(discord_embed)
# Only show Youtube URL if available
structured_content: str = post.get("structured_content", "")
structured_content: str = str(post.get("structured_content", ""))
if structured_content: # noqa: PLR1702
try:
structured_content_data: list[dict[str, Any]] = json.loads(structured_content)
loaded_structured_content = cast("JsonValue", json.loads(structured_content))
structured_content_data: list[JsonObject] = (
[cast("JsonObject", item) for item in loaded_structured_content if isinstance(item, dict)]
if isinstance(loaded_structured_content, list)
else []
)
for item in structured_content_data:
if item.get("insert") and isinstance(item["insert"], dict):
video_url: str = str(item["insert"].get("video", ""))
insert: JsonObject = as_json_object(item.get("insert"))
if insert:
video_url: str = str(insert.get("video", ""))
if video_url:
video_id_match: re.Match[str] | None = re.search(r"embed/([a-zA-Z0-9_-]+)", video_url)
if video_id_match:
@ -180,15 +206,15 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str,
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Error parsing structured content: %s", e)
event_start_date: str = post.get("event_start_date", "")
event_start_date: str = str(post.get("event_start_date", ""))
if event_start_date and event_start_date != "0":
discord_embed.add_embed_field(name="Start", value=f"<t:{event_start_date}:R>")
event_end_date: str = post.get("event_end_date", "")
event_end_date: str = str(post.get("event_end_date", ""))
if event_end_date and event_end_date != "0":
discord_embed.add_embed_field(name="End", value=f"<t:{event_end_date}:R>")
created_at: str = post.get("created_at", "")
created_at: str = str(post.get("created_at", ""))
if created_at and created_at != "0":
discord_embed.set_timestamp(timestamp=created_at)

View file

@ -15,7 +15,7 @@ from html import escape
from html import unescape
from typing import TYPE_CHECKING
from typing import Annotated
from typing import Any
from typing import TypedDict
from typing import cast
import httpx
@ -51,12 +51,18 @@ from discord_rss_bot.custom_message import get_embed
from discord_rss_bot.custom_message import get_first_image
from discord_rss_bot.custom_message import replace_tags_in_text_message
from discord_rss_bot.custom_message import save_embed
from discord_rss_bot.feeds import SAVE_SENT_WEBHOOKS_TAG
from discord_rss_bot.feeds import SentWebhookRecord
from discord_rss_bot.feeds import create_feed
from discord_rss_bot.feeds import extract_domain
from discord_rss_bot.feeds import feed_saves_sent_webhooks
from discord_rss_bot.feeds import get_feed_delivery_mode
from discord_rss_bot.feeds import get_screenshot_layout
from discord_rss_bot.feeds import get_sent_webhook_records
from discord_rss_bot.feeds import send_entry_to_discord
from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.feeds import update_feed_and_collect_modified_entries
from discord_rss_bot.feeds import update_sent_webhooks_for_modified_entries
from discord_rss_bot.filter.evaluator import FILTER_FIELDS
from discord_rss_bot.filter.evaluator import EntryFilterDecision
from discord_rss_bot.filter.evaluator import FilterMatch
@ -78,7 +84,41 @@ if TYPE_CHECKING:
from reader.types import JSONType
LOGGING_CONFIG: dict[str, Any] = {
class PreviewFieldRow(TypedDict):
label: str
value_html: str
badges: list[dict[str, str]]
class FilterPreviewRow(TypedDict):
entry: Entry
decision: EntryFilterDecision
field_rows: list[PreviewFieldRow]
published_label: str
status_label: str
status_class: str
first_image: str
class FilterPreviewSummary(TypedDict):
total: int
sent: int
skipped: int
blacklist_matches: int
whitelist_matches: int
class FilterPreviewContext(TypedDict):
filter_name: str
filter_label: str
preview_rendered_count: int
preview_rows: list[FilterPreviewRow]
preview_limit: int
preview_summary: FilterPreviewSummary
preview_helper_text: str
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
@ -696,7 +736,7 @@ def build_filter_preview_context(
feed: Feed,
filter_name: str,
form_values: dict[str, str] | None = None,
) -> dict[str, Any]:
) -> FilterPreviewContext:
"""Build preview data for the blacklist and whitelist pages.
Args:
@ -706,7 +746,7 @@ def build_filter_preview_context(
form_values: Optional unsaved values from the current form.
Returns:
dict[str, Any]: Preview context for template rendering.
FilterPreviewContext: Preview context for template rendering.
"""
saved_blacklist_values: dict[str, str] = get_filter_values_from_reader(reader, feed, "blacklist")
saved_whitelist_values: dict[str, str] = get_filter_values_from_reader(reader, feed, "whitelist")
@ -724,7 +764,7 @@ def build_filter_preview_context(
helper_text = "Saved blacklist rules still apply while previewing whitelist changes."
preview_entries: list[Entry] = list(reader.get_entries(feed=feed, limit=FILTER_PREVIEW_LIMIT))
preview_rows: list[dict[str, Any]] = []
preview_rows: list[FilterPreviewRow] = []
preview_decisions: dict[str, EntryFilterDecision] = {}
sent_count = 0
skipped_count = 0
@ -782,7 +822,7 @@ def build_filter_preview_context(
}
def build_preview_field_rows(entry: Entry, decision: EntryFilterDecision) -> list[dict[str, Any]]:
def build_preview_field_rows(entry: Entry, decision: EntryFilterDecision) -> list[PreviewFieldRow]:
"""Build labeled preview fields for the filter UI.
Args:
@ -790,10 +830,10 @@ def build_preview_field_rows(entry: Entry, decision: EntryFilterDecision) -> lis
decision: The final decision for the entry.
Returns:
list[dict[str, Any]]: Labeled field rows for the preview template.
list[PreviewFieldRow]: Labeled field rows for the preview template.
"""
entry_fields: dict[str, str] = get_entry_fields(entry)
field_rows: list[dict[str, Any]] = []
field_rows: list[PreviewFieldRow] = []
for field_name in ("title", "author", "summary", "content"):
badges: list[dict[str, str]] = []
@ -1284,6 +1324,34 @@ async def post_use_screenshot_desktop(
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/set_feed_save_sent_webhooks")
async def post_set_feed_save_sent_webhooks(
feed_url: Annotated[str, Form()],
enabled: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Set whether a feed stores sent Discord webhook message records.
Returns:
RedirectResponse: Redirect to the specified feed page.
Raises:
HTTPException: If Feed does not exists.
"""
clean_feed_url: str = feed_url.strip()
should_save: bool = enabled.strip().lower() in {"1", "true", "yes", "on", "enabled"}
try:
reader.get_feed(clean_feed_url)
except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail="Feed not found") from e
reader.set_tag(clean_feed_url, SAVE_SENT_WEBHOOKS_TAG, should_save) # pyright: ignore[reportArgumentType]
action: str = "Enable" if should_save else "Disable"
commit_state_change(reader, f"{action} sent webhook storage for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/set_update_interval")
async def post_set_update_interval(
feed_url: Annotated[str, Form()],
@ -1600,6 +1668,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
"webhooks": webhooks,
"current_webhook_url": current_webhook_url,
"current_webhook_name": current_webhook_name,
"save_sent_webhooks": feed_saves_sent_webhooks(reader, feed),
}
return templates.TemplateResponse(request=request, name="feed.html", context=context)
@ -1659,6 +1728,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
"webhooks": webhooks,
"current_webhook_url": current_webhook_url,
"current_webhook_name": current_webhook_name,
"save_sent_webhooks": feed_saves_sent_webhooks(reader, feed),
}
return templates.TemplateResponse(request=request, name="feed.html", context=context)
@ -1926,6 +1996,50 @@ async def get_webhooks(
return templates.TemplateResponse(request=request, name="webhooks.html", context=context)
@app.get("/sent_webhooks", response_class=HTMLResponse)
async def get_sent_webhooks(
request: Request,
reader: Annotated[Reader, Depends(get_reader_dependency)],
feed_url: str = "",
webhook_url: str = "",
) -> HTMLResponse:
"""View sent Discord webhook messages saved for future edits.
Returns:
sent_webhooks.html HTML
"""
clean_feed_url: str = urllib.parse.unquote(feed_url.strip())
clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip())
records: list[SentWebhookRecord] = get_sent_webhook_records(reader)
if clean_feed_url:
records = [record for record in records if record.get("feed_url") == clean_feed_url]
if clean_webhook_url:
records = [record for record in records if record.get("webhook_url") == clean_webhook_url]
records.sort(
key=lambda record: str(record.get("last_updated_at") or record.get("last_sent_at") or ""),
reverse=True,
)
webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", [])))
webhook_names: dict[str, str] = {
hook.get("url", ""): hook.get("name", "") for hook in webhooks if isinstance(hook, dict)
}
feed_titles: dict[str, str] = {feed.url: (feed.title or feed.url) for feed in reader.get_feeds()}
context = {
"request": request,
"records": records,
"total_records": len(records),
"feed_url": clean_feed_url,
"webhook_url": clean_webhook_url,
"webhook_names": webhook_names,
"feed_titles": feed_titles,
}
return templates.TemplateResponse(request=request, name="sent_webhooks.html", context=context)
@app.get("/", response_class=HTMLResponse)
def get_index(
request: Request,
@ -2040,10 +2154,16 @@ async def update_feed(
HTTPException: If the feed is not found.
"""
try:
reader.update_feed(urllib.parse.unquote(feed_url))
clean_feed_url: str = urllib.parse.unquote(feed_url)
modified_entries: list[tuple[str, str]] = update_feed_and_collect_modified_entries(reader, clean_feed_url)
except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail="Feed not found") from e
try:
update_sent_webhooks_for_modified_entries(reader, modified_entries)
except (AssertionError, ReaderError, httpx.HTTPError, OSError, ValueError):
logger.exception("Failed to update saved Discord webhooks for manually updated feed: %s", feed_url)
logger.info("Manually updated feed: %s", feed_url)
return RedirectResponse(url="/feed?feed_url=" + urllib.parse.quote(feed_url), status_code=303)

View file

@ -65,6 +65,14 @@ body {
word-break: break-word;
}
.sent-webhooks__entry,
.sent-webhooks__preview {
min-width: 14rem;
max-width: 28rem;
overflow-wrap: anywhere;
word-break: break-word;
}
.filter-page__sidebar {
height: 100%;
}

View file

@ -214,6 +214,23 @@
{% else %}
<p class="text-muted mb-0">Add a webhook first to attach this feed.</p>
{% endif %}
<div class="mt-3 d-flex flex-wrap align-items-center gap-2">
<span class="badge {{ 'bg-success' if save_sent_webhooks else 'bg-secondary' }}">
Sent webhook tracking:
{{ 'Enabled' if save_sent_webhooks else 'Disabled' }}
</span>
<form action="/set_feed_save_sent_webhooks" method="post" class="d-inline">
<input type="hidden" name="feed_url" value="{{ feed.url }}" />
<input type="hidden"
name="enabled"
value="{{ 'false' if save_sent_webhooks else 'true' }}" />
<button class="btn btn-outline-light btn-sm" type="submit">
{{ 'Disable saved webhook updates' if save_sent_webhooks else 'Enable saved webhook updates' }}
</button>
</form>
<a class="btn btn-outline-info btn-sm"
href="/sent_webhooks?feed_url={{ feed.url|encode_url }}">View sent webhooks</a>
</div>
</section>
<section class="mt-4 pt-3 border-top border-secondary-subtle">
<h3 class="h6 text-uppercase text-muted mb-3">Feed Information</h3>

View file

@ -22,6 +22,10 @@
<a class="nav-link" href="/webhooks">Webhooks</a>
</li>
<li class="nav-item nav-link d-none d-md-block">|</li>
<li class="nav-item">
<a class="nav-link" href="/sent_webhooks">Sent webhooks</a>
</li>
<li class="nav-item nav-link d-none d-md-block">|</li>
<li class="nav-item">
<a class="nav-link" href="/settings">Settings</a>
</li>

View file

@ -0,0 +1,97 @@
{% extends "base.html" %}
{% block title %}
Sent Webhooks | discord-rss-bot
{% endblock title %}
{% block description %}
Review Discord webhook messages saved for future RSS entry edits.
{% endblock description %}
{% block content %}
<div class="d-flex flex-wrap justify-content-between align-items-start gap-3 mb-3">
<div>
<h2 class="h3 mb-1">Sent webhooks</h2>
<p class="text-muted mb-0">
{{ total_records }} saved message{{ '' if total_records == 1 else 's' }}
{% if feed_url %}for {{ feed_titles.get(feed_url, feed_url) }}{% endif %}
{% if webhook_url %}via {{ webhook_names.get(webhook_url) or 'selected webhook' }}{% endif %}
</p>
</div>
<a class="btn btn-outline-light btn-sm" href="/webhooks">All webhooks</a>
</div>
{% if records %}
<div class="table-responsive">
<table class="table table-dark table-striped align-middle">
<thead>
<tr>
<th scope="col">Entry</th>
<th scope="col">Webhook</th>
<th scope="col">Discord response</th>
<th scope="col">Mode</th>
<th scope="col">Updated</th>
<th scope="col">Preview</th>
</tr>
</thead>
<tbody>
{% for record in records %}
<tr>
<td class="sent-webhooks__entry">
<a class="text-muted"
href="/feed?feed_url={{ record.feed_url|encode_url }}">
{{ feed_titles.get(record.feed_url, record.feed_title or record.feed_url) }}
</a>
<div>
{% if record.entry_link %}
<a class="text-light" href="{{ record.entry_link }}">{{ record.entry_title or record.entry_id }}</a>
{% else %}
<span class="text-light">{{ record.entry_title or record.entry_id }}</span>
{% endif %}
</div>
<code class="text-muted">{{ record.entry_id }}</code>
</td>
<td>
<span class="text-muted">{{ webhook_names.get(record.webhook_url) or 'Stored webhook' }}</span>
</td>
<td>
<div class="mb-1">
<span class="badge bg-secondary">HTTP {{ record.last_status_code or 'unknown' }}</span>
</div>
<div class="small">
Message:
<code>{{ record.message_id }}</code>
</div>
{% if record.discord_response %}
<details class="mt-2">
<summary class="text-muted small">Response JSON</summary>
<pre class="mb-0 mt-1 feed-page__pre">{{ record.discord_response|tojson(indent=2) }}</pre>
</details>
{% elif record.response_text %}
<pre class="mb-0 mt-2 feed-page__pre">{{ record.response_text }}</pre>
{% else %}
<div class="text-muted small mt-1">No saved response body</div>
{% endif %}
{% if record.last_error %}<div class="text-warning small">{{ record.last_error }}</div>{% endif %}
</td>
<td>
<span class="badge bg-secondary">{{ record.delivery_mode or 'unknown' }}</span>
{% if record.update_count %}
<span class="badge bg-info">{{ record.update_count }} edit{{ '' if record.update_count == 1 else 's' }}</span>
{% endif %}
</td>
<td class="text-muted small">{{ record.last_updated_at or record.last_sent_at or 'Never' }}</td>
<td class="sent-webhooks__preview">
{% if record.payload and record.payload.content %}
<pre class="mb-0 feed-page__pre">{{ record.payload.content }}</pre>
{% elif record.payload and record.payload.embeds %}
<span class="text-muted">{{ record.payload.embeds|length }} embed{{ '' if record.payload.embeds|length == 1 else 's' }}</span>
{% else %}
<span class="text-muted">No text payload</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<div class="alert alert-info" role="alert">No sent webhooks have been saved yet.</div>
{% endif %}
{% endblock content %}

View file

@ -8,14 +8,27 @@ import warnings
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import Protocol
from typing import cast
from bs4 import MarkupResemblesLocatorWarning
if TYPE_CHECKING:
from types import ModuleType
import pytest
class CachedReaderFactory(Protocol):
"""Reader factory with lru_cache controls used by tests."""
def __call__(self) -> None:
"""Create or return the cached reader."""
def cache_clear(self) -> None:
"""Clear the cached reader."""
def pytest_addoption(parser: pytest.Parser) -> None:
"""Register custom command-line options for optional integration tests."""
parser.addoption(
@ -44,21 +57,23 @@ def pytest_sessionstart(session: pytest.Session) -> None:
# If modules were imported before this hook (unlikely), force them to use
# the worker-specific location.
settings_module: Any = sys.modules.get("discord_rss_bot.settings")
settings_module: ModuleType | None = sys.modules.get("discord_rss_bot.settings")
if settings_module is not None:
settings_module.data_dir = str(worker_data_dir)
get_reader: Any = getattr(settings_module, "get_reader", None)
if get_reader is not None and hasattr(get_reader, "cache_clear"):
get_reader_attr = getattr(settings_module, "get_reader", None)
if get_reader_attr is not None and hasattr(get_reader_attr, "cache_clear"):
get_reader = cast("CachedReaderFactory", get_reader_attr)
get_reader.cache_clear()
main_module: Any = sys.modules.get("discord_rss_bot.main")
main_module: ModuleType | None = sys.modules.get("discord_rss_bot.main")
if main_module is not None and settings_module is not None:
with suppress(Exception):
current_reader = getattr(main_module, "reader", None)
if current_reader is not None:
current_reader.close()
get_reader: Any = getattr(settings_module, "get_reader", None)
if callable(get_reader):
get_reader_attr = getattr(settings_module, "get_reader", None)
if callable(get_reader_attr):
get_reader = cast("CachedReaderFactory", get_reader_attr)
get_reader()

View file

@ -3,6 +3,8 @@ from __future__ import annotations
import asyncio
import os
import tempfile
from datetime import UTC
from datetime import datetime
from pathlib import Path
from typing import LiteralString
from unittest.mock import MagicMock
@ -17,6 +19,7 @@ from reader import StorageError
from reader import make_reader
from discord_rss_bot import feeds
from discord_rss_bot.feeds import JsonObject
from discord_rss_bot.feeds import capture_full_page_screenshot
from discord_rss_bot.feeds import create_feed
from discord_rss_bot.feeds import create_screenshot_webhook
@ -354,6 +357,19 @@ def test_create_feed_inherits_global_text_delivery_mode() -> None:
reader.set_tag.assert_any_call("https://example.com/feed.xml", "should_send_embed", False)
def test_create_feed_enables_sent_webhook_tracking_by_default() -> None:
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
"screenshot_layout": "desktop",
"delivery_mode": "embed",
}.get(key, default)
create_feed(reader, "https://example.com/feed.xml", "Main")
reader.set_tag.assert_any_call("https://example.com/feed.xml", feeds.SAVE_SENT_WEBHOOKS_TAG, True)
def test_create_feed_falls_back_to_embed_when_global_delivery_mode_is_invalid() -> None:
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
@ -882,3 +898,191 @@ def test_execute_webhook_logs_info_on_success(mock_logger: MagicMock) -> None:
execute_webhook(webhook, entry, reader)
mock_logger.info.assert_called_once_with("Sent entry to Discord: %s", "entry-9")
def test_execute_webhook_records_sent_webhook_message() -> None:
webhook_url = "https://discord.com/api/webhooks/123/abc"
state: dict[str, feeds.JsonValue] = {}
def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue:
if key == feeds.SENT_WEBHOOKS_TAG:
return state.get(feeds.SENT_WEBHOOKS_TAG, default)
if key == feeds.SAVE_SENT_WEBHOOKS_TAG:
return True
if key == "webhook":
return webhook_url
if key == "delivery_mode":
return "text"
return default
def set_tag(_resource: str | tuple[()], key: str, value: feeds.JsonValue) -> None:
state[key] = value
reader = MagicMock()
reader.get_tag.side_effect = get_tag
reader.set_tag.side_effect = set_tag
entry = MagicMock()
entry.id = "entry-1"
entry.title = "Entry title"
entry.link = "https://example.com/entry-1"
entry.updated = datetime(2026, 5, 8, tzinfo=UTC)
entry.feed_url = "https://example.com/feed.xml"
entry.feed.url = "https://example.com/feed.xml"
entry.feed.title = "Example feed"
entry.feed.updates_enabled = True
webhook = MagicMock()
webhook.json = {"content": "Entry title", "embeds": [], "attachments": []}
response = MagicMock()
response.status_code = 200
response.text = '{"id": "message-1"}'
response.json.return_value = {"id": "message-1"}
webhook.execute.return_value = response
execute_webhook(webhook, entry, reader)
records = state[feeds.SENT_WEBHOOKS_TAG]
assert isinstance(records, list)
assert len(records) == 1
assert isinstance(records[0], dict)
assert records[0]["feed_url"] == "https://example.com/feed.xml"
assert records[0]["entry_id"] == "entry-1"
assert records[0]["webhook_url"] == webhook_url
assert records[0]["message_id"] == "message-1"
assert records[0]["last_status_code"] == 200
assert records[0]["discord_response"] == {"id": "message-1"}
assert records[0]["response_text"] == '{"id": "message-1"}'
assert isinstance(records[0]["payload"], dict)
assert records[0]["payload"]["content"] == "Entry title"
def test_execute_webhook_does_not_record_when_feed_tracking_disabled() -> None:
webhook_url = "https://discord.com/api/webhooks/123/abc"
reader = MagicMock()
reader.get_tag.side_effect = lambda _resource, key, default=None: {
feeds.SAVE_SENT_WEBHOOKS_TAG: False,
"webhook": webhook_url,
}.get(key, default)
entry = MagicMock()
entry.id = "entry-2"
entry.feed_url = "https://example.com/feed.xml"
entry.feed.url = "https://example.com/feed.xml"
entry.feed.updates_enabled = True
webhook = MagicMock()
webhook.json = {"content": "Entry title", "embeds": [], "attachments": []}
response = MagicMock()
response.status_code = 200
response.text = '{"id": "message-2"}'
response.json.return_value = {"id": "message-2"}
webhook.execute.return_value = response
execute_webhook(webhook, entry, reader)
reader.set_tag.assert_not_called()
@patch("discord_rss_bot.feeds.edit_sent_webhook_message")
@patch("discord_rss_bot.feeds.create_webhook_for_entry")
def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
mock_create_webhook_for_entry: MagicMock,
mock_edit_sent_webhook_message: MagicMock,
) -> None:
webhook_url = "https://discord.com/api/webhooks/123/abc"
old_payload: JsonObject = {"content": "Old title", "embeds": [], "attachments": []}
state: dict[str, feeds.JsonValue] = {
feeds.SENT_WEBHOOKS_TAG: [
{
"feed_url": "https://example.com/feed.xml",
"entry_id": "entry-3",
"webhook_url": webhook_url,
"message_id": "message-3",
"payload": old_payload,
"payload_hash": feeds.hash_webhook_payload(old_payload),
"update_count": 0,
}, # pyright: ignore[reportAssignmentType, reportArgumentType]
],
}
def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue:
if key == feeds.SENT_WEBHOOKS_TAG:
return state[feeds.SENT_WEBHOOKS_TAG]
if key == feeds.SAVE_SENT_WEBHOOKS_TAG:
return True
return default
def set_tag(_resource: str | tuple[()], key: str, value: feeds.JsonValue) -> None:
state[key] = value
entry = MagicMock()
entry.id = "entry-3"
entry.title = "New title"
entry.link = "https://example.com/entry-3"
entry.updated = datetime(2026, 5, 8, tzinfo=UTC)
entry.feed.url = "https://example.com/feed.xml"
entry.feed.title = "Example feed"
reader = MagicMock()
reader.get_tag.side_effect = get_tag
reader.set_tag.side_effect = set_tag
reader.get_entry.return_value = entry
webhook = MagicMock()
webhook.json = {"content": "New title", "embeds": [], "attachments": []}
mock_create_webhook_for_entry.return_value = (webhook, "text")
response = MagicMock()
response.status_code = 200
response.text = '{"id": "message-3"}'
response.json.return_value = {"id": "message-3"}
mock_edit_sent_webhook_message.return_value = response
updated_count = feeds.update_sent_webhooks_for_modified_entries(
reader,
[("https://example.com/feed.xml", "entry-3")],
)
assert updated_count == 1
mock_edit_sent_webhook_message.assert_called_once()
records = state[feeds.SENT_WEBHOOKS_TAG]
assert isinstance(records, list)
assert isinstance(records[0], dict)
assert isinstance(records[0]["payload"], dict)
assert records[0]["payload"]["content"] == "New title"
assert records[0]["discord_response"] == {"id": "message-3"}
assert records[0]["response_text"] == '{"id": "message-3"}'
assert records[0]["update_count"] == 1
assert not records[0]["last_error"]
def test_update_feeds_and_collect_modified_entries_only_returns_modified_entries() -> None:
class StubReader:
def __init__(self) -> None:
self.after_entry_update_hooks = []
def update_feeds(self, *, scheduled: bool, workers: int) -> None:
assert scheduled is True
assert workers == 1
new_entry = MagicMock()
new_entry.feed_url = "https://example.com/feed.xml"
new_entry.id = "new"
modified_entry = MagicMock()
modified_entry.feed_url = "https://example.com/feed.xml"
modified_entry.id = "modified"
for hook in list(self.after_entry_update_hooks):
hook(self, new_entry, feeds.EntryUpdateStatus.NEW)
hook(self, modified_entry, feeds.EntryUpdateStatus.MODIFIED)
reader = StubReader()
modified_entries: list[tuple[str, str]] = feeds.update_feeds_and_collect_modified_entries(
reader, # pyright: ignore[reportArgumentType]
scheduled=True,
workers=1,
)
assert modified_entries == [("https://example.com/feed.xml", "modified")]
assert reader.after_entry_update_hooks == []

View file

@ -6,13 +6,15 @@ import shutil
import subprocess # noqa: S404
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import cast
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from discord_rss_bot.git_backup import JsonObject
from discord_rss_bot.git_backup import JsonValue
from discord_rss_bot.git_backup import commit_state_change
from discord_rss_bot.git_backup import export_state
from discord_rss_bot.git_backup import get_backup_path
@ -172,7 +174,7 @@ def test_export_state_creates_state_json(tmp_path: Path) -> None:
feed_or_key: tuple | str,
tag: str | None = None,
default: str | None = None,
) -> list[Any] | str | None:
) -> list[JsonValue] | str | None:
if feed_or_key == () and tag is None:
# Called for global webhooks list
return []
@ -191,7 +193,7 @@ def test_export_state_creates_state_json(tmp_path: Path) -> None:
state_file: Path = backup_path / "state.json"
assert state_file.exists(), "state.json should be created by export_state"
data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8"))
data = cast("JsonObject", json.loads(state_file.read_text(encoding="utf-8")))
assert "feeds" in data
assert "webhooks" in data
assert data["feeds"][0]["url"] == "https://example.com/feed.rss"
@ -209,7 +211,7 @@ def test_export_state_omits_empty_tags(tmp_path: Path) -> None:
feed_or_key: tuple | str,
tag: str | None = None,
default: str | None = None,
) -> list[Any] | str | None:
) -> list[JsonValue] | str | None:
if feed_or_key == ():
return []
@ -222,7 +224,7 @@ def test_export_state_omits_empty_tags(tmp_path: Path) -> None:
backup_path.mkdir()
export_state(mock_reader, backup_path)
data: dict[str, Any] = json.loads((backup_path / "state.json").read_text())
data = cast("JsonObject", json.loads((backup_path / "state.json").read_text()))
# Only "url" key should be present (no empty-value tags)
assert list(data["feeds"][0].keys()) == ["url"]
@ -570,7 +572,7 @@ def test_embed_backup_end_to_end(monkeypatch: pytest.MonkeyPatch, tmp_path: Path
# Verify state.json contains embed data
state_file: Path = backup_path / "state.json"
assert state_file.exists(), "state.json should exist in backup repo"
state_data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8"))
state_data = cast("JsonObject", json.loads(state_file.read_text(encoding="utf-8")))
# Find our test feed in the state
test_feed_data = next((feed for feed in state_data["feeds"] if feed["url"] == test_feed_url), None)

View file

@ -15,6 +15,7 @@ from unittest.mock import patch
from fastapi.testclient import TestClient
import discord_rss_bot.main as main_module
from discord_rss_bot import feeds
from discord_rss_bot.main import app
from discord_rss_bot.main import create_html_for_feed
from discord_rss_bot.main import get_reader_dependency
@ -31,6 +32,8 @@ client: TestClient = TestClient(app)
webhook_name: str = "Hello, I am a webhook!"
webhook_url: str = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz"
feed_url: str = "https://lovinator.space/rss_test.xml"
type TestTagValue = str | bool | int | list[dict[str, str]] | feeds.JsonValue | None
type TestKwargValue = str | int | None
def encoded_feed_url(url: str) -> str:
@ -348,14 +351,14 @@ def test_blacklist_preview_uses_50_entry_limit() -> None:
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **kwargs: object) -> list[Entry]:
def get_entries(self, **kwargs: TestKwargValue) -> list[Entry]:
limit = kwargs.get("limit")
self.recorded_limit = limit if isinstance(limit, int) else None
if isinstance(limit, int):
return self.entries[:limit]
return self.entries
def get_tag(self, _resource: object, _key: str, default: object = None) -> object:
def get_tag(self, _resource: str | DummyFeed, _key: str, default: TestTagValue = None) -> TestTagValue:
return default
stub_reader = StubReader()
@ -420,10 +423,10 @@ def test_blacklist_preview_shows_labeled_field_values_for_substring_match() -> N
def get_feed(self, _feed_url: str) -> DummyFeed:
return self.feed
def get_entries(self, **_kwargs: object) -> list[Entry]:
def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]:
return self.entries
def get_tag(self, _resource: object, _key: str, default: object = None) -> object:
def get_tag(self, _resource: str | DummyFeed, _key: str, default: TestTagValue = None) -> TestTagValue:
return default
stub_reader = StubReader()
@ -528,6 +531,103 @@ def test_c3kay_feed_delivery_mode_toggle_routes_update_stored_tags() -> None:
assert reader.get_tag(c3kay_feed_url, "should_send_embed") is True
def test_set_feed_save_sent_webhooks_route_updates_stored_tag() -> None:
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/feed.xml", title="Example")
self.tags: dict[tuple[str, str], bool] = {}
def get_feed(self, feed_url: str) -> DummyFeed:
assert feed_url == self.feed.url
return self.feed
def set_tag(self, resource: str, key: str, value: bool) -> None: # noqa: FBT001
self.tags[resource, key] = value
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try:
with patch("discord_rss_bot.main.commit_state_change"):
response: Response = client.post(
url="/set_feed_save_sent_webhooks",
data={"feed_url": stub_reader.feed.url, "enabled": "false"},
follow_redirects=False,
)
assert response.status_code == 303, f"/set_feed_save_sent_webhooks failed: {response.text}"
assert stub_reader.tags[stub_reader.feed.url, feeds.SAVE_SENT_WEBHOOKS_TAG] is False
finally:
app.dependency_overrides = {}
def test_sent_webhooks_view_shows_saved_records() -> None:
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
sent_webhook_url = "https://discord.com/api/webhooks/123/abc"
sent_feed_url = "https://example.com/feed.xml"
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url=sent_feed_url, title="Example feed")
def get_tag(
self,
resource: str | tuple[()],
key: str,
default: feeds.JsonValue = None,
) -> feeds.JsonValue:
if resource == () and key == feeds.SENT_WEBHOOKS_TAG:
return [
{
"feed_url": sent_feed_url,
"feed_title": "Example feed",
"entry_id": "entry-1",
"entry_title": "Fixed typo",
"entry_link": "https://example.com/entry-1",
"webhook_url": sent_webhook_url,
"message_id": "message-1",
"delivery_mode": "text",
"payload": {"content": "Fixed typo", "embeds": [], "attachments": []},
"discord_response": {"id": "message-1", "channel_id": "channel-1"},
"response_text": '{"id": "message-1", "channel_id": "channel-1"}',
"last_updated_at": "2026-05-08T12:00:00+00:00",
"last_status_code": 200,
"update_count": 1,
},
]
if resource == () and key == "webhooks":
return [{"name": "Main", "url": sent_webhook_url}]
return default
def get_feeds(self) -> list[DummyFeed]:
return [self.feed]
app.dependency_overrides[get_reader_dependency] = StubReader
try:
response: Response = client.get(url="/sent_webhooks")
assert response.status_code == 200, f"/sent_webhooks failed: {response.text}"
assert "Fixed typo" in response.text
assert "message-1" in response.text
assert "channel-1" in response.text
assert sent_webhook_url not in response.text
assert "HTTP 200" in response.text
assert "Example feed" in response.text
assert "Main" in response.text
finally:
app.dependency_overrides = {}
def test_set_global_screenshot_layout() -> None:
response: Response = client.post(url="/set_global_screenshot_layout", data={"screenshot_layout": "mobile"})
assert response.status_code == 200, f"Failed to set global screenshot layout: {response.text}"
@ -964,6 +1064,35 @@ def test_update_feed_not_found() -> None:
assert "Feed not found" in response.text
def test_update_feed_updates_saved_webhooks_for_modified_entries() -> None:
class StubReader:
pass
stub_reader = StubReader()
modified_entries = [("https://example.com/feed.xml", "entry-1")]
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try:
with (
patch(
"discord_rss_bot.main.update_feed_and_collect_modified_entries",
return_value=modified_entries,
) as mock_update_feed,
patch("discord_rss_bot.main.update_sent_webhooks_for_modified_entries") as mock_update_webhooks,
):
response: Response = client.get(
url="/update",
params={"feed_url": "https://example.com/feed.xml"},
follow_redirects=False,
)
assert response.status_code == 303, f"Expected redirect after update, got: {response.text}"
mock_update_feed.assert_called_once_with(stub_reader, "https://example.com/feed.xml")
mock_update_webhooks.assert_called_once_with(stub_reader, modified_entries)
finally:
app.dependency_overrides = {}
def test_post_entry_send_to_discord() -> None:
"""Test that /post_entry sends an entry to Discord and redirects to the feed page.
@ -1046,7 +1175,7 @@ def test_post_entry_uses_feed_url_to_disambiguate_duplicate_ids() -> None:
selected_feed_urls: list[str] = []
def fake_send_entry_to_discord(entry: Entry, reader: object) -> None:
def fake_send_entry_to_discord(entry: Entry, reader: Reader) -> None:
selected_feed_urls.append(entry.feed.url)
app.dependency_overrides[get_reader_dependency] = StubReader
@ -1547,7 +1676,12 @@ def test_webhook_entries_sort_newest_and_non_null_published_first() -> None:
]
class StubReader:
def get_tag(self, resource: object, key: str, default: object = None) -> object:
def get_tag(
self,
resource: str | tuple[()] | DummyFeed,
key: str,
default: TestTagValue = None,
) -> TestTagValue:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
@ -1557,12 +1691,12 @@ def test_webhook_entries_sort_newest_and_non_null_published_first() -> None:
def get_feeds(self) -> list[DummyFeed]:
return [dummy_feed]
def get_entries(self, **_kwargs: object) -> list[Entry]:
def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]:
return unsorted_entries
observed_order: list[str] = []
def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str:
def capture_entries(*, reader: Reader, entries: list[Entry], current_feed_url: str = "") -> str:
del reader, current_feed_url
observed_order.extend(entry.id for entry in entries)
return ""
@ -1761,7 +1895,12 @@ def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None:
DummyFeed(url="https://unchanged.example.com/rss/c.xml", title="C"),
]
def get_tag(self, resource: object, key: str, default: object = None) -> object:
def get_tag(
self,
resource: str | tuple[()] | DummyFeed,
key: str,
default: TestTagValue = None,
) -> TestTagValue:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
@ -1774,7 +1913,7 @@ def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None:
def get_feeds(self) -> list[DummyFeed]:
return self._feeds
def get_entries(self, **_kwargs: object) -> list[Entry]:
def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]:
return []
app.dependency_overrides[get_reader_dependency] = StubReader
@ -1827,7 +1966,12 @@ def test_bulk_change_feed_urls_updates_matching_feeds() -> None:
self.change_calls: list[tuple[str, str]] = []
self.updated_feeds: list[str] = []
def get_tag(self, resource: object, key: str, default: object = None) -> object:
def get_tag(
self,
resource: str | tuple[()] | DummyFeed,
key: str,
default: TestTagValue = None,
) -> TestTagValue:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
@ -1843,7 +1987,7 @@ def test_bulk_change_feed_urls_updates_matching_feeds() -> None:
def update_feed(self, feed_url: str) -> None:
self.updated_feeds.append(feed_url)
def get_entries(self, **_kwargs: object) -> list[Entry]:
def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]:
return []
def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
@ -1899,7 +2043,7 @@ def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None:
DummyFeed(url="https://old.example.com/rss/b.xml", title="B"),
]
def get_tag(self, resource: object, key: str, default: object = None) -> object:
def get_tag(self, resource: str | DummyFeed, key: str, default: TestTagValue = None) -> TestTagValue:
if key == "webhook" and isinstance(resource, str):
return webhook_url
return default
@ -1947,7 +2091,12 @@ def test_bulk_change_feed_urls_force_update_overwrites_conflict() -> None: # no
self.delete_calls: list[str] = []
self.change_calls: list[tuple[str, str]] = []
def get_tag(self, resource: object, key: str, default: object = None) -> object:
def get_tag(
self,
resource: str | tuple[()] | DummyFeed,
key: str,
default: TestTagValue = None,
) -> TestTagValue:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
@ -1966,7 +2115,7 @@ def test_bulk_change_feed_urls_force_update_overwrites_conflict() -> None: # no
def update_feed(self, _feed_url: str) -> None:
return
def get_entries(self, **_kwargs: object) -> list[Entry]:
def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]:
return []
def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001
@ -2019,7 +2168,12 @@ def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None:
]
self.change_calls: list[tuple[str, str]] = []
def get_tag(self, resource: object, key: str, default: object = None) -> object:
def get_tag(
self,
resource: str | tuple[()] | DummyFeed,
key: str,
default: TestTagValue = None,
) -> TestTagValue:
if resource == () and key == "webhooks":
return [{"name": webhook_name, "url": webhook_url}]
if key == "webhook" and isinstance(resource, str):
@ -2035,7 +2189,7 @@ def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None:
def update_feed(self, _feed_url: str) -> None:
return
def get_entries(self, **_kwargs: object) -> list[Entry]:
def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]:
return []
def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001