From 36d55566fc1c2aa7a2245ddb4b96d76e6d47c5be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Sat, 9 May 2026 04:41:50 +0200 Subject: [PATCH] Edit sent Discord webhooks if entry values updates --- discord_rss_bot/custom_message.py | 7 +- discord_rss_bot/feeds.py | 649 +++++++++++++++++-- discord_rss_bot/git_backup.py | 21 +- discord_rss_bot/hoyolab_api.py | 78 ++- discord_rss_bot/main.py | 138 +++- discord_rss_bot/static/styles.css | 8 + discord_rss_bot/templates/feed.html | 17 + discord_rss_bot/templates/nav.html | 4 + discord_rss_bot/templates/sent_webhooks.html | 97 +++ tests/conftest.py | 29 +- tests/test_feeds.py | 204 ++++++ tests/test_git_backup.py | 14 +- tests/test_main.py | 188 +++++- 13 files changed, 1313 insertions(+), 141 deletions(-) create mode 100644 discord_rss_bot/templates/sent_webhooks.html diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index 1536e64..4c6f62f 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -14,6 +14,9 @@ from markdownify import markdownify from discord_rss_bot.is_url_valid import is_url_valid if TYPE_CHECKING: + from collections.abc import Sequence + + from reader import Content from reader import Entry from reader import Feed from reader import Reader @@ -193,7 +196,7 @@ def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str: return custom_message.replace("\\n", "\n") -def get_first_image(summary: str | None, content: str | None) -> str: # noqa: C901 +def get_first_image(summary: str | None, content: str | Sequence[Content] | None) -> str: # noqa: C901 """Get image from summary or content. Args: @@ -204,7 +207,7 @@ def get_first_image(summary: str | None, content: str | None) -> str: # noqa: C The first image """ - def extract_string(data: str | list | tuple | None) -> str | None: + def extract_string(data: str | list | tuple | Sequence[Content] | None) -> str | None: if not data: return None if isinstance(data, str): diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index adbfcb9..6897087 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -3,18 +3,23 @@ from __future__ import annotations import asyncio import concurrent.futures import datetime +import hashlib import json import logging import os import pprint import re +from collections.abc import Callable +from contextlib import suppress from typing import TYPE_CHECKING -from typing import Any from typing import Literal +from typing import Protocol from typing import cast from urllib.parse import ParseResult +from urllib.parse import parse_qs from urllib.parse import urlparse +import httpx import tldextract from discord_webhook import DiscordEmbed from discord_webhook import DiscordWebhook @@ -32,6 +37,9 @@ from reader import FeedNotFoundError from reader import Reader from reader import ReaderError from reader import StorageError +from reader.types import EntryUpdateStatus +from reader.types import UpdatedFeed +from requests import RequestException from discord_rss_bot.custom_message import CustomEmbed from discord_rss_bot.custom_message import get_custom_message @@ -50,6 +58,7 @@ from discord_rss_bot.settings import get_reader if TYPE_CHECKING: from collections.abc import Iterable + from reader._types import EntryData from requests import Response logger: logging.Logger = logging.getLogger(__name__) @@ -57,9 +66,53 @@ logger: logging.Logger = logging.getLogger(__name__) type DeliveryMode = Literal["embed", "text", "screenshot"] type ScreenshotLayout = Literal["desktop", "mobile"] type ScreenshotFileType = Literal["png", "jpeg"] +type JsonValue = bool | int | float | str | list[JsonValue] | dict[str, JsonValue] | None +type JsonObject = dict[str, JsonValue] +type SentWebhookRecord = dict[str, JsonValue] +type UpdateCallback = Callable[[], UpdatedFeed | None] + + +class JsonResponseLike(Protocol): + """Response interface needed for Discord webhook JSON parsing.""" + + @property + def status_code(self) -> int: + """HTTP status code.""" + ... + + @property + def text(self) -> str: + """Response body decoded as text.""" + ... + + @property + def content(self) -> bytes: + """Raw response body.""" + ... + + def json(self) -> JsonValue: + """Decode response body as JSON. + + Returns: + JsonValue: Decoded JSON response body. + """ + ... + MAX_DISCORD_UPLOAD_BYTES: int = 8 * 1024 * 1024 JPEG_QUALITY_STEPS: tuple[int, ...] = (85, 70, 55, 40) +SENT_WEBHOOKS_TAG: str = "sent_webhooks" +SAVE_SENT_WEBHOOKS_TAG: str = "save_sent_webhooks" +MESSAGE_PAYLOAD_KEYS: tuple[str, ...] = ( + "allowed_mentions", + "attachments", + "avatar_url", + "content", + "embeds", + "flags", + "tts", + "username", +) def extract_domain(url: str) -> str: # noqa: PLR0911 @@ -137,30 +190,12 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: delivery_mode, ) - # Hoyolab/c3kay feeds use a custom embed only when embed mode is selected. - if delivery_mode == "embed" and is_c3kay_feed(entry.feed.url): - entry_link: str | None = entry.link - if entry_link: - post_id: str | None = extract_post_id_from_hoyolab_url(entry_link) - if post_id: - post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) - if post_data: - webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry, reader=reader) - return None - logger.warning( - "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", - entry.feed.url, - ) - else: - logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) - - if delivery_mode == "embed": - webhook: DiscordWebhook = create_embed_webhook(webhook_url, entry, reader=reader) - elif delivery_mode == "screenshot": - webhook = create_screenshot_webhook(webhook_url, entry, reader=reader) - else: - webhook = create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=False) + webhook, _delivery_mode = create_webhook_for_entry( + webhook_url, + entry, + reader, + use_default_message_on_empty=False, + ) execute_webhook(webhook, entry, reader=reader) return None @@ -244,6 +279,502 @@ def get_screenshot_layout(reader: Reader, feed: Feed) -> ScreenshotLayout: return "desktop" +def feed_saves_sent_webhooks(reader: Reader, feed: Feed | str) -> bool: + """Return whether sent Discord webhook messages should be stored for a feed. + + Missing tags default to enabled so existing feeds start tracking editable Discord messages. + """ + feed_url: str = feed.url if isinstance(feed, Feed) else str(feed) + try: + value = cast("JsonValue", reader.get_tag(feed, SAVE_SENT_WEBHOOKS_TAG, True)) + except ReaderError: + logger.exception("Error getting %s tag for feed: %s", SAVE_SENT_WEBHOOKS_TAG, feed_url) + return True + + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.strip().lower() not in {"0", "false", "no", "off", "disabled"} + if value is None: + return True + return bool(value) + + +def get_sent_webhook_records(reader: Reader) -> list[SentWebhookRecord]: + """Get stored sent webhook records from the global reader tag. + + Returns: + list[SentWebhookRecord]: Saved sent webhook records. + """ + raw_records = cast("JsonValue", reader.get_tag((), SENT_WEBHOOKS_TAG, [])) + if not isinstance(raw_records, list): + return [] + + records: list[SentWebhookRecord] = [ + cast("SentWebhookRecord", dict(raw_record)) for raw_record in raw_records if isinstance(raw_record, dict) + ] + return records + + +def save_sent_webhook_records(reader: Reader, records: list[SentWebhookRecord]) -> None: + """Save sent webhook records to the global reader tag.""" + reader.set_tag((), SENT_WEBHOOKS_TAG, records) # pyright: ignore[reportArgumentType] + + +def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject: + """Return the Discord message payload used to compare saved messages. + + The discord-webhook object also includes client/runtime fields in `json`; only fields that affect the Discord + message itself are persisted. Empty `content`, `embeds`, and `attachments` are kept so message edits can clear + stale content when a feed changes delivery mode. + + Returns: + JsonObject: Normalized Discord message payload. + """ + raw_payload = cast("JsonValue", webhook.json) + if not isinstance(raw_payload, dict): + return {"content": "", "embeds": [], "attachments": []} + + payload: JsonObject = {} + webhook_payload = cast("JsonObject", raw_payload) + for key in MESSAGE_PAYLOAD_KEYS: + if key in webhook_payload: + payload[key] = webhook_payload[key] + + payload.setdefault("content", "") + payload.setdefault("embeds", []) + payload.setdefault("attachments", []) + return cast("JsonObject", json.loads(json.dumps(payload, default=str))) + + +def hash_webhook_payload(payload: JsonObject) -> str: + """Hash a normalized Discord message payload. + + Returns: + str: SHA-256 hash of the payload. + """ + normalized_payload: str = json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str) + return hashlib.sha256(normalized_payload.encode()).hexdigest() + + +def json_value_to_int(value: JsonValue, default: int = 0) -> int: + """Convert a simple JSON scalar to int. + + Returns: + int: Converted integer, or default when the value is not scalar-convertible. + """ + if isinstance(value, bool): + return int(value) + if isinstance(value, int | float | str): + try: + return int(value) + except ValueError: + return default + return default + + +def get_response_json(response: JsonResponseLike) -> JsonObject: + """Best-effort JSON extraction for requests/httpx response objects. + + Returns: + JsonObject: Decoded JSON object, or an empty dict. + """ + try: + response_json = response.json() + except (AttributeError, TypeError, ValueError): + response_text: str = response.text + if not response_text: + response_content: bytes = response.content + if isinstance(response_content, bytes): + response_text = response_content.decode("utf-8", errors="ignore") + if not response_text: + return {} + try: + response_json = json.loads(response_text) + except json.JSONDecodeError: + return {} + + return cast("JsonObject", dict(response_json)) if isinstance(response_json, dict) else {} + + +def get_discord_message_id_from_response(response_json: JsonObject, webhook: DiscordWebhook) -> str: + """Get the Discord message id from a decoded webhook response. + + Returns: + str: Discord message id, or an empty string. + """ + message_id: JsonValue = response_json.get("id") + if isinstance(message_id, str) and message_id: + return message_id + + webhook_id: str | None = webhook.id if isinstance(webhook.id, str) else None + return webhook_id if isinstance(webhook_id, str) else "" + + +def get_discord_message_id(response: JsonResponseLike, webhook: DiscordWebhook) -> str: + """Get the Discord message id returned by a webhook send/edit response. + + Returns: + str: Discord message id, or an empty string. + """ + return get_discord_message_id_from_response(get_response_json(response), webhook) + + +def get_entry_timestamp(value: datetime.datetime | None) -> str: + """Return an ISO timestamp when an entry datetime-like value is present. + + Returns: + str: ISO timestamp, or an empty string. + """ + if value is not None: + return value.isoformat() + return "" + + +def upsert_sent_webhook_record( + reader: Reader, + entry: Entry, + webhook_url: str, + webhook: DiscordWebhook, + response: JsonResponseLike, + payload: JsonObject, +) -> None: + """Store the Discord message id and rendered payload for a successfully sent entry.""" + if not feed_saves_sent_webhooks(reader, entry.feed): + return + + response_json: JsonObject = get_response_json(response) + message_id: str = get_discord_message_id_from_response(response_json, webhook) + if not message_id: + logger.debug("Discord response did not include a message id for entry %s; not storing webhook.", entry.id) + return + + now: str = datetime.datetime.now(tz=datetime.UTC).isoformat() + payload_hash: str = hash_webhook_payload(payload) + delivery_mode: DeliveryMode = get_entry_delivery_mode(reader, entry) + record: SentWebhookRecord = { + "feed_url": entry.feed.url, + "feed_title": entry.feed.title or "", + "entry_id": entry.id, + "entry_title": entry.title or "", + "entry_link": entry.link or "", + "entry_updated": get_entry_timestamp(entry.updated), + "webhook_url": webhook_url, + "message_id": message_id, + "delivery_mode": delivery_mode, + "payload": payload, + "payload_hash": payload_hash, + "discord_response": response_json, + "response_text": response.text[:5000], + "first_sent_at": now, + "last_sent_at": now, + "last_updated_at": now, + "last_status_code": response.status_code, + "last_error": "", + "update_count": 0, + } + + records: list[SentWebhookRecord] = get_sent_webhook_records(reader) + for index, existing_record in enumerate(records): + if ( + existing_record.get("feed_url") == entry.feed.url + and existing_record.get("entry_id") == entry.id + and existing_record.get("webhook_url") == webhook_url + ): + record["first_sent_at"] = existing_record.get("first_sent_at") or now + record["update_count"] = json_value_to_int(existing_record.get("update_count")) + records[index] = record + save_sent_webhook_records(reader, records) + return + + records.append(record) + save_sent_webhook_records(reader, records) + + +def split_webhook_url_for_message_endpoint(webhook_url: str) -> tuple[str, str | None]: + """Split a webhook URL into the base webhook endpoint and optional thread id. + + Returns: + tuple[str, str | None]: Clean webhook URL and optional thread id. + """ + parsed_url = urlparse(webhook_url) + query = parse_qs(parsed_url.query) + thread_id_values: list[str] = query.get("thread_id", []) + thread_id: str | None = thread_id_values[0].strip() if thread_id_values else None + if not thread_id: + thread_id = None + + clean_url: str = parsed_url._replace(query="", fragment="").geturl().rstrip("/") + return clean_url, thread_id + + +def edit_sent_webhook_message( + webhook_url: str, + message_id: str, + webhook: DiscordWebhook, + payload: JsonObject, +) -> Response | httpx.Response: + """Edit an already-sent Discord webhook message. + + Returns: + Response | httpx.Response: Discord API response. + """ + clean_webhook_url, thread_id = split_webhook_url_for_message_endpoint(webhook_url) + + if getattr(webhook, "files", None): + webhook.url = clean_webhook_url + webhook.id = message_id + if thread_id: + webhook.thread_id = thread_id + return webhook.edit() + + params: dict[str, str] = {"wait": "true"} + if thread_id: + params["thread_id"] = thread_id + + timeout: int | float = cast("int | float", getattr(webhook, "timeout", None) or 30.0) + return httpx.patch( + f"{clean_webhook_url}/messages/{message_id}", + json=payload, + params=params, + timeout=timeout, + ) + + +def create_webhook_for_entry( + webhook_url: str, + entry: Entry, + reader: Reader, + *, + use_default_message_on_empty: bool, +) -> tuple[DiscordWebhook, DeliveryMode]: + """Create the Discord webhook payload for the entry's effective delivery mode. + + Returns: + tuple[DiscordWebhook, DeliveryMode]: Rendered webhook object and delivery mode. + """ + delivery_mode: DeliveryMode = get_entry_delivery_mode(reader, entry) + + if delivery_mode == "embed" and is_c3kay_feed(entry.feed.url): + entry_link: str | None = entry.link + if entry_link: + post_id: str | None = extract_post_id_from_hoyolab_url(entry_link) + if post_id: + post_data = fetch_hoyolab_post(post_id) + if post_data: + return create_hoyolab_webhook(webhook_url, entry, post_data), delivery_mode + logger.warning( + "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", + entry.feed.url, + ) + else: + logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) + + if delivery_mode == "embed": + return create_embed_webhook(webhook_url, entry, reader=reader), delivery_mode + if delivery_mode == "screenshot": + return create_screenshot_webhook(webhook_url, entry, reader=reader), delivery_mode + return ( + create_text_webhook( + webhook_url, + entry, + reader=reader, + use_default_message_on_empty=use_default_message_on_empty, + ), + delivery_mode, + ) + + +def collect_modified_entries_during_update(reader: Reader, update_callback: UpdateCallback) -> list[tuple[str, str]]: + """Run a reader update call and collect entries whose stored content was modified. + + Returns: + list[tuple[str, str]]: Modified entry `(feed_url, entry_id)` pairs. + """ + modified_entries: list[tuple[str, str]] = [] + hooks = reader.after_entry_update_hooks + + def collect_modified_entry(_reader: Reader, entry: EntryData, status: EntryUpdateStatus) -> None: + status_value: str = getattr(status, "value", str(status)) + if status == EntryUpdateStatus.MODIFIED or status_value == EntryUpdateStatus.MODIFIED.value: + modified_entries.append((entry.feed_url, entry.id)) + + hooks.append(collect_modified_entry) + try: + update_callback() + finally: + with suppress(ValueError): + hooks.remove(collect_modified_entry) + + return list(dict.fromkeys(modified_entries)) + + +def update_feeds_and_collect_modified_entries( + reader: Reader, + *, + scheduled: bool, + workers: int, +) -> list[tuple[str, str]]: + """Update feeds and return reader entries whose stored content was modified. + + Returns: + list[tuple[str, str]]: Modified entry `(feed_url, entry_id)` pairs. + """ + return collect_modified_entries_during_update( + reader, + lambda: reader.update_feeds(scheduled=scheduled, workers=workers), + ) + + +def update_feed_and_collect_modified_entries(reader: Reader, feed: Feed | str) -> list[tuple[str, str]]: + """Update one feed and return reader entries whose stored content was modified. + + Returns: + list[tuple[str, str]]: Modified entry `(feed_url, entry_id)` pairs. + """ + return collect_modified_entries_during_update(reader, lambda: reader.update_feed(feed)) + + +def update_sent_webhook_record_for_entry( + reader: Reader, + entry: Entry, + record: SentWebhookRecord, +) -> tuple[SentWebhookRecord, bool, bool]: + """Edit one saved Discord webhook message record for an updated entry. + + Returns: + tuple[SentWebhookRecord, bool, bool]: Updated record, whether it changed, and whether Discord was edited. + """ + webhook_url_value: JsonValue = record.get("webhook_url") + message_id_value: JsonValue = record.get("message_id") + if ( + not isinstance(webhook_url_value, str) + or not isinstance(message_id_value, str) + or not webhook_url_value + or not message_id_value + ): + return record, False, False + + webhook, delivery_mode = create_webhook_for_entry( + webhook_url_value, + entry, + reader, + use_default_message_on_empty=True, + ) + payload: JsonObject = get_webhook_message_payload(webhook) + payload_hash: str = hash_webhook_payload(payload) + if payload_hash == record.get("payload_hash"): + return record, False, False + + now: str = datetime.datetime.now(tz=datetime.UTC).isoformat() + try: + response = edit_sent_webhook_message(webhook_url_value, message_id_value, webhook, payload) + except (AssertionError, RequestException, httpx.HTTPError, OSError, ValueError) as e: + logger.exception("Failed to edit Discord webhook message %s for entry %s", message_id_value, entry.id) + return ( + { + **record, + "last_update_attempt_at": now, + "last_error": str(e), + }, + True, + False, + ) + + status_code: int = response.status_code + response_json: JsonObject = get_response_json(response) + if status_code in {200, 204}: + return ( + { + **record, + "feed_title": entry.feed.title or "", + "entry_title": entry.title or "", + "entry_link": entry.link or "", + "entry_updated": get_entry_timestamp(entry.updated), + "delivery_mode": delivery_mode, + "payload": payload, + "payload_hash": payload_hash, + "discord_response": response_json, + "response_text": response.text[:5000], + "last_updated_at": now, + "last_status_code": status_code, + "last_error": "", + "update_count": json_value_to_int(record.get("update_count")) + 1, + }, + True, + True, + ) + + return ( + { + **record, + "last_update_attempt_at": now, + "last_status_code": status_code, + "discord_response": response_json, + "response_text": response.text[:5000], + "last_error": response.text[:500], + }, + True, + False, + ) + + +def update_sent_webhooks_for_modified_entries( # noqa: C901 + reader: Reader, + modified_entries: Iterable[tuple[str, str]], +) -> int: + """Edit saved Discord webhook messages for modified reader entries. + + Returns: + int: Number of Discord messages successfully edited. + """ + modified_entry_keys: set[tuple[str, str]] = set(modified_entries) + if not modified_entry_keys: + return 0 + + records: list[SentWebhookRecord] = get_sent_webhook_records(reader) + if not records: + return 0 + + records_changed: bool = False + updated_count: int = 0 + + for feed_url, entry_id in modified_entry_keys: + matching_record_indexes: list[int] = [ + index + for index, record in enumerate(records) + if record.get("feed_url") == feed_url and record.get("entry_id") == entry_id + ] + if not matching_record_indexes: + continue + + try: + entry: Entry = reader.get_entry((feed_url, entry_id)) + except (FeedNotFoundError, EntryNotFoundError): + logger.exception("Saved webhook entry no longer exists: %s %s", feed_url, entry_id) + continue + + if not feed_saves_sent_webhooks(reader, entry.feed): + continue + + for record_index in matching_record_indexes: + updated_record, record_changed, message_was_edited = update_sent_webhook_record_for_entry( + reader, + entry, + records[record_index], + ) + if record_changed: + records[record_index] = updated_record + records_changed = True + if message_was_edited: + updated_count += 1 + + if records_changed: + save_sent_webhook_records(reader, records) + + return updated_count + + def create_text_webhook( webhook_url: str, entry: Entry, @@ -496,7 +1027,7 @@ def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Read content=quest_url, rate_limit_retry=True, ) - execute_webhook(webhook, entry, reader=reader) + execute_webhook(webhook, entry, reader=reader, save_sent_webhook=False) # Iterate through the content of the entry for content in entry.content: @@ -661,7 +1192,7 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None: logger.exception("Error setting entry to read: %s", entry.id) -def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 +def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: """Send entries to Discord. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. @@ -675,11 +1206,16 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d # Get the default reader if we didn't get a custom one. effective_reader: Reader = get_reader() if reader is None else reader - # Check for new entries for every feed. - effective_reader.update_feeds( + # Check for new and modified entries for every feed. + modified_entries: list[tuple[str, str]] = update_feeds_and_collect_modified_entries( + effective_reader, scheduled=True, workers=os.cpu_count() or 1, ) + try: + update_sent_webhooks_for_modified_entries(effective_reader, modified_entries) + except (AssertionError, ReaderError, RequestException, httpx.HTTPError, OSError, ValueError): + logger.exception("Failed to update saved Discord webhooks for modified feed entries.") # Loop through the unread entries. entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False) @@ -695,42 +1231,17 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d logger.info("No webhook URL found for feed: %s", entry.feed.url) continue - delivery_mode: DeliveryMode = get_entry_delivery_mode(effective_reader, entry) - - if delivery_mode == "embed": - webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader) - elif delivery_mode == "screenshot": - webhook = create_screenshot_webhook(webhook_url, entry, reader=effective_reader) - else: - webhook = create_text_webhook( - webhook_url, - entry, - reader=effective_reader, - use_default_message_on_empty=True, - ) - decision = get_entry_filter_decision_from_reader(effective_reader, entry) if not decision.should_send: logger.info("Entry was skipped: %s (%s)", entry.id, decision.reason) continue - # Use a custom webhook for Hoyolab feeds. - if is_c3kay_feed(entry.feed.url): - entry_link: str | None = entry.link - if entry_link: - post_id: str | None = extract_post_id_from_hoyolab_url(entry_link) - if post_id: - post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) - if post_data: - webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry, reader=effective_reader) - return - logger.warning( - "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", - entry.feed.url, - ) - else: - logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) + webhook, _delivery_mode = create_webhook_for_entry( + webhook_url, + entry, + effective_reader, + use_default_message_on_empty=True, + ) # Send the entry to Discord because the combined blacklist/whitelist decision allowed it. execute_webhook(webhook, entry, reader=effective_reader) @@ -741,14 +1252,20 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d break -def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None: +def execute_webhook( + webhook: DiscordWebhook, + entry: Entry, + reader: Reader, + *, + save_sent_webhook: bool = True, +) -> None: """Execute the webhook. Args: webhook (DiscordWebhook): The webhook to execute. entry (Entry): The entry to send to Discord. reader (Reader): The Reader instance to use for checking feed status. - + save_sent_webhook: Whether to save the sent Discord message metadata for future edits. """ # If the feed has been paused or deleted, we will not send the entry to Discord. entry_feed: Feed = entry.feed @@ -762,6 +1279,7 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> No logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url) return + payload: JsonObject = get_webhook_message_payload(webhook) response: Response = webhook.execute() logger.debug("Discord webhook response for entry %s: status=%s", entry.id, response.status_code) if response.status_code not in {200, 204}: @@ -772,6 +1290,10 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> No logger.error(msg) else: logger.info("Sent entry to Discord: %s", entry.id) + if save_sent_webhook: + webhook_url: str = get_webhook_url(reader, entry) + if webhook_url: + upsert_sent_webhook_record(reader, entry, webhook_url, webhook, response, payload) def is_youtube_feed(feed_url: str) -> bool: @@ -868,6 +1390,9 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # This is the webhook that will be used to send the feed to Discord. reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType] + # Store sent Discord message ids by default so modified feed entries can edit the original webhook message. + reader.set_tag(clean_feed_url, SAVE_SENT_WEBHOOKS_TAG, True) # pyright: ignore[reportArgumentType] + # This is the default message that will be sent to Discord. reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType] diff --git a/discord_rss_bot/git_backup.py b/discord_rss_bot/git_backup.py index febc34c..490807d 100644 --- a/discord_rss_bot/git_backup.py +++ b/discord_rss_bot/git_backup.py @@ -28,7 +28,6 @@ import shutil import subprocess # noqa: S404 from pathlib import Path from typing import TYPE_CHECKING -from typing import Any if TYPE_CHECKING: from reader import Reader @@ -37,11 +36,9 @@ logger: logging.Logger = logging.getLogger(__name__) GIT_EXECUTABLE: str = shutil.which("git") or "git" -type TAG_VALUE = ( - dict[str, str | int | float | bool | dict[str, Any] | list[Any] | None] - | list[str | int | float | bool | dict[str, Any] | list[Any] | None] - | None -) +type JsonValue = bool | int | float | str | list[JsonValue] | dict[str, JsonValue] | None +type JsonObject = dict[str, JsonValue] +type TagValue = JsonValue # Tags that are exported per-feed (empty values are omitted). _FEED_TAGS: tuple[str, ...] = ( @@ -164,24 +161,24 @@ def export_state(reader: Reader, backup_path: Path) -> None: reader: The :class:`reader.Reader` instance to read state from. backup_path: Destination directory for the exported ``state.json``. """ - feeds_state: list[dict] = [] + feeds_state: list[JsonObject] = [] for feed in reader.get_feeds(): - feed_data: dict = {"url": feed.url} + feed_data: JsonObject = {"url": feed.url} for tag in _FEED_TAGS: try: - value: TAG_VALUE = reader.get_tag(feed, tag, None) + value: TagValue = reader.get_tag(feed, tag, None) if value is not None and value != "": # noqa: PLC1901 feed_data[tag] = value except Exception: logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url) feeds_state.append(feed_data) - webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list( + webhooks: list[JsonValue] = list( reader.get_tag((), "webhooks", []), ) # Export global update interval if set - global_update_interval: dict[str, Any] | None = None + global_update_interval: JsonObject | None = None global_update_config = reader.get_tag((), ".reader.update", None) if isinstance(global_update_config, dict): global_update_interval = global_update_config @@ -193,7 +190,7 @@ def export_state(reader: Reader, backup_path: Path) -> None: if clean_layout in {"desktop", "mobile"}: global_screenshot_layout = clean_layout - state: dict = {"feeds": feeds_state, "webhooks": webhooks} + state: JsonObject = {"feeds": feeds_state, "webhooks": webhooks} if global_update_interval is not None: state["global_update_interval"] = global_update_interval if global_screenshot_layout is not None: diff --git a/discord_rss_bot/hoyolab_api.py b/discord_rss_bot/hoyolab_api.py index 227a413..f571046 100644 --- a/discord_rss_bot/hoyolab_api.py +++ b/discord_rss_bot/hoyolab_api.py @@ -5,7 +5,7 @@ import json import logging import re from typing import TYPE_CHECKING -from typing import Any +from typing import cast import requests from discord_webhook import DiscordEmbed @@ -17,6 +17,9 @@ if TYPE_CHECKING: logger: logging.Logger = logging.getLogger(__name__) +type JsonValue = bool | int | float | str | list[JsonValue] | dict[str, JsonValue] | None +type JsonObject = dict[str, JsonValue] + def is_c3kay_feed(feed_url: str) -> bool: """Check if the feed is from c3kay.de. @@ -50,14 +53,23 @@ def extract_post_id_from_hoyolab_url(url: str) -> str | None: return None -def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None: +def as_json_object(value: JsonValue) -> JsonObject: + """Return value as a JSON object when possible. + + Returns: + JsonObject: The input dict or an empty dict. + """ + return cast("JsonObject", value) if isinstance(value, dict) else {} + + +def fetch_hoyolab_post(post_id: str) -> JsonObject | None: """Fetch post data from the Hoyolab API. Args: post_id: The post ID to fetch. Returns: - dict[str, Any] | None: The post data if successful, None otherwise. + JsonObject | None: The post data if successful, None otherwise. """ if not post_id: return None @@ -68,9 +80,11 @@ def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None: response: requests.Response = requests.get(url, timeout=10) if response.status_code == http_ok: - data: dict[str, Any] = response.json() - if data.get("retcode") == 0 and "data" in data and "post" in data["data"]: - return data["data"]["post"] + data = cast("JsonObject", response.json()) + data_payload: JsonObject = as_json_object(data.get("data")) + post_payload: JsonObject = as_json_object(data_payload.get("post")) + if data.get("retcode") == 0 and post_payload: + return post_payload logger.warning("Failed to fetch Hoyolab post %s: %s", post_id, response.text) except (requests.RequestException, ValueError): @@ -79,7 +93,7 @@ def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None: return None -def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, Any]) -> DiscordWebhook: # noqa: C901, PLR0912, PLR0914, PLR0915 +def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: JsonObject) -> DiscordWebhook: # noqa: C901, PLR0912, PLR0914, PLR0915 """Create a webhook with data from the Hoyolab API. Args: @@ -94,22 +108,23 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, webhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True) # Extract relevant data from the post - post: dict[str, Any] = post_data.get("post", {}) - subject: str = post.get("subject", "") - content: str = post.get("content", "{}") + post: JsonObject = as_json_object(post_data.get("post")) + subject: str = str(post.get("subject", "")) + content: str = str(post.get("content", "{}")) logger.debug("Post subject: %s", subject) logger.debug("Post content: %s", content) - content_data: dict[str, str] = {} + content_data: JsonObject = {} with contextlib.suppress(json.JSONDecodeError, ValueError): - content_data = json.loads(content) + loaded_content = cast("JsonValue", json.loads(content)) + content_data = as_json_object(loaded_content) logger.debug("Content data: %s", content_data) - description: str = content_data.get("describe", "") + description: str = str(content_data.get("describe", "")) if not description: - description = post.get("desc", "") + description = str(post.get("desc", "")) # Create the embed discord_embed = DiscordEmbed() @@ -119,7 +134,12 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, discord_embed.set_url(entry_link) # Get post.image_list - image_list: list[dict[str, Any]] = post_data.get("image_list", []) + image_list_value: JsonValue = post_data.get("image_list", []) + image_list: list[JsonObject] = ( + [cast("JsonObject", item) for item in image_list_value if isinstance(item, dict)] + if isinstance(image_list_value, list) + else [] + ) if image_list: image_url: str = str(image_list[0].get("url", "")) image_height: int = int(image_list[0].get("height", 1080)) @@ -128,7 +148,7 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, logger.debug("Image URL: %s, Height: %s, Width: %s", image_url, image_height, image_width) discord_embed.set_image(url=image_url, height=image_height, width=image_width) - video: dict[str, str | int | bool] = post_data.get("video", {}) + video: JsonObject = as_json_object(post_data.get("video")) if video and video.get("url"): video_url: str = str(video.get("url", "")) logger.debug("Video URL: %s", video_url) @@ -140,20 +160,20 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, filename=f"{entry.id}.mp4", ) - game = post_data.get("game", {}) + game: JsonObject = as_json_object(post_data.get("game")) if game and game.get("color"): game_color = str(game.get("color", "")) discord_embed.set_color(game_color.removeprefix("#")) - user: dict[str, str | int | bool] = post_data.get("user", {}) + user: JsonObject = as_json_object(post_data.get("user")) author_name: str = str(user.get("nickname", "")) avatar_url: str = str(user.get("avatar_url", "")) if author_name: webhook.avatar_url = avatar_url webhook.username = author_name - classification = post_data.get("classification", {}) + classification: JsonObject = as_json_object(post_data.get("classification")) if classification and classification.get("name"): footer = str(classification.get("name", "")) @@ -162,13 +182,19 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, webhook.add_embed(discord_embed) # Only show Youtube URL if available - structured_content: str = post.get("structured_content", "") + structured_content: str = str(post.get("structured_content", "")) if structured_content: # noqa: PLR1702 try: - structured_content_data: list[dict[str, Any]] = json.loads(structured_content) + loaded_structured_content = cast("JsonValue", json.loads(structured_content)) + structured_content_data: list[JsonObject] = ( + [cast("JsonObject", item) for item in loaded_structured_content if isinstance(item, dict)] + if isinstance(loaded_structured_content, list) + else [] + ) for item in structured_content_data: - if item.get("insert") and isinstance(item["insert"], dict): - video_url: str = str(item["insert"].get("video", "")) + insert: JsonObject = as_json_object(item.get("insert")) + if insert: + video_url: str = str(insert.get("video", "")) if video_url: video_id_match: re.Match[str] | None = re.search(r"embed/([a-zA-Z0-9_-]+)", video_url) if video_id_match: @@ -180,15 +206,15 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, except (json.JSONDecodeError, ValueError) as e: logger.warning("Error parsing structured content: %s", e) - event_start_date: str = post.get("event_start_date", "") + event_start_date: str = str(post.get("event_start_date", "")) if event_start_date and event_start_date != "0": discord_embed.add_embed_field(name="Start", value=f"") - event_end_date: str = post.get("event_end_date", "") + event_end_date: str = str(post.get("event_end_date", "")) if event_end_date and event_end_date != "0": discord_embed.add_embed_field(name="End", value=f"") - created_at: str = post.get("created_at", "") + created_at: str = str(post.get("created_at", "")) if created_at and created_at != "0": discord_embed.set_timestamp(timestamp=created_at) diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 10f28bd..5ebfbcd 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -15,7 +15,7 @@ from html import escape from html import unescape from typing import TYPE_CHECKING from typing import Annotated -from typing import Any +from typing import TypedDict from typing import cast import httpx @@ -51,12 +51,18 @@ from discord_rss_bot.custom_message import get_embed from discord_rss_bot.custom_message import get_first_image from discord_rss_bot.custom_message import replace_tags_in_text_message from discord_rss_bot.custom_message import save_embed +from discord_rss_bot.feeds import SAVE_SENT_WEBHOOKS_TAG +from discord_rss_bot.feeds import SentWebhookRecord from discord_rss_bot.feeds import create_feed from discord_rss_bot.feeds import extract_domain +from discord_rss_bot.feeds import feed_saves_sent_webhooks from discord_rss_bot.feeds import get_feed_delivery_mode from discord_rss_bot.feeds import get_screenshot_layout +from discord_rss_bot.feeds import get_sent_webhook_records from discord_rss_bot.feeds import send_entry_to_discord from discord_rss_bot.feeds import send_to_discord +from discord_rss_bot.feeds import update_feed_and_collect_modified_entries +from discord_rss_bot.feeds import update_sent_webhooks_for_modified_entries from discord_rss_bot.filter.evaluator import FILTER_FIELDS from discord_rss_bot.filter.evaluator import EntryFilterDecision from discord_rss_bot.filter.evaluator import FilterMatch @@ -78,7 +84,41 @@ if TYPE_CHECKING: from reader.types import JSONType -LOGGING_CONFIG: dict[str, Any] = { +class PreviewFieldRow(TypedDict): + label: str + value_html: str + badges: list[dict[str, str]] + + +class FilterPreviewRow(TypedDict): + entry: Entry + decision: EntryFilterDecision + field_rows: list[PreviewFieldRow] + published_label: str + status_label: str + status_class: str + first_image: str + + +class FilterPreviewSummary(TypedDict): + total: int + sent: int + skipped: int + blacklist_matches: int + whitelist_matches: int + + +class FilterPreviewContext(TypedDict): + filter_name: str + filter_label: str + preview_rendered_count: int + preview_rows: list[FilterPreviewRow] + preview_limit: int + preview_summary: FilterPreviewSummary + preview_helper_text: str + + +LOGGING_CONFIG = { "version": 1, "disable_existing_loggers": False, "formatters": { @@ -696,7 +736,7 @@ def build_filter_preview_context( feed: Feed, filter_name: str, form_values: dict[str, str] | None = None, -) -> dict[str, Any]: +) -> FilterPreviewContext: """Build preview data for the blacklist and whitelist pages. Args: @@ -706,7 +746,7 @@ def build_filter_preview_context( form_values: Optional unsaved values from the current form. Returns: - dict[str, Any]: Preview context for template rendering. + FilterPreviewContext: Preview context for template rendering. """ saved_blacklist_values: dict[str, str] = get_filter_values_from_reader(reader, feed, "blacklist") saved_whitelist_values: dict[str, str] = get_filter_values_from_reader(reader, feed, "whitelist") @@ -724,7 +764,7 @@ def build_filter_preview_context( helper_text = "Saved blacklist rules still apply while previewing whitelist changes." preview_entries: list[Entry] = list(reader.get_entries(feed=feed, limit=FILTER_PREVIEW_LIMIT)) - preview_rows: list[dict[str, Any]] = [] + preview_rows: list[FilterPreviewRow] = [] preview_decisions: dict[str, EntryFilterDecision] = {} sent_count = 0 skipped_count = 0 @@ -782,7 +822,7 @@ def build_filter_preview_context( } -def build_preview_field_rows(entry: Entry, decision: EntryFilterDecision) -> list[dict[str, Any]]: +def build_preview_field_rows(entry: Entry, decision: EntryFilterDecision) -> list[PreviewFieldRow]: """Build labeled preview fields for the filter UI. Args: @@ -790,10 +830,10 @@ def build_preview_field_rows(entry: Entry, decision: EntryFilterDecision) -> lis decision: The final decision for the entry. Returns: - list[dict[str, Any]]: Labeled field rows for the preview template. + list[PreviewFieldRow]: Labeled field rows for the preview template. """ entry_fields: dict[str, str] = get_entry_fields(entry) - field_rows: list[dict[str, Any]] = [] + field_rows: list[PreviewFieldRow] = [] for field_name in ("title", "author", "summary", "content"): badges: list[dict[str, str]] = [] @@ -1284,6 +1324,34 @@ async def post_use_screenshot_desktop( return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) +@app.post("/set_feed_save_sent_webhooks") +async def post_set_feed_save_sent_webhooks( + feed_url: Annotated[str, Form()], + enabled: Annotated[str, Form()], + reader: Annotated[Reader, Depends(get_reader_dependency)], +) -> RedirectResponse: + """Set whether a feed stores sent Discord webhook message records. + + Returns: + RedirectResponse: Redirect to the specified feed page. + + Raises: + HTTPException: If Feed does not exists. + """ + clean_feed_url: str = feed_url.strip() + should_save: bool = enabled.strip().lower() in {"1", "true", "yes", "on", "enabled"} + + try: + reader.get_feed(clean_feed_url) + except FeedNotFoundError as e: + raise HTTPException(status_code=404, detail="Feed not found") from e + + reader.set_tag(clean_feed_url, SAVE_SENT_WEBHOOKS_TAG, should_save) # pyright: ignore[reportArgumentType] + action: str = "Enable" if should_save else "Disable" + commit_state_change(reader, f"{action} sent webhook storage for {clean_feed_url}") + return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) + + @app.post("/set_update_interval") async def post_set_update_interval( feed_url: Annotated[str, Form()], @@ -1600,6 +1668,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 "webhooks": webhooks, "current_webhook_url": current_webhook_url, "current_webhook_name": current_webhook_name, + "save_sent_webhooks": feed_saves_sent_webhooks(reader, feed), } return templates.TemplateResponse(request=request, name="feed.html", context=context) @@ -1659,6 +1728,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 "webhooks": webhooks, "current_webhook_url": current_webhook_url, "current_webhook_name": current_webhook_name, + "save_sent_webhooks": feed_saves_sent_webhooks(reader, feed), } return templates.TemplateResponse(request=request, name="feed.html", context=context) @@ -1926,6 +1996,50 @@ async def get_webhooks( return templates.TemplateResponse(request=request, name="webhooks.html", context=context) +@app.get("/sent_webhooks", response_class=HTMLResponse) +async def get_sent_webhooks( + request: Request, + reader: Annotated[Reader, Depends(get_reader_dependency)], + feed_url: str = "", + webhook_url: str = "", +) -> HTMLResponse: + """View sent Discord webhook messages saved for future edits. + + Returns: + sent_webhooks.html HTML + """ + clean_feed_url: str = urllib.parse.unquote(feed_url.strip()) + clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) + + records: list[SentWebhookRecord] = get_sent_webhook_records(reader) + if clean_feed_url: + records = [record for record in records if record.get("feed_url") == clean_feed_url] + if clean_webhook_url: + records = [record for record in records if record.get("webhook_url") == clean_webhook_url] + + records.sort( + key=lambda record: str(record.get("last_updated_at") or record.get("last_sent_at") or ""), + reverse=True, + ) + + webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) + webhook_names: dict[str, str] = { + hook.get("url", ""): hook.get("name", "") for hook in webhooks if isinstance(hook, dict) + } + feed_titles: dict[str, str] = {feed.url: (feed.title or feed.url) for feed in reader.get_feeds()} + + context = { + "request": request, + "records": records, + "total_records": len(records), + "feed_url": clean_feed_url, + "webhook_url": clean_webhook_url, + "webhook_names": webhook_names, + "feed_titles": feed_titles, + } + return templates.TemplateResponse(request=request, name="sent_webhooks.html", context=context) + + @app.get("/", response_class=HTMLResponse) def get_index( request: Request, @@ -2040,10 +2154,16 @@ async def update_feed( HTTPException: If the feed is not found. """ try: - reader.update_feed(urllib.parse.unquote(feed_url)) + clean_feed_url: str = urllib.parse.unquote(feed_url) + modified_entries: list[tuple[str, str]] = update_feed_and_collect_modified_entries(reader, clean_feed_url) except FeedNotFoundError as e: raise HTTPException(status_code=404, detail="Feed not found") from e + try: + update_sent_webhooks_for_modified_entries(reader, modified_entries) + except (AssertionError, ReaderError, httpx.HTTPError, OSError, ValueError): + logger.exception("Failed to update saved Discord webhooks for manually updated feed: %s", feed_url) + logger.info("Manually updated feed: %s", feed_url) return RedirectResponse(url="/feed?feed_url=" + urllib.parse.quote(feed_url), status_code=303) diff --git a/discord_rss_bot/static/styles.css b/discord_rss_bot/static/styles.css index 451cc9d..65567b3 100644 --- a/discord_rss_bot/static/styles.css +++ b/discord_rss_bot/static/styles.css @@ -65,6 +65,14 @@ body { word-break: break-word; } +.sent-webhooks__entry, +.sent-webhooks__preview { + min-width: 14rem; + max-width: 28rem; + overflow-wrap: anywhere; + word-break: break-word; +} + .filter-page__sidebar { height: 100%; } diff --git a/discord_rss_bot/templates/feed.html b/discord_rss_bot/templates/feed.html index d04a662..ec15805 100644 --- a/discord_rss_bot/templates/feed.html +++ b/discord_rss_bot/templates/feed.html @@ -214,6 +214,23 @@ {% else %}

Add a webhook first to attach this feed.

{% endif %} +
+ + Sent webhook tracking: + {{ 'Enabled' if save_sent_webhooks else 'Disabled' }} + +
+ + + +
+ View sent webhooks +

Feed Information

diff --git a/discord_rss_bot/templates/nav.html b/discord_rss_bot/templates/nav.html index 2286744..1fb1728 100644 --- a/discord_rss_bot/templates/nav.html +++ b/discord_rss_bot/templates/nav.html @@ -22,6 +22,10 @@ Webhooks + + diff --git a/discord_rss_bot/templates/sent_webhooks.html b/discord_rss_bot/templates/sent_webhooks.html new file mode 100644 index 0000000..e5597d4 --- /dev/null +++ b/discord_rss_bot/templates/sent_webhooks.html @@ -0,0 +1,97 @@ +{% extends "base.html" %} +{% block title %} + Sent Webhooks | discord-rss-bot +{% endblock title %} +{% block description %} + Review Discord webhook messages saved for future RSS entry edits. +{% endblock description %} +{% block content %} +
+
+

Sent webhooks

+

+ {{ total_records }} saved message{{ '' if total_records == 1 else 's' }} + {% if feed_url %}for {{ feed_titles.get(feed_url, feed_url) }}{% endif %} + {% if webhook_url %}via {{ webhook_names.get(webhook_url) or 'selected webhook' }}{% endif %} +

+
+ All webhooks +
+ {% if records %} +
+ + + + + + + + + + + + + {% for record in records %} + + + + + + + + + {% endfor %} + +
EntryWebhookDiscord responseModeUpdatedPreview
+ + {{ feed_titles.get(record.feed_url, record.feed_title or record.feed_url) }} + +
+ {% if record.entry_link %} + {{ record.entry_title or record.entry_id }} + {% else %} + {{ record.entry_title or record.entry_id }} + {% endif %} +
+ {{ record.entry_id }} +
+ {{ webhook_names.get(record.webhook_url) or 'Stored webhook' }} + +
+ HTTP {{ record.last_status_code or 'unknown' }} +
+
+ Message: + {{ record.message_id }} +
+ {% if record.discord_response %} +
+ Response JSON +
{{ record.discord_response|tojson(indent=2) }}
+
+ {% elif record.response_text %} +
{{ record.response_text }}
+ {% else %} +
No saved response body
+ {% endif %} + {% if record.last_error %}
{{ record.last_error }}
{% endif %} +
+ {{ record.delivery_mode or 'unknown' }} + {% if record.update_count %} + {{ record.update_count }} edit{{ '' if record.update_count == 1 else 's' }} + {% endif %} + {{ record.last_updated_at or record.last_sent_at or 'Never' }} + {% if record.payload and record.payload.content %} +
{{ record.payload.content }}
+ {% elif record.payload and record.payload.embeds %} + {{ record.payload.embeds|length }} embed{{ '' if record.payload.embeds|length == 1 else 's' }} + {% else %} + No text payload + {% endif %} +
+
+ {% else %} + + {% endif %} +{% endblock content %} diff --git a/tests/conftest.py b/tests/conftest.py index 4aa791d..8bd820e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,14 +8,27 @@ import warnings from contextlib import suppress from pathlib import Path from typing import TYPE_CHECKING -from typing import Any +from typing import Protocol +from typing import cast from bs4 import MarkupResemblesLocatorWarning if TYPE_CHECKING: + from types import ModuleType + import pytest +class CachedReaderFactory(Protocol): + """Reader factory with lru_cache controls used by tests.""" + + def __call__(self) -> None: + """Create or return the cached reader.""" + + def cache_clear(self) -> None: + """Clear the cached reader.""" + + def pytest_addoption(parser: pytest.Parser) -> None: """Register custom command-line options for optional integration tests.""" parser.addoption( @@ -44,21 +57,23 @@ def pytest_sessionstart(session: pytest.Session) -> None: # If modules were imported before this hook (unlikely), force them to use # the worker-specific location. - settings_module: Any = sys.modules.get("discord_rss_bot.settings") + settings_module: ModuleType | None = sys.modules.get("discord_rss_bot.settings") if settings_module is not None: settings_module.data_dir = str(worker_data_dir) - get_reader: Any = getattr(settings_module, "get_reader", None) - if get_reader is not None and hasattr(get_reader, "cache_clear"): + get_reader_attr = getattr(settings_module, "get_reader", None) + if get_reader_attr is not None and hasattr(get_reader_attr, "cache_clear"): + get_reader = cast("CachedReaderFactory", get_reader_attr) get_reader.cache_clear() - main_module: Any = sys.modules.get("discord_rss_bot.main") + main_module: ModuleType | None = sys.modules.get("discord_rss_bot.main") if main_module is not None and settings_module is not None: with suppress(Exception): current_reader = getattr(main_module, "reader", None) if current_reader is not None: current_reader.close() - get_reader: Any = getattr(settings_module, "get_reader", None) - if callable(get_reader): + get_reader_attr = getattr(settings_module, "get_reader", None) + if callable(get_reader_attr): + get_reader = cast("CachedReaderFactory", get_reader_attr) get_reader() diff --git a/tests/test_feeds.py b/tests/test_feeds.py index 93c5ccc..2d26900 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -3,6 +3,8 @@ from __future__ import annotations import asyncio import os import tempfile +from datetime import UTC +from datetime import datetime from pathlib import Path from typing import LiteralString from unittest.mock import MagicMock @@ -17,6 +19,7 @@ from reader import StorageError from reader import make_reader from discord_rss_bot import feeds +from discord_rss_bot.feeds import JsonObject from discord_rss_bot.feeds import capture_full_page_screenshot from discord_rss_bot.feeds import create_feed from discord_rss_bot.feeds import create_screenshot_webhook @@ -354,6 +357,19 @@ def test_create_feed_inherits_global_text_delivery_mode() -> None: reader.set_tag.assert_any_call("https://example.com/feed.xml", "should_send_embed", False) +def test_create_feed_enables_sent_webhook_tracking_by_default() -> None: + reader = MagicMock() + reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005 + "webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}], + "screenshot_layout": "desktop", + "delivery_mode": "embed", + }.get(key, default) + + create_feed(reader, "https://example.com/feed.xml", "Main") + + reader.set_tag.assert_any_call("https://example.com/feed.xml", feeds.SAVE_SENT_WEBHOOKS_TAG, True) + + def test_create_feed_falls_back_to_embed_when_global_delivery_mode_is_invalid() -> None: reader = MagicMock() reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005 @@ -882,3 +898,191 @@ def test_execute_webhook_logs_info_on_success(mock_logger: MagicMock) -> None: execute_webhook(webhook, entry, reader) mock_logger.info.assert_called_once_with("Sent entry to Discord: %s", "entry-9") + + +def test_execute_webhook_records_sent_webhook_message() -> None: + webhook_url = "https://discord.com/api/webhooks/123/abc" + state: dict[str, feeds.JsonValue] = {} + + def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue: + if key == feeds.SENT_WEBHOOKS_TAG: + return state.get(feeds.SENT_WEBHOOKS_TAG, default) + if key == feeds.SAVE_SENT_WEBHOOKS_TAG: + return True + if key == "webhook": + return webhook_url + if key == "delivery_mode": + return "text" + return default + + def set_tag(_resource: str | tuple[()], key: str, value: feeds.JsonValue) -> None: + state[key] = value + + reader = MagicMock() + reader.get_tag.side_effect = get_tag + reader.set_tag.side_effect = set_tag + + entry = MagicMock() + entry.id = "entry-1" + entry.title = "Entry title" + entry.link = "https://example.com/entry-1" + entry.updated = datetime(2026, 5, 8, tzinfo=UTC) + entry.feed_url = "https://example.com/feed.xml" + entry.feed.url = "https://example.com/feed.xml" + entry.feed.title = "Example feed" + entry.feed.updates_enabled = True + + webhook = MagicMock() + webhook.json = {"content": "Entry title", "embeds": [], "attachments": []} + response = MagicMock() + response.status_code = 200 + response.text = '{"id": "message-1"}' + response.json.return_value = {"id": "message-1"} + webhook.execute.return_value = response + + execute_webhook(webhook, entry, reader) + + records = state[feeds.SENT_WEBHOOKS_TAG] + assert isinstance(records, list) + assert len(records) == 1 + assert isinstance(records[0], dict) + assert records[0]["feed_url"] == "https://example.com/feed.xml" + assert records[0]["entry_id"] == "entry-1" + assert records[0]["webhook_url"] == webhook_url + assert records[0]["message_id"] == "message-1" + assert records[0]["last_status_code"] == 200 + assert records[0]["discord_response"] == {"id": "message-1"} + assert records[0]["response_text"] == '{"id": "message-1"}' + + assert isinstance(records[0]["payload"], dict) + assert records[0]["payload"]["content"] == "Entry title" + + +def test_execute_webhook_does_not_record_when_feed_tracking_disabled() -> None: + webhook_url = "https://discord.com/api/webhooks/123/abc" + reader = MagicMock() + reader.get_tag.side_effect = lambda _resource, key, default=None: { + feeds.SAVE_SENT_WEBHOOKS_TAG: False, + "webhook": webhook_url, + }.get(key, default) + + entry = MagicMock() + entry.id = "entry-2" + entry.feed_url = "https://example.com/feed.xml" + entry.feed.url = "https://example.com/feed.xml" + entry.feed.updates_enabled = True + + webhook = MagicMock() + webhook.json = {"content": "Entry title", "embeds": [], "attachments": []} + response = MagicMock() + response.status_code = 200 + response.text = '{"id": "message-2"}' + response.json.return_value = {"id": "message-2"} + webhook.execute.return_value = response + + execute_webhook(webhook, entry, reader) + + reader.set_tag.assert_not_called() + + +@patch("discord_rss_bot.feeds.edit_sent_webhook_message") +@patch("discord_rss_bot.feeds.create_webhook_for_entry") +def test_update_sent_webhooks_for_modified_entries_edits_changed_payload( + mock_create_webhook_for_entry: MagicMock, + mock_edit_sent_webhook_message: MagicMock, +) -> None: + webhook_url = "https://discord.com/api/webhooks/123/abc" + old_payload: JsonObject = {"content": "Old title", "embeds": [], "attachments": []} + state: dict[str, feeds.JsonValue] = { + feeds.SENT_WEBHOOKS_TAG: [ + { + "feed_url": "https://example.com/feed.xml", + "entry_id": "entry-3", + "webhook_url": webhook_url, + "message_id": "message-3", + "payload": old_payload, + "payload_hash": feeds.hash_webhook_payload(old_payload), + "update_count": 0, + }, # pyright: ignore[reportAssignmentType, reportArgumentType] + ], + } + + def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue: + if key == feeds.SENT_WEBHOOKS_TAG: + return state[feeds.SENT_WEBHOOKS_TAG] + if key == feeds.SAVE_SENT_WEBHOOKS_TAG: + return True + return default + + def set_tag(_resource: str | tuple[()], key: str, value: feeds.JsonValue) -> None: + state[key] = value + + entry = MagicMock() + entry.id = "entry-3" + entry.title = "New title" + entry.link = "https://example.com/entry-3" + entry.updated = datetime(2026, 5, 8, tzinfo=UTC) + entry.feed.url = "https://example.com/feed.xml" + entry.feed.title = "Example feed" + + reader = MagicMock() + reader.get_tag.side_effect = get_tag + reader.set_tag.side_effect = set_tag + reader.get_entry.return_value = entry + + webhook = MagicMock() + webhook.json = {"content": "New title", "embeds": [], "attachments": []} + mock_create_webhook_for_entry.return_value = (webhook, "text") + + response = MagicMock() + response.status_code = 200 + response.text = '{"id": "message-3"}' + response.json.return_value = {"id": "message-3"} + mock_edit_sent_webhook_message.return_value = response + + updated_count = feeds.update_sent_webhooks_for_modified_entries( + reader, + [("https://example.com/feed.xml", "entry-3")], + ) + + assert updated_count == 1 + mock_edit_sent_webhook_message.assert_called_once() + records = state[feeds.SENT_WEBHOOKS_TAG] + assert isinstance(records, list) + assert isinstance(records[0], dict) + assert isinstance(records[0]["payload"], dict) + assert records[0]["payload"]["content"] == "New title" + assert records[0]["discord_response"] == {"id": "message-3"} + assert records[0]["response_text"] == '{"id": "message-3"}' + assert records[0]["update_count"] == 1 + assert not records[0]["last_error"] + + +def test_update_feeds_and_collect_modified_entries_only_returns_modified_entries() -> None: + class StubReader: + def __init__(self) -> None: + self.after_entry_update_hooks = [] + + def update_feeds(self, *, scheduled: bool, workers: int) -> None: + assert scheduled is True + assert workers == 1 + new_entry = MagicMock() + new_entry.feed_url = "https://example.com/feed.xml" + new_entry.id = "new" + modified_entry = MagicMock() + modified_entry.feed_url = "https://example.com/feed.xml" + modified_entry.id = "modified" + for hook in list(self.after_entry_update_hooks): + hook(self, new_entry, feeds.EntryUpdateStatus.NEW) + hook(self, modified_entry, feeds.EntryUpdateStatus.MODIFIED) + + reader = StubReader() + + modified_entries: list[tuple[str, str]] = feeds.update_feeds_and_collect_modified_entries( + reader, # pyright: ignore[reportArgumentType] + scheduled=True, + workers=1, + ) + + assert modified_entries == [("https://example.com/feed.xml", "modified")] + assert reader.after_entry_update_hooks == [] diff --git a/tests/test_git_backup.py b/tests/test_git_backup.py index bfdf4a2..ec6b2a0 100644 --- a/tests/test_git_backup.py +++ b/tests/test_git_backup.py @@ -6,13 +6,15 @@ import shutil import subprocess # noqa: S404 from pathlib import Path from typing import TYPE_CHECKING -from typing import Any +from typing import cast from unittest.mock import MagicMock from unittest.mock import patch import pytest from fastapi.testclient import TestClient +from discord_rss_bot.git_backup import JsonObject +from discord_rss_bot.git_backup import JsonValue from discord_rss_bot.git_backup import commit_state_change from discord_rss_bot.git_backup import export_state from discord_rss_bot.git_backup import get_backup_path @@ -172,7 +174,7 @@ def test_export_state_creates_state_json(tmp_path: Path) -> None: feed_or_key: tuple | str, tag: str | None = None, default: str | None = None, - ) -> list[Any] | str | None: + ) -> list[JsonValue] | str | None: if feed_or_key == () and tag is None: # Called for global webhooks list return [] @@ -191,7 +193,7 @@ def test_export_state_creates_state_json(tmp_path: Path) -> None: state_file: Path = backup_path / "state.json" assert state_file.exists(), "state.json should be created by export_state" - data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8")) + data = cast("JsonObject", json.loads(state_file.read_text(encoding="utf-8"))) assert "feeds" in data assert "webhooks" in data assert data["feeds"][0]["url"] == "https://example.com/feed.rss" @@ -209,7 +211,7 @@ def test_export_state_omits_empty_tags(tmp_path: Path) -> None: feed_or_key: tuple | str, tag: str | None = None, default: str | None = None, - ) -> list[Any] | str | None: + ) -> list[JsonValue] | str | None: if feed_or_key == (): return [] @@ -222,7 +224,7 @@ def test_export_state_omits_empty_tags(tmp_path: Path) -> None: backup_path.mkdir() export_state(mock_reader, backup_path) - data: dict[str, Any] = json.loads((backup_path / "state.json").read_text()) + data = cast("JsonObject", json.loads((backup_path / "state.json").read_text())) # Only "url" key should be present (no empty-value tags) assert list(data["feeds"][0].keys()) == ["url"] @@ -570,7 +572,7 @@ def test_embed_backup_end_to_end(monkeypatch: pytest.MonkeyPatch, tmp_path: Path # Verify state.json contains embed data state_file: Path = backup_path / "state.json" assert state_file.exists(), "state.json should exist in backup repo" - state_data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8")) + state_data = cast("JsonObject", json.loads(state_file.read_text(encoding="utf-8"))) # Find our test feed in the state test_feed_data = next((feed for feed in state_data["feeds"] if feed["url"] == test_feed_url), None) diff --git a/tests/test_main.py b/tests/test_main.py index 4b43a0f..347e9df 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -15,6 +15,7 @@ from unittest.mock import patch from fastapi.testclient import TestClient import discord_rss_bot.main as main_module +from discord_rss_bot import feeds from discord_rss_bot.main import app from discord_rss_bot.main import create_html_for_feed from discord_rss_bot.main import get_reader_dependency @@ -31,6 +32,8 @@ client: TestClient = TestClient(app) webhook_name: str = "Hello, I am a webhook!" webhook_url: str = "https://discord.com/api/webhooks/1234567890/abcdefghijklmnopqrstuvwxyz" feed_url: str = "https://lovinator.space/rss_test.xml" +type TestTagValue = str | bool | int | list[dict[str, str]] | feeds.JsonValue | None +type TestKwargValue = str | int | None def encoded_feed_url(url: str) -> str: @@ -348,14 +351,14 @@ def test_blacklist_preview_uses_50_entry_limit() -> None: def get_feed(self, _feed_url: str) -> DummyFeed: return self.feed - def get_entries(self, **kwargs: object) -> list[Entry]: + def get_entries(self, **kwargs: TestKwargValue) -> list[Entry]: limit = kwargs.get("limit") self.recorded_limit = limit if isinstance(limit, int) else None if isinstance(limit, int): return self.entries[:limit] return self.entries - def get_tag(self, _resource: object, _key: str, default: object = None) -> object: + def get_tag(self, _resource: str | DummyFeed, _key: str, default: TestTagValue = None) -> TestTagValue: return default stub_reader = StubReader() @@ -420,10 +423,10 @@ def test_blacklist_preview_shows_labeled_field_values_for_substring_match() -> N def get_feed(self, _feed_url: str) -> DummyFeed: return self.feed - def get_entries(self, **_kwargs: object) -> list[Entry]: + def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]: return self.entries - def get_tag(self, _resource: object, _key: str, default: object = None) -> object: + def get_tag(self, _resource: str | DummyFeed, _key: str, default: TestTagValue = None) -> TestTagValue: return default stub_reader = StubReader() @@ -528,6 +531,103 @@ def test_c3kay_feed_delivery_mode_toggle_routes_update_stored_tags() -> None: assert reader.get_tag(c3kay_feed_url, "should_send_embed") is True +def test_set_feed_save_sent_webhooks_route_updates_stored_tag() -> None: + @dataclass(slots=True) + class DummyFeed: + url: str + title: str + + class StubReader: + def __init__(self) -> None: + self.feed = DummyFeed(url="https://example.com/feed.xml", title="Example") + self.tags: dict[tuple[str, str], bool] = {} + + def get_feed(self, feed_url: str) -> DummyFeed: + assert feed_url == self.feed.url + return self.feed + + def set_tag(self, resource: str, key: str, value: bool) -> None: # noqa: FBT001 + self.tags[resource, key] = value + + stub_reader = StubReader() + app.dependency_overrides[get_reader_dependency] = lambda: stub_reader + + try: + with patch("discord_rss_bot.main.commit_state_change"): + response: Response = client.post( + url="/set_feed_save_sent_webhooks", + data={"feed_url": stub_reader.feed.url, "enabled": "false"}, + follow_redirects=False, + ) + + assert response.status_code == 303, f"/set_feed_save_sent_webhooks failed: {response.text}" + assert stub_reader.tags[stub_reader.feed.url, feeds.SAVE_SENT_WEBHOOKS_TAG] is False + finally: + app.dependency_overrides = {} + + +def test_sent_webhooks_view_shows_saved_records() -> None: + @dataclass(slots=True) + class DummyFeed: + url: str + title: str + + sent_webhook_url = "https://discord.com/api/webhooks/123/abc" + sent_feed_url = "https://example.com/feed.xml" + + class StubReader: + def __init__(self) -> None: + self.feed = DummyFeed(url=sent_feed_url, title="Example feed") + + def get_tag( + self, + resource: str | tuple[()], + key: str, + default: feeds.JsonValue = None, + ) -> feeds.JsonValue: + if resource == () and key == feeds.SENT_WEBHOOKS_TAG: + return [ + { + "feed_url": sent_feed_url, + "feed_title": "Example feed", + "entry_id": "entry-1", + "entry_title": "Fixed typo", + "entry_link": "https://example.com/entry-1", + "webhook_url": sent_webhook_url, + "message_id": "message-1", + "delivery_mode": "text", + "payload": {"content": "Fixed typo", "embeds": [], "attachments": []}, + "discord_response": {"id": "message-1", "channel_id": "channel-1"}, + "response_text": '{"id": "message-1", "channel_id": "channel-1"}', + "last_updated_at": "2026-05-08T12:00:00+00:00", + "last_status_code": 200, + "update_count": 1, + }, + ] + if resource == () and key == "webhooks": + return [{"name": "Main", "url": sent_webhook_url}] + return default + + def get_feeds(self) -> list[DummyFeed]: + return [self.feed] + + app.dependency_overrides[get_reader_dependency] = StubReader + + try: + response: Response = client.get(url="/sent_webhooks") + + assert response.status_code == 200, f"/sent_webhooks failed: {response.text}" + assert "Fixed typo" in response.text + assert "message-1" in response.text + assert "channel-1" in response.text + assert sent_webhook_url not in response.text + assert "HTTP 200" in response.text + assert "Example feed" in response.text + assert "Main" in response.text + finally: + app.dependency_overrides = {} + + def test_set_global_screenshot_layout() -> None: response: Response = client.post(url="/set_global_screenshot_layout", data={"screenshot_layout": "mobile"}) assert response.status_code == 200, f"Failed to set global screenshot layout: {response.text}" @@ -964,6 +1064,35 @@ def test_update_feed_not_found() -> None: assert "Feed not found" in response.text +def test_update_feed_updates_saved_webhooks_for_modified_entries() -> None: + class StubReader: + pass + + stub_reader = StubReader() + modified_entries = [("https://example.com/feed.xml", "entry-1")] + app.dependency_overrides[get_reader_dependency] = lambda: stub_reader + + try: + with ( + patch( + "discord_rss_bot.main.update_feed_and_collect_modified_entries", + return_value=modified_entries, + ) as mock_update_feed, + patch("discord_rss_bot.main.update_sent_webhooks_for_modified_entries") as mock_update_webhooks, + ): + response: Response = client.get( + url="/update", + params={"feed_url": "https://example.com/feed.xml"}, + follow_redirects=False, + ) + + assert response.status_code == 303, f"Expected redirect after update, got: {response.text}" + mock_update_feed.assert_called_once_with(stub_reader, "https://example.com/feed.xml") + mock_update_webhooks.assert_called_once_with(stub_reader, modified_entries) + finally: + app.dependency_overrides = {} + + def test_post_entry_send_to_discord() -> None: """Test that /post_entry sends an entry to Discord and redirects to the feed page. @@ -1046,7 +1175,7 @@ def test_post_entry_uses_feed_url_to_disambiguate_duplicate_ids() -> None: selected_feed_urls: list[str] = [] - def fake_send_entry_to_discord(entry: Entry, reader: object) -> None: + def fake_send_entry_to_discord(entry: Entry, reader: Reader) -> None: selected_feed_urls.append(entry.feed.url) app.dependency_overrides[get_reader_dependency] = StubReader @@ -1547,7 +1676,12 @@ def test_webhook_entries_sort_newest_and_non_null_published_first() -> None: ] class StubReader: - def get_tag(self, resource: object, key: str, default: object = None) -> object: + def get_tag( + self, + resource: str | tuple[()] | DummyFeed, + key: str, + default: TestTagValue = None, + ) -> TestTagValue: if resource == () and key == "webhooks": return [{"name": webhook_name, "url": webhook_url}] if key == "webhook" and isinstance(resource, str): @@ -1557,12 +1691,12 @@ def test_webhook_entries_sort_newest_and_non_null_published_first() -> None: def get_feeds(self) -> list[DummyFeed]: return [dummy_feed] - def get_entries(self, **_kwargs: object) -> list[Entry]: + def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]: return unsorted_entries observed_order: list[str] = [] - def capture_entries(*, reader: object, entries: list[Entry], current_feed_url: str = "") -> str: + def capture_entries(*, reader: Reader, entries: list[Entry], current_feed_url: str = "") -> str: del reader, current_feed_url observed_order.extend(entry.id for entry in entries) return "" @@ -1761,7 +1895,12 @@ def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None: DummyFeed(url="https://unchanged.example.com/rss/c.xml", title="C"), ] - def get_tag(self, resource: object, key: str, default: object = None) -> object: + def get_tag( + self, + resource: str | tuple[()] | DummyFeed, + key: str, + default: TestTagValue = None, + ) -> TestTagValue: if resource == () and key == "webhooks": return [{"name": webhook_name, "url": webhook_url}] if key == "webhook" and isinstance(resource, str): @@ -1774,7 +1913,7 @@ def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None: def get_feeds(self) -> list[DummyFeed]: return self._feeds - def get_entries(self, **_kwargs: object) -> list[Entry]: + def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]: return [] app.dependency_overrides[get_reader_dependency] = StubReader @@ -1827,7 +1966,12 @@ def test_bulk_change_feed_urls_updates_matching_feeds() -> None: self.change_calls: list[tuple[str, str]] = [] self.updated_feeds: list[str] = [] - def get_tag(self, resource: object, key: str, default: object = None) -> object: + def get_tag( + self, + resource: str | tuple[()] | DummyFeed, + key: str, + default: TestTagValue = None, + ) -> TestTagValue: if resource == () and key == "webhooks": return [{"name": webhook_name, "url": webhook_url}] if key == "webhook" and isinstance(resource, str): @@ -1843,7 +1987,7 @@ def test_bulk_change_feed_urls_updates_matching_feeds() -> None: def update_feed(self, feed_url: str) -> None: self.updated_feeds.append(feed_url) - def get_entries(self, **_kwargs: object) -> list[Entry]: + def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]: return [] def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 @@ -1899,7 +2043,7 @@ def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None: DummyFeed(url="https://old.example.com/rss/b.xml", title="B"), ] - def get_tag(self, resource: object, key: str, default: object = None) -> object: + def get_tag(self, resource: str | DummyFeed, key: str, default: TestTagValue = None) -> TestTagValue: if key == "webhook" and isinstance(resource, str): return webhook_url return default @@ -1947,7 +2091,12 @@ def test_bulk_change_feed_urls_force_update_overwrites_conflict() -> None: # no self.delete_calls: list[str] = [] self.change_calls: list[tuple[str, str]] = [] - def get_tag(self, resource: object, key: str, default: object = None) -> object: + def get_tag( + self, + resource: str | tuple[()] | DummyFeed, + key: str, + default: TestTagValue = None, + ) -> TestTagValue: if resource == () and key == "webhooks": return [{"name": webhook_name, "url": webhook_url}] if key == "webhook" and isinstance(resource, str): @@ -1966,7 +2115,7 @@ def test_bulk_change_feed_urls_force_update_overwrites_conflict() -> None: # no def update_feed(self, _feed_url: str) -> None: return - def get_entries(self, **_kwargs: object) -> list[Entry]: + def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]: return [] def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001 @@ -2019,7 +2168,12 @@ def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None: ] self.change_calls: list[tuple[str, str]] = [] - def get_tag(self, resource: object, key: str, default: object = None) -> object: + def get_tag( + self, + resource: str | tuple[()] | DummyFeed, + key: str, + default: TestTagValue = None, + ) -> TestTagValue: if resource == () and key == "webhooks": return [{"name": webhook_name, "url": webhook_url}] if key == "webhook" and isinstance(resource, str): @@ -2035,7 +2189,7 @@ def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None: def update_feed(self, _feed_url: str) -> None: return - def get_entries(self, **_kwargs: object) -> list[Entry]: + def get_entries(self, **_kwargs: TestKwargValue) -> list[Entry]: return [] def set_entry_read(self, _entry: Entry, _value: bool) -> None: # noqa: FBT001