Compare commits

...

2 commits

Author SHA1 Message Date
3346994763
Allow images to be between 0..10
All checks were successful
Test and build Docker image / docker (push) Successful in 1m54s
2026-05-12 20:31:12 +02:00
a0c186559f
Use Discord webhook components to send 10 images 2026-05-12 06:09:21 +02:00
13 changed files with 1417 additions and 135 deletions

11
.vscode/settings.json vendored
View file

@ -1,22 +1,31 @@
{ {
"cSpell.words": [ "cSpell.words": [
"argnames",
"argvalues",
"autoexport", "autoexport",
"autoplay",
"botuser", "botuser",
"DISCORDTIMESTAMPPLACEHOLDER",
"domcontentloaded", "domcontentloaded",
"Genshins", "Genshins",
"healthcheck", "healthcheck",
"Hoyolab", "Hoyolab",
"HTMX",
"KHTML", "KHTML",
"levelname", "levelname",
"Lovinator", "Lovinator",
"markdownified", "markdownified",
"markdownify", "markdownify",
"networkidle", "networkidle",
"overwritable",
"pipx", "pipx",
"pyproject", "pyproject",
"Skulbladi",
"thead", "thead",
"thelovinator", "thelovinator",
"uvicorn" "ttvdrops",
"uvicorn",
"youtu"
], ],
"python.analysis.typeCheckingMode": "basic" "python.analysis.typeCheckingMode": "basic"
} }

View file

@ -196,39 +196,52 @@ def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str:
return custom_message.replace("\\n", "\n") return custom_message.replace("\\n", "\n")
def get_first_image(summary: str | None, content: str | Sequence[Content] | None) -> str: # noqa: C901 def _extract_entry_text(data: str | list | tuple | Sequence[Content] | None) -> str | None:
"""Get image from summary or content. """Extract text from a reader summary/content value.
Returns:
Extracted text, or None when the input is empty.
"""
if not data:
return None
if isinstance(data, str):
return data
if isinstance(data, (list, tuple)):
extracted: list[str] = []
for item in data:
if hasattr(item, "value"):
extracted.append(item.value)
elif isinstance(item, dict) and "value" in item:
extracted.append(item.get("value", ""))
else:
extracted.append(str(item))
return "".join(extracted)
return str(data)
def get_image_urls(
summary: str | None,
content: str | Sequence[Content] | None,
*,
limit: int | None = None,
) -> list[str]:
"""Get valid image URLs from content, then summary.
Args: Args:
summary: The summary from the entry (string, or tuple/list of objects) summary: The summary from the entry (string, or tuple/list of objects)
content: The content from the entry (string, or tuple/list of objects) content: The content from the entry (string, or tuple/list of objects)
limit: Optional maximum number of URLs to return.
Returns: Returns:
The first image Valid, de-duplicated image URLs.
""" """
image_urls: list[str] = []
seen_urls: set[str] = set()
def extract_string(data: str | list | tuple | Sequence[Content] | None) -> str | None: def add_images_from_text(text: str | None) -> None:
if not data: if not text:
return None return
if isinstance(data, str): images = BeautifulSoup(text, features="lxml").find_all("img")
return data
if isinstance(data, (list, tuple)):
extracted: list[str] = []
for item in data:
if hasattr(item, "value"):
extracted.append(item.value)
elif isinstance(item, dict) and "value" in item:
extracted.append(item.get("value", ""))
else:
extracted.append(str(item))
return "".join(extracted)
return str(data)
# Convert potentially complex objects into strings
content_str: str | None = extract_string(content)
summary_str: str | None = extract_string(summary)
if content_str and (images := BeautifulSoup(content_str, features="lxml").find_all("img")):
for image in images: for image in images:
if not isinstance(image, Tag) or "src" not in image.attrs: if not isinstance(image, Tag) or "src" not in image.attrs:
logger.error("Image is not a Tag or does not have a src attribute.") logger.error("Image is not a Tag or does not have a src attribute.")
@ -239,21 +252,29 @@ def get_first_image(summary: str | None, content: str | Sequence[Content] | None
logger.warning("Invalid URL: %s", src) logger.warning("Invalid URL: %s", src)
continue continue
return src if src in seen_urls:
if summary_str and (images := BeautifulSoup(summary_str, features="lxml").find_all("img")):
for image in images:
if not isinstance(image, Tag) or "src" not in image.attrs:
logger.error("Image is not a Tag or does not have a src attribute.")
continue continue
if not is_url_valid(str(image.attrs["src"])): image_urls.append(src)
logger.warning("Invalid URL: %s", image.attrs["src"]) seen_urls.add(src)
continue if limit is not None and len(image_urls) >= limit:
return
return str(image.attrs["src"]) add_images_from_text(_extract_entry_text(content))
if limit is None or len(image_urls) < limit:
add_images_from_text(_extract_entry_text(summary))
return "" return image_urls
def get_first_image(summary: str | None, content: str | Sequence[Content] | None) -> str:
"""Get the first image from summary or content.
Returns:
First valid image URL, or an empty string.
"""
image_urls: list[str] = get_image_urls(summary, content, limit=1)
return image_urls[0] if image_urls else ""
def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed: def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed:

View file

@ -9,20 +9,21 @@ import logging
import os import os
import pprint import pprint
import re import re
import time
from collections.abc import Callable from collections.abc import Callable
from contextlib import suppress from contextlib import suppress
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any
from typing import Literal from typing import Literal
from typing import Protocol from typing import Protocol
from typing import cast from typing import cast
from urllib.parse import ParseResult from urllib.parse import ParseResult
from urllib.parse import parse_qs from urllib.parse import parse_qs
from urllib.parse import urljoin
from urllib.parse import urlparse from urllib.parse import urlparse
import httpx import httpx
import tldextract import tldextract
from discord_webhook import DiscordEmbed
from discord_webhook import DiscordWebhook
from fastapi import HTTPException from fastapi import HTTPException
from markdownify import markdownify from markdownify import markdownify
from playwright.sync_api import Browser from playwright.sync_api import Browser
@ -43,6 +44,7 @@ from requests import RequestException
from discord_rss_bot.custom_message import CustomEmbed from discord_rss_bot.custom_message import CustomEmbed
from discord_rss_bot.custom_message import get_custom_message from discord_rss_bot.custom_message import get_custom_message
from discord_rss_bot.custom_message import get_image_urls
from discord_rss_bot.custom_message import replace_tags_in_embed from discord_rss_bot.custom_message import replace_tags_in_embed
from discord_rss_bot.custom_message import replace_tags_in_text_message from discord_rss_bot.custom_message import replace_tags_in_text_message
from discord_rss_bot.filter.evaluator import get_entry_filter_decision_from_reader from discord_rss_bot.filter.evaluator import get_entry_filter_decision_from_reader
@ -54,12 +56,15 @@ from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.settings import default_custom_embed from discord_rss_bot.settings import default_custom_embed
from discord_rss_bot.settings import default_custom_message from discord_rss_bot.settings import default_custom_message
from discord_rss_bot.settings import get_reader from discord_rss_bot.settings import get_reader
from discord_rss_bot.webhook import DiscordEmbed
from discord_rss_bot.webhook import DiscordWebhook
from discord_rss_bot.webhook import WebhookFile
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Iterable from collections.abc import Iterable
from reader._types import EntryData from reader._types import EntryData
from requests import Response from reader.types import JSONType
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
@ -99,16 +104,17 @@ class JsonResponseLike(Protocol):
... ...
MAX_DISCORD_UPLOAD_BYTES: int = 8 * 1024 * 1024
SENT_WEBHOOKS_TAG: str = "sent_webhooks"
SAVE_SENT_WEBHOOKS_TAG: str = "save_sent_webhooks"
MESSAGE_PAYLOAD_KEYS: tuple[str, ...] = ( MESSAGE_PAYLOAD_KEYS: tuple[str, ...] = (
"allowed_mentions", "allowed_mentions",
"applied_tags",
"attachments", "attachments",
"avatar_url", "avatar_url",
"components",
"content", "content",
"embeds", "embeds",
"flags", "flags",
"poll",
"thread_name",
"tts", "tts",
"username", "username",
) )
@ -278,6 +284,41 @@ def get_screenshot_layout(reader: Reader, feed: Feed) -> ScreenshotLayout:
return "desktop" return "desktop"
def coerce_media_gallery_image_limit(value: JsonValue) -> int: # noqa: PLR0911
"""Return the supported media gallery image limit for a stored tag value."""
if isinstance(value, bool):
return 1
if isinstance(value, int):
return min(max(value, 0), 10)
if isinstance(value, str) and value.strip().lower() in {"1", "first", "first_image", "first-only"}:
return 1
if isinstance(value, str) and value.strip().lower() in {"0", "none", "no_images", "off", "disabled"}:
return 0
if isinstance(value, str):
try:
parsed_value: int = int(value.strip())
except ValueError:
return 1
return min(max(parsed_value, 0), 10)
return 1
def get_feed_media_gallery_image_limit(reader: Reader, feed: Feed | str) -> int:
"""Resolve how many feed images should be sent in Discord media galleries.
Returns:
The configured image limit, normalized to a supported Discord gallery size.
"""
feed_url: str = str(getattr(feed, "url", feed))
try:
value = cast("JsonValue", reader.get_tag(feed, "media_gallery_image_limit", 1))
except ReaderError:
logger.exception("Error getting %s tag for feed: %s", "media_gallery_image_limit", feed_url)
return 1
return coerce_media_gallery_image_limit(value)
def feed_saves_sent_webhooks(reader: Reader, feed: Feed | str) -> bool: def feed_saves_sent_webhooks(reader: Reader, feed: Feed | str) -> bool:
"""Return whether sent Discord webhook messages should be stored for a feed. """Return whether sent Discord webhook messages should be stored for a feed.
@ -285,9 +326,9 @@ def feed_saves_sent_webhooks(reader: Reader, feed: Feed | str) -> bool:
""" """
feed_url: str = feed.url if isinstance(feed, Feed) else str(feed) feed_url: str = feed.url if isinstance(feed, Feed) else str(feed)
try: try:
value = cast("JsonValue", reader.get_tag(feed, SAVE_SENT_WEBHOOKS_TAG, True)) value = cast("JsonValue", reader.get_tag(feed, "save_sent_webhooks", True))
except ReaderError: except ReaderError:
logger.exception("Error getting %s tag for feed: %s", SAVE_SENT_WEBHOOKS_TAG, feed_url) logger.exception("Error getting %s tag for feed: %s", "save_sent_webhooks", feed_url)
return True return True
if isinstance(value, bool): if isinstance(value, bool):
@ -305,7 +346,7 @@ def get_sent_webhook_records(reader: Reader) -> list[SentWebhookRecord]:
Returns: Returns:
list[SentWebhookRecord]: Saved sent webhook records. list[SentWebhookRecord]: Saved sent webhook records.
""" """
raw_records = cast("JsonValue", reader.get_tag((), SENT_WEBHOOKS_TAG, [])) raw_records = cast("JsonValue", reader.get_tag((), "sent_webhooks", []))
if not isinstance(raw_records, list): if not isinstance(raw_records, list):
return [] return []
@ -317,22 +358,22 @@ def get_sent_webhook_records(reader: Reader) -> list[SentWebhookRecord]:
def save_sent_webhook_records(reader: Reader, records: list[SentWebhookRecord]) -> None: def save_sent_webhook_records(reader: Reader, records: list[SentWebhookRecord]) -> None:
"""Save sent webhook records to the global reader tag.""" """Save sent webhook records to the global reader tag."""
reader.set_tag((), SENT_WEBHOOKS_TAG, records) # pyright: ignore[reportArgumentType] reader.set_tag((), "sent_webhooks", records) # pyright: ignore[reportArgumentType]
def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject: def get_webhook_request_payload(webhook: DiscordWebhook) -> JsonObject:
"""Return the Discord message payload used to compare saved messages. """Return the Discord message payload sent to Discord.
The discord-webhook object also includes client/runtime fields in `json`; only fields that affect the Discord Runtime fields on the webhook object are intentionally excluded. Unlike
message itself are persisted. Empty `content`, `embeds`, and `attachments` are kept so message edits can clear `get_webhook_message_payload`, this does not add empty defaults because
stale content when a feed changes delivery mode. Components V2 messages reject otherwise-empty `content` and `embeds` fields.
Returns: Returns:
JsonObject: Normalized Discord message payload. JsonObject: Discord request payload.
""" """
raw_payload = cast("JsonValue", webhook.json) raw_payload = cast("JsonValue", webhook.json)
if not isinstance(raw_payload, dict): if not isinstance(raw_payload, dict):
return {"content": "", "embeds": [], "attachments": []} return {}
payload: JsonObject = {} payload: JsonObject = {}
webhook_payload = cast("JsonObject", raw_payload) webhook_payload = cast("JsonObject", raw_payload)
@ -340,6 +381,19 @@ def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject:
if key in webhook_payload: if key in webhook_payload:
payload[key] = webhook_payload[key] payload[key] = webhook_payload[key]
return cast("JsonObject", json.loads(json.dumps(payload, default=str)))
def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject:
"""Return the normalized Discord message payload used to compare saved messages.
Empty `content`, `embeds`, and `attachments` are kept here so message edits can clear stale content when a feed
changes delivery mode. Use `get_webhook_request_payload` for the payload sent to Discord.
Returns:
JsonObject: Normalized Discord message payload.
"""
payload: JsonObject = get_webhook_request_payload(webhook)
payload.setdefault("content", "") payload.setdefault("content", "")
payload.setdefault("embeds", []) payload.setdefault("embeds", [])
payload.setdefault("attachments", []) payload.setdefault("attachments", [])
@ -416,6 +470,13 @@ def get_webhook_message_edit_payload(payload: JsonObject, record: SentWebhookRec
if edit_payload.get("attachments") == [] and not previous_attachments: if edit_payload.get("attachments") == [] and not previous_attachments:
edit_payload.pop("attachments", None) edit_payload.pop("attachments", None)
if json_value_to_int(edit_payload.get("flags")) & 1 << 15:
edit_payload.pop("content", None)
edit_payload.pop("embeds", None)
edit_payload.pop("poll", None)
if edit_payload.get("attachments") == []:
edit_payload.pop("attachments", None)
return edit_payload return edit_payload
@ -469,7 +530,8 @@ def get_discord_message_id_from_response(response_json: JsonObject, webhook: Dis
if isinstance(message_id, str) and message_id: if isinstance(message_id, str) and message_id:
return message_id return message_id
webhook_id: str | None = webhook.id if isinstance(webhook.id, str) else None raw_webhook_id = getattr(webhook, "id", None)
webhook_id: str | None = raw_webhook_id if isinstance(raw_webhook_id, str) else None
return webhook_id if isinstance(webhook_id, str) else "" return webhook_id if isinstance(webhook_id, str) else ""
@ -570,36 +632,166 @@ def split_webhook_url_for_message_endpoint(webhook_url: str) -> tuple[str, str |
return clean_url, thread_id return clean_url, thread_id
def payload_has_components(payload: JsonObject) -> bool:
"""Return whether a Discord payload includes message components."""
components: JsonValue = payload.get("components")
return isinstance(components, list) and bool(components)
def get_webhook_query_params(
webhook_url: str,
payload: JsonObject,
*,
webhook: DiscordWebhook | None = None,
wait: bool = True,
) -> tuple[str, dict[str, str]]:
"""Return a clean webhook URL and query params for a Discord webhook request."""
clean_webhook_url, thread_id = split_webhook_url_for_message_endpoint(webhook_url)
webhook_thread_id = getattr(webhook, "thread_id", None) if webhook is not None else None
if isinstance(webhook_thread_id, str) and webhook_thread_id.strip():
thread_id = webhook_thread_id.strip()
params: dict[str, str] = {}
if wait:
params["wait"] = "true"
if thread_id:
params["thread_id"] = thread_id
if payload_has_components(payload):
params["with_components"] = "true"
return clean_webhook_url, params
def get_webhook_files(webhook: DiscordWebhook) -> list[WebhookFile]: # noqa: C901
"""Return files attached to a webhook object in a normalized shape."""
raw_files = getattr(webhook, "files", None)
files: list[WebhookFile] = []
if isinstance(raw_files, dict):
for filename, content in raw_files.items():
if isinstance(filename, str) and isinstance(content, bytes):
files.append(WebhookFile(filename=filename, content=content))
return files
if not isinstance(raw_files, list | tuple):
return []
for index, file_value in enumerate(raw_files):
if isinstance(file_value, WebhookFile):
files.append(file_value)
continue
if not isinstance(file_value, tuple) or len(file_value) < 2: # noqa: PLR2004
continue
first, second = file_value[0], file_value[1]
if isinstance(first, str) and isinstance(second, bytes):
files.append(WebhookFile(filename=first, content=second))
continue
if isinstance(second, tuple) and len(second) >= 2: # noqa: PLR2004
nested_file = cast("tuple[object, ...]", second)
nested_filename, nested_content = nested_file[0], nested_file[1]
if isinstance(nested_filename, str) and isinstance(nested_content, bytes):
files.append(WebhookFile(filename=nested_filename, content=nested_content))
continue
if isinstance(second, bytes):
files.append(WebhookFile(filename=f"file-{index}", content=second))
return files
def get_retry_after_seconds(response: httpx.Response) -> float | None:
"""Return Discord's retry delay for a rate-limited response when available."""
response_json: JsonObject = get_response_json(response)
retry_after: JsonValue = response_json.get("retry_after")
if isinstance(retry_after, int | float | str):
with suppress(TypeError, ValueError):
return float(retry_after)
retry_after_header: str | None = response.headers.get("retry-after")
if retry_after_header:
with suppress(TypeError, ValueError):
return float(retry_after_header)
return None
def request_discord_webhook(
method: str,
url: str,
*,
payload: JsonObject,
params: dict[str, str],
files: list[WebhookFile] | None,
timeout: float,
rate_limit_retry: bool,
) -> httpx.Response:
"""Send a Discord webhook request with optional multipart files.
Returns:
Discord API response.
"""
request_kwargs: dict[str, Any] = {"params": params, "timeout": timeout}
if files:
request_kwargs["data"] = {"payload_json": json.dumps(payload, default=str)}
request_kwargs["files"] = [
(f"files[{index}]", (file.filename, file.content)) for index, file in enumerate(files)
]
else:
request_kwargs["json"] = payload
response: httpx.Response = httpx.request(method, url, **request_kwargs)
if not rate_limit_retry or response.status_code != 429: # noqa: PLR2004
return response
retry_after: float | None = get_retry_after_seconds(response)
if retry_after is None:
return response
time.sleep(max(0.0, retry_after))
return httpx.request(method, url, **request_kwargs)
def send_webhook_message(webhook: DiscordWebhook, payload: JsonObject) -> httpx.Response:
"""Execute a Discord webhook message create request using httpx.
Returns:
Discord API response.
"""
clean_webhook_url, params = get_webhook_query_params(webhook.url, payload, webhook=webhook, wait=True)
return request_discord_webhook(
"POST",
clean_webhook_url,
payload=payload,
params=params,
files=get_webhook_files(webhook),
timeout=cast("int | float", getattr(webhook, "timeout", None) or 30.0),
rate_limit_retry=bool(getattr(webhook, "rate_limit_retry", False)),
)
def edit_sent_webhook_message( def edit_sent_webhook_message(
webhook_url: str, webhook_url: str,
message_id: str, message_id: str,
webhook: DiscordWebhook, webhook: DiscordWebhook,
payload: JsonObject, payload: JsonObject,
) -> Response | httpx.Response: ) -> httpx.Response:
"""Edit an already-sent Discord webhook message. """Edit an already-sent Discord webhook message.
Returns: Returns:
Response | httpx.Response: Discord API response. httpx.Response: Discord API response.
""" """
clean_webhook_url, thread_id = split_webhook_url_for_message_endpoint(webhook_url) clean_webhook_url, params = get_webhook_query_params(webhook_url, payload, webhook=webhook, wait=True)
return request_discord_webhook(
if getattr(webhook, "files", None): "PATCH",
webhook.url = clean_webhook_url
webhook.id = message_id
if thread_id:
webhook.thread_id = thread_id
return webhook.edit()
params: dict[str, str] = {"wait": "true"}
if thread_id:
params["thread_id"] = thread_id
timeout: int | float = cast("int | float", getattr(webhook, "timeout", None) or 30.0)
return httpx.patch(
f"{clean_webhook_url}/messages/{message_id}", f"{clean_webhook_url}/messages/{message_id}",
json=payload, payload=payload,
params=params, params=params,
timeout=timeout, files=get_webhook_files(webhook),
timeout=cast("int | float", getattr(webhook, "timeout", None) or 30.0),
rate_limit_retry=bool(getattr(webhook, "rate_limit_retry", False)),
) )
@ -908,7 +1100,7 @@ def create_screenshot_webhook(webhook_url: str, entry: Entry, reader: Reader) ->
) )
screenshot_extension: str = "png" screenshot_extension: str = "png"
if screenshot_bytes and len(screenshot_bytes) > MAX_DISCORD_UPLOAD_BYTES: if screenshot_bytes and len(screenshot_bytes) > 8 * 1024 * 1024:
logger.info( logger.info(
"Screenshot for entry %s is too large as PNG (%d bytes). Trying JPEG compression.", "Screenshot for entry %s is too large as PNG (%d bytes). Trying JPEG compression.",
entry.id, entry.id,
@ -934,7 +1126,7 @@ def create_screenshot_webhook(webhook_url: str, entry: Entry, reader: Reader) ->
screenshot_bytes = jpeg_bytes screenshot_bytes = jpeg_bytes
screenshot_extension = "jpg" screenshot_extension = "jpg"
if len(screenshot_bytes) <= MAX_DISCORD_UPLOAD_BYTES: if len(screenshot_bytes) <= 8 * 1024 * 1024:
break break
if screenshot_bytes is None: if screenshot_bytes is None:
@ -945,7 +1137,7 @@ def create_screenshot_webhook(webhook_url: str, entry: Entry, reader: Reader) ->
) )
return create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=True) return create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=True)
if len(screenshot_bytes) > MAX_DISCORD_UPLOAD_BYTES: if len(screenshot_bytes) > 8 * 1024 * 1024:
logger.warning( logger.warning(
"Screenshot for entry %s is still too large after compression (%d bytes). Falling back to text message.", "Screenshot for entry %s is still too large after compression (%d bytes). Falling back to text message.",
entry.id, entry.id,
@ -1163,7 +1355,266 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
discord_embed.set_title(embed_title) if embed_title else None discord_embed.set_title(embed_title) if embed_title else None
def create_embed_webhook( # noqa: C901 def add_unique_media_gallery_item(
media_items: list[JsonObject],
image_url: str,
*,
description: str,
limit: int = 10,
) -> None:
"""Append a valid media gallery item while preserving order and uniqueness."""
clean_image_url: str = image_url.strip()
if (
len(media_items) >= limit
or not clean_image_url
or any(item.get("url") == clean_image_url for item in media_items)
):
return
if not is_url_valid(clean_image_url):
logger.warning("Invalid media gallery URL: %s", clean_image_url)
return
media_items.append({"url": clean_image_url, "description": description[:1024]})
def normalize_ttvdrops_media_url(image_url: str) -> str:
"""Return an absolute ttvdrops media URL."""
clean_image_url: str = image_url.strip()
if not clean_image_url:
return ""
return urljoin("https://ttvdrops.lovinator.space/", clean_image_url)
def get_ttvdrops_campaign_api_url(entry: Entry) -> str:
"""Return the ttvdrops campaign API URL for an entry when it can be inferred."""
candidate_urls: tuple[str | None, ...] = (
entry.link,
entry.id,
entry.feed.url,
)
for candidate_url in candidate_urls:
if not candidate_url:
continue
parsed_url = urlparse(str(candidate_url))
if parsed_url.netloc.lower() != "ttvdrops.lovinator.space":
continue
if re.fullmatch(r"/twitch/api/v1/campaigns/[^/]+/?", parsed_url.path):
return parsed_url._replace(query="", fragment="").geturl()
campaign_match = re.fullmatch(r"/twitch/campaigns/([^/]+)/?", parsed_url.path)
if campaign_match:
campaign_id: str = campaign_match.group(1)
return parsed_url._replace(
path=f"/twitch/api/v1/campaigns/{campaign_id}/",
query="",
fragment="",
).geturl()
return ""
def get_ttvdrops_reward_description(drop: JsonObject, reward: JsonObject) -> str:
"""Return alt text for a ttvdrops reward image.
Returns:
Reward alt text suitable for a Media Gallery description.
"""
reward_name: str = str(reward.get("name") or drop.get("name") or "Reward")
required_minutes: int = json_value_to_int(drop.get("required_minutes_watched"))
required_subs: int = json_value_to_int(drop.get("required_subs"))
if required_minutes:
return f"{required_minutes} minutes watched: {reward_name}"
if required_subs:
return f"{required_subs} subscriptions: {reward_name}"
return reward_name
def extract_ttvdrops_media_gallery_items(value: JsonValue) -> list[JsonObject]: # noqa: C901
"""Extract benefit/reward media gallery items from a ttvdrops API response.
Returns:
Media Gallery items with absolute URLs and reward descriptions.
"""
media_items: list[JsonObject] = []
def add_reward_image(drop: JsonObject, reward: JsonObject) -> None:
image_url = reward.get("image_url")
if isinstance(image_url, str):
add_unique_media_gallery_item(
media_items,
normalize_ttvdrops_media_url(image_url),
description=get_ttvdrops_reward_description(drop, reward),
)
def collect_benefit_images(current_value: JsonValue) -> None:
if isinstance(current_value, dict):
for key, child_value in current_value.items():
if key in {"benefits", "rewards"} and isinstance(child_value, list):
for item in child_value:
if isinstance(item, dict):
add_reward_image(cast("JsonObject", current_value), cast("JsonObject", item))
collect_benefit_images(item)
continue
collect_benefit_images(child_value)
return
if isinstance(current_value, list):
for item in current_value:
collect_benefit_images(item)
collect_benefit_images(value)
return media_items
def fetch_ttvdrops_campaign_media_items(entry: Entry) -> list[JsonObject]:
"""Fetch extra campaign media gallery items for ttvdrops entries.
Returns:
Media Gallery items for ttvdrops rewards, or an empty list.
"""
api_url: str = get_ttvdrops_campaign_api_url(entry)
if not api_url:
return []
try:
response: httpx.Response = httpx.get(api_url, follow_redirects=True, timeout=10.0)
if response.status_code != 200: # noqa: PLR2004
logger.warning("Failed to fetch ttvdrops campaign data from %s: %s", api_url, response.text[:500])
return []
response_json = cast("JsonValue", response.json())
except (httpx.HTTPError, ValueError, TypeError):
logger.exception("Failed to fetch ttvdrops campaign data from %s", api_url)
return []
return extract_ttvdrops_media_gallery_items(response_json)
def get_entry_media_gallery_items(
entry: Entry,
custom_embed: CustomEmbed,
*,
image_limit: int = 10,
) -> list[JsonObject]:
"""Return items for a Discord Media Gallery component.
Returns:
Media Gallery items capped to Discord's item limit.
"""
image_limit = coerce_media_gallery_image_limit(image_limit)
if image_limit == 0:
return []
media_items: list[JsonObject] = []
ttvdrops_media_items: list[JsonObject] = fetch_ttvdrops_campaign_media_items(entry)
if ttvdrops_media_items:
return ttvdrops_media_items[:image_limit]
description: str = entry.title or entry.id
for image_url in get_image_urls(entry.summary, entry.content, limit=image_limit):
add_unique_media_gallery_item(media_items, image_url, description=description)
add_unique_media_gallery_item(media_items, custom_embed.image_url, description=description)
add_unique_media_gallery_item(media_items, custom_embed.thumbnail_url, description=description)
return media_items[:image_limit]
def truncate_component_text(content: str) -> str:
"""Trim a Text Display component to a conservative Discord-safe length.
Returns:
Original or truncated component text.
"""
max_text_display_length: int = 4000
if len(content) <= max_text_display_length:
return content
return f"{content[: max_text_display_length - 3]}..."
def get_component_text_display_content(custom_embed: CustomEmbed, entry: Entry) -> str:
"""Build markdown text for a Components V2 Text Display.
Returns:
Markdown content for a Text Display component.
"""
parts: list[str] = []
if custom_embed.title:
parts.append(f"# {custom_embed.title}")
if custom_embed.author_name and custom_embed.author_url:
parts.append(f"## [{custom_embed.author_name}]({custom_embed.author_url})")
elif custom_embed.author_name:
parts.append(f"## {custom_embed.author_name}")
elif custom_embed.author_url:
parts.append(f"<{custom_embed.author_url}>")
if custom_embed.description:
parts.append(custom_embed.description)
if custom_embed.footer_text:
parts.append(f"-# {custom_embed.footer_text}")
if not parts:
fallback_text: str = entry.title or entry.link or entry.id
if entry.link and fallback_text != entry.link:
fallback_text = f"[{fallback_text}]({entry.link})"
parts.append(fallback_text)
return truncate_component_text("\n\n".join(parts))
def create_media_gallery_component(media_items: list[JsonObject]) -> JsonObject:
"""Build a Discord Media Gallery component.
Returns:
Discord Media Gallery component payload.
"""
return {
"type": 12,
"items": [
{
"media": {"url": media_item["url"]},
"description": media_item["description"],
}
for media_item in media_items[:10]
if isinstance(media_item.get("url"), str) and isinstance(media_item.get("description"), str)
],
}
def create_components_v2_webhook(
webhook_url: str,
entry: Entry,
custom_embed: CustomEmbed,
media_items: list[JsonObject],
) -> DiscordWebhook:
"""Create a Components V2 webhook with text and a media gallery.
Returns:
Webhook payload configured for Components V2.
"""
components: list[JsonValue] = [
{
"type": 10,
"content": get_component_text_display_content(custom_embed, entry),
},
create_media_gallery_component(media_items),
]
return DiscordWebhook(
url=webhook_url,
flags=1 << 15,
components=components,
rate_limit_retry=True,
)
def create_embed_webhook( # noqa: C901, PLR0912
webhook_url: str, webhook_url: str,
entry: Entry, entry: Entry,
reader: Reader, reader: Reader,
@ -1183,6 +1634,18 @@ def create_embed_webhook( # noqa: C901
# Get the embed data from the database. # Get the embed data from the database.
custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader) custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader)
media_gallery_image_limit: int = get_feed_media_gallery_image_limit(reader, feed)
if media_gallery_image_limit == 0:
custom_embed.image_url = ""
custom_embed.thumbnail_url = ""
media_gallery_items: list[JsonObject] = get_entry_media_gallery_items(
entry,
custom_embed,
image_limit=media_gallery_image_limit,
)
if media_gallery_items:
return create_components_v2_webhook(webhook_url, entry, custom_embed, media_gallery_items)
discord_embed: DiscordEmbed = DiscordEmbed() discord_embed: DiscordEmbed = DiscordEmbed()
@ -1357,11 +1820,12 @@ def execute_webhook(
logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url) logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url)
return return
request_payload: JsonObject = get_webhook_request_payload(webhook)
payload: JsonObject = get_webhook_message_payload(webhook) payload: JsonObject = get_webhook_message_payload(webhook)
response: Response = webhook.execute() response: httpx.Response = send_webhook_message(webhook, request_payload)
logger.debug("Discord webhook response for entry %s: status=%s", entry.id, response.status_code) logger.debug("Discord webhook response for entry %s: status=%s", entry.id, response.status_code)
if response.status_code not in {200, 204}: if response.status_code not in {200, 204}:
msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(webhook.json)}" msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(request_payload)}"
if entry: if entry:
msg += f"\n{entry}" msg += f"\n{entry}"
@ -1469,7 +1933,14 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType] reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
# Store sent Discord message ids by default so modified feed entries can edit the original webhook message. # Store sent Discord message ids by default so modified feed entries can edit the original webhook message.
reader.set_tag(clean_feed_url, SAVE_SENT_WEBHOOKS_TAG, True) # pyright: ignore[reportArgumentType] reader.set_tag(clean_feed_url, "save_sent_webhooks", True) # pyright: ignore[reportArgumentType]
# Keep the existing delivery behavior for new feeds unless changed from the feed page.
reader.set_tag(
clean_feed_url,
"media_gallery_image_limit",
cast("JSONType", 1),
)
# This is the default message that will be sent to Discord. # This is the default message that will be sent to Discord.
reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType] reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType]

View file

@ -8,8 +8,9 @@ from typing import TYPE_CHECKING
from typing import cast from typing import cast
import requests import requests
from discord_webhook import DiscordEmbed
from discord_webhook import DiscordWebhook from discord_rss_bot.webhook import DiscordEmbed
from discord_rss_bot.webhook import DiscordWebhook
if TYPE_CHECKING: if TYPE_CHECKING:
from reader import Entry from reader import Entry

View file

@ -51,12 +51,13 @@ from discord_rss_bot.custom_message import get_embed
from discord_rss_bot.custom_message import get_first_image from discord_rss_bot.custom_message import get_first_image
from discord_rss_bot.custom_message import replace_tags_in_text_message from discord_rss_bot.custom_message import replace_tags_in_text_message
from discord_rss_bot.custom_message import save_embed from discord_rss_bot.custom_message import save_embed
from discord_rss_bot.feeds import SAVE_SENT_WEBHOOKS_TAG
from discord_rss_bot.feeds import SentWebhookRecord from discord_rss_bot.feeds import SentWebhookRecord
from discord_rss_bot.feeds import coerce_media_gallery_image_limit
from discord_rss_bot.feeds import create_feed from discord_rss_bot.feeds import create_feed
from discord_rss_bot.feeds import extract_domain from discord_rss_bot.feeds import extract_domain
from discord_rss_bot.feeds import feed_saves_sent_webhooks from discord_rss_bot.feeds import feed_saves_sent_webhooks
from discord_rss_bot.feeds import get_feed_delivery_mode from discord_rss_bot.feeds import get_feed_delivery_mode
from discord_rss_bot.feeds import get_feed_media_gallery_image_limit
from discord_rss_bot.feeds import get_screenshot_layout from discord_rss_bot.feeds import get_screenshot_layout
from discord_rss_bot.feeds import get_sent_webhook_records from discord_rss_bot.feeds import get_sent_webhook_records
from discord_rss_bot.feeds import send_entry_to_discord from discord_rss_bot.feeds import send_entry_to_discord
@ -71,6 +72,7 @@ from discord_rss_bot.filter.evaluator import evaluate_entry_filters
from discord_rss_bot.filter.evaluator import get_entry_decision_key from discord_rss_bot.filter.evaluator import get_entry_decision_key
from discord_rss_bot.filter.evaluator import get_entry_fields from discord_rss_bot.filter.evaluator import get_entry_fields
from discord_rss_bot.filter.evaluator import get_filter_values_from_reader from discord_rss_bot.filter.evaluator import get_filter_values_from_reader
from discord_rss_bot.filter.evaluator import has_filter_values
from discord_rss_bot.git_backup import commit_state_change from discord_rss_bot.git_backup import commit_state_change
from discord_rss_bot.git_backup import get_backup_path from discord_rss_bot.git_backup import get_backup_path
from discord_rss_bot.is_url_valid import is_url_valid from discord_rss_bot.is_url_valid import is_url_valid
@ -1346,12 +1348,44 @@ async def post_set_feed_save_sent_webhooks(
except FeedNotFoundError as e: except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail="Feed not found") from e raise HTTPException(status_code=404, detail="Feed not found") from e
reader.set_tag(clean_feed_url, SAVE_SENT_WEBHOOKS_TAG, should_save) # pyright: ignore[reportArgumentType] reader.set_tag(clean_feed_url, "save_sent_webhooks", should_save) # pyright: ignore[reportArgumentType]
action: str = "Enable" if should_save else "Disable" action: str = "Enable" if should_save else "Disable"
commit_state_change(reader, f"{action} sent webhook storage for {clean_feed_url}") commit_state_change(reader, f"{action} sent webhook storage for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/set_feed_media_gallery_image_limit")
async def post_set_feed_media_gallery_image_limit(
feed_url: Annotated[str, Form()],
image_limit: Annotated[int, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Set whether a feed sends one image or a full media gallery.
Returns:
RedirectResponse: Redirect to the feed page.
Raises:
HTTPException: If the feed does not exist.
"""
clean_feed_url: str = feed_url.strip()
clean_image_limit: int = coerce_media_gallery_image_limit(image_limit)
clean_image_limit_json: JSONType = cast("JSONType", clean_image_limit)
try:
reader.get_feed(clean_feed_url)
except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail="Feed not found") from e
reader.set_tag(
clean_feed_url,
"media_gallery_image_limit",
clean_image_limit_json,
)
commit_state_change(reader, f"Set media gallery image limit to {clean_image_limit} for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/set_update_interval") @app.post("/set_update_interval")
async def post_set_update_interval( async def post_set_update_interval(
feed_url: Annotated[str, Form()], feed_url: Annotated[str, Form()],
@ -1620,6 +1654,9 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
current_webhook_name = hook.get("name", "").strip() current_webhook_name = hook.get("name", "").strip()
break break
has_blacklist_filters: bool = has_filter_values(get_filter_values_from_reader(reader, feed, "blacklist"))
has_whitelist_filters: bool = has_filter_values(get_filter_values_from_reader(reader, feed, "whitelist"))
# Only show button if more than 10 entries. # Only show button if more than 10 entries.
total_entries: int = reader.get_entry_counts(feed=feed).total or 0 total_entries: int = reader.get_entry_counts(feed=feed).total or 0
is_show_more_entries_button_visible: bool = total_entries > entries_per_page is_show_more_entries_button_visible: bool = total_entries > entries_per_page
@ -1668,6 +1705,10 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
"webhooks": webhooks, "webhooks": webhooks,
"current_webhook_url": current_webhook_url, "current_webhook_url": current_webhook_url,
"current_webhook_name": current_webhook_name, "current_webhook_name": current_webhook_name,
"has_blacklist_filters": has_blacklist_filters,
"has_whitelist_filters": has_whitelist_filters,
"media_gallery_image_limit": get_feed_media_gallery_image_limit(reader, feed),
"max_media_gallery_items": 10,
"save_sent_webhooks": feed_saves_sent_webhooks(reader, feed), "save_sent_webhooks": feed_saves_sent_webhooks(reader, feed),
} }
return templates.TemplateResponse(request=request, name="feed.html", context=context) return templates.TemplateResponse(request=request, name="feed.html", context=context)
@ -1728,6 +1769,10 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
"webhooks": webhooks, "webhooks": webhooks,
"current_webhook_url": current_webhook_url, "current_webhook_url": current_webhook_url,
"current_webhook_name": current_webhook_name, "current_webhook_name": current_webhook_name,
"has_blacklist_filters": has_blacklist_filters,
"has_whitelist_filters": has_whitelist_filters,
"media_gallery_image_limit": get_feed_media_gallery_image_limit(reader, feed),
"max_media_gallery_items": 10,
"save_sent_webhooks": feed_saves_sent_webhooks(reader, feed), "save_sent_webhooks": feed_saves_sent_webhooks(reader, feed),
} }
return templates.TemplateResponse(request=request, name="feed.html", context=context) return templates.TemplateResponse(request=request, name="feed.html", context=context)

View file

@ -18,6 +18,11 @@ body {
max-width: 120px; max-width: 120px;
} }
.image-limit-control {
min-width: 12rem;
max-width: 28rem;
}
.screenshot-requirement { .screenshot-requirement {
color: #9c9c9c; color: #9c9c9c;
font-size: 0.9rem; font-size: 0.9rem;
@ -65,6 +70,10 @@ body {
word-break: break-word; word-break: break-word;
} }
.feed-summary-list {
padding-left: 1.15rem;
}
.sent-webhooks__entry, .sent-webhooks__entry,
.sent-webhooks__preview { .sent-webhooks__preview {
min-width: 14rem; min-width: 14rem;

View file

@ -236,10 +236,12 @@
<label for="author_icon_url" class="col-sm-6 col-form-label">Author icon URL</label> <label for="author_icon_url" class="col-sm-6 col-form-label">Author icon URL</label>
<input name="author_icon_url" type="text" class="form-control bg-dark border-dark text-muted" <input name="author_icon_url" type="text" class="form-control bg-dark border-dark text-muted"
id="author_icon_url" {% if author_icon_url %} value="{{- author_icon_url -}}" {% endif %} /> id="author_icon_url" {% if author_icon_url %} value="{{- author_icon_url -}}" {% endif %} />
<label for="image_url" class="col-sm-6 col-form-label">Image URL - Add {% raw %}{{image_1}}{% endraw %} <label for="image_url" class="col-sm-6 col-form-label">Image URL</label>
for first image</label>
<input name="image_url" type="text" class="form-control bg-dark border-dark text-muted" id="image_url" <input name="image_url" type="text" class="form-control bg-dark border-dark text-muted" id="image_url"
{% if image_url %} value="{{- image_url -}}" {% endif %} /> {% if image_url %} value="{{- image_url -}}" {% endif %} />
<div class="form-text">
Use {% raw %}{{image_1}}{% endraw %} to use the first image URL found in the entry. You can also configure how many images gets sent via the feed page.
</div>
<label for="thumbnail_url" class="col-sm-6 col-form-label">Thumbnail</label> <label for="thumbnail_url" class="col-sm-6 col-form-label">Thumbnail</label>
<input name="thumbnail_url" type="text" class="form-control bg-dark border-dark text-muted" <input name="thumbnail_url" type="text" class="form-control bg-dark border-dark text-muted"
id="thumbnail_url" {% if thumbnail_url %} value="{{- thumbnail_url -}}" {% endif %} /> id="thumbnail_url" {% if thumbnail_url %} value="{{- thumbnail_url -}}" {% endif %} />

View file

@ -30,6 +30,18 @@
Text Text
{% endif %} {% endif %}
</span> </span>
{% if not "youtube.com/feeds/videos.xml" in feed.url %}
<span class="badge status-chip bg-secondary">
Images:
{% if media_gallery_image_limit == 0 %}
No images
{% elif media_gallery_image_limit == 1 %}
First image only
{% else %}
Up to {{ media_gallery_image_limit }} images
{% endif %}
</span>
{% endif %}
{% if delivery_mode == "screenshot" %} {% if delivery_mode == "screenshot" %}
<span class="badge status-chip bg-secondary"> <span class="badge status-chip bg-secondary">
Screenshot layout: Screenshot layout:
@ -42,6 +54,72 @@
{% endif %} {% endif %}
</div> </div>
</div> </div>
<section class="mt-4 pt-3 border-top border-secondary-subtle">
<h3 class="h6 text-uppercase text-muted mb-2">Feed Summary</h3>
<p class="text-muted mb-2">
This feed
{% if feed.updates_enabled %}
will send new entries
{% else %}
is paused and will not send new entries
{% endif %}
{% if current_webhook_name %}
to <strong>{{ current_webhook_name }}</strong>
{% elif current_webhook_url %}
to a webhook that is no longer saved
{% else %}
after a webhook is attached
{% endif %}
as
{% if delivery_mode == "embed" %}
an embed.
{% elif delivery_mode == "screenshot" %}
a screenshot of the entry link page in {{ screenshot_layout }} mode.
{% else %}
a text message.
{% endif %}
</p>
<ul class="text-muted small mb-0 feed-summary-list">
<li>
Update interval:
{% if feed_interval %}
{{ feed_interval }} minutes.
{% else %}
{{ global_interval }} minutes because of the global default.
{% endif %}
</li>
{% if delivery_mode == "embed" %}
<li>
Embed images:
{% if media_gallery_image_limit == 0 %}
none.
{% elif media_gallery_image_limit == 1 %}
first image only.
{% else %}
up to {{ media_gallery_image_limit }} images.
{% endif %}
</li>
{% elif delivery_mode == "screenshot" %}
<li>Screenshot layout: {{ screenshot_layout }}.</li>
{% endif %}
<li>
Filters:
{% if has_blacklist_filters and has_whitelist_filters %}
whitelist and blacklist are active; blacklist wins when both match.
{% elif has_blacklist_filters %}
blacklist is active.
{% elif has_whitelist_filters %}
whitelist is active.
{% else %}
no whitelist or blacklist filters are configured.
{% endif %}
</li>
<li>
Updating the Discord webhook when the feed entry changes is
{{ 'enabled.' if save_sent_webhooks else 'disabled.' }}
</li>
</ul>
</section>
{% if feed.last_exception %} {% if feed.last_exception %}
<div class="alert alert-danger mt-4 mb-0" role="alert"> <div class="alert alert-danger mt-4 mb-0" role="alert">
<h5 class="alert-heading mb-2">{{ feed.last_exception.type_name }}</h5> <h5 class="alert-heading mb-2">{{ feed.last_exception.type_name }}</h5>
@ -88,24 +166,56 @@
name="feed_url" name="feed_url"
value="{{ feed.url }}">Send text message instead of embed</button> value="{{ feed.url }}">Send text message instead of embed</button>
</form> </form>
<form action="/use_screenshot" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">
Send full-page screenshot instead of embed
</button>
</form>
{% elif delivery_mode == "screenshot" %} {% elif delivery_mode == "screenshot" %}
<form action="/use_embed" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">Send embed instead of screenshot</button>
</form>
<form action="/use_text" method="post" class="d-inline"> <form action="/use_text" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm" <button class="btn btn-outline-light btn-sm"
name="feed_url" name="feed_url"
value="{{ feed.url }}">Send text message instead of screenshot</button> value="{{ feed.url }}">Send text message instead of screenshot</button>
</form> </form>
{% else %}
<form action="/use_embed" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">Send embed instead of text message</button>
</form>
{% endif %}
{% endif %}
</div>
</section>
{% if not "youtube.com/feeds/videos.xml" in feed.url %}
<section class="mt-4 pt-3 border-top border-secondary-subtle">
<div class="d-flex flex-wrap align-items-center justify-content-between gap-2 mb-2">
<h3 class="h6 text-uppercase text-muted mb-0">Screenshot Delivery</h3>
<span class="badge {{ 'bg-info' if delivery_mode == 'screenshot' else 'bg-secondary' }}">
{% if delivery_mode == "screenshot" %}
Active:
{% if screenshot_layout == "mobile" %}
Mobile
{% else %}
Desktop
{% endif %}
{% else %}
Inactive
{% endif %}
</span>
</div>
<p class="text-muted mb-3">
Screenshot delivery sends a full-page screenshot of the entry link instead of the normal
embed or text message.
</p>
<div class="d-flex flex-wrap gap-2">
{% if delivery_mode != "screenshot" %}
<form action="/use_screenshot" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">Use screenshot delivery</button>
</form>
{% else %}
<form action="/use_embed" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">Disable screenshot delivery</button>
</form>
{% if screenshot_layout == "mobile" %} {% if screenshot_layout == "mobile" %}
<form action="/use_screenshot_desktop" method="post" class="d-inline"> <form action="/use_screenshot_desktop" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm" <button class="btn btn-outline-light btn-sm"
@ -119,26 +229,61 @@
value="{{ feed.url }}">Use mobile screenshot layout</button> value="{{ feed.url }}">Use mobile screenshot layout</button>
</form> </form>
{% endif %} {% endif %}
{% else %}
<form action="/use_embed" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">Send embed instead of text message</button>
</form>
<form action="/use_screenshot" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">
Send full-page screenshot instead of text message
</button>
</form>
{% endif %} {% endif %}
<div class="w-100 mt-1 screenshot-requirement"> </div>
Screenshot mode requires Chromium to be installed for Playwright. Run <code>uv run playwright install chromium</code> once on this machine. <div class="mt-2 screenshot-requirement">
Screenshot mode requires Chromium to be installed for Playwright. Run
<code>uv run playwright install chromium</code> once on this machine.
</div>
</section>
<section class="mt-4 pt-3 border-top border-secondary-subtle">
<div class="d-flex flex-wrap align-items-center justify-content-between gap-2 mb-2">
<h3 class="h6 text-uppercase text-muted mb-0">Image Delivery</h3>
<span class="badge {{ 'bg-info' if media_gallery_image_limit < max_media_gallery_items else 'bg-secondary' }}">
{% if media_gallery_image_limit == 0 %}
No images
{% elif media_gallery_image_limit == 1 %}
First image only
{% else %}
Up to {{ media_gallery_image_limit }} images
{% endif %}
</span>
</div>
<p id="imageDeliveryHelp" class="text-muted mb-3">
Choose 0 to send no entry images, 1 for the first image only,
or up to {{ max_media_gallery_items }} for a Discord media gallery.
This only affects embed delivery.
</p>
<form action="/set_feed_media_gallery_image_limit"
method="post"
class="mb-0">
<input type="hidden" name="feed_url" value="{{ feed.url }}" />
<label class="form-label small text-muted mb-2" for="image_limit">
Images per entry:
<output name="image_limit_value" for="image_limit">{{ media_gallery_image_limit }}</output>
</label>
<div class="d-flex flex-wrap align-items-center gap-3">
<div class="flex-grow-1 image-limit-control">
<input id="image_limit"
class="form-range"
type="range"
name="image_limit"
min="0"
max="{{ max_media_gallery_items }}"
step="1"
value="{{ media_gallery_image_limit }}"
aria-describedby="imageDeliveryHelp"
oninput="this.form.elements.image_limit_value.value = this.value" />
<div class="d-flex justify-content-between text-muted small">
<span>0</span>
<span>{{ max_media_gallery_items }}</span>
</div>
</div>
<button class="btn btn-outline-light btn-sm" type="submit">Save image limit</button>
</div> </div>
{% endif %} </form>
</div> </section>
</section> {% endif %}
<section class="mt-4 pt-3 border-top border-secondary-subtle"> <section class="mt-4 pt-3 border-top border-secondary-subtle">
<h3 class="h6 text-uppercase text-muted mb-3">Customization</h3> <h3 class="h6 text-uppercase text-muted mb-3">Customization</h3>
<div class="d-flex flex-wrap gap-2"> <div class="d-flex flex-wrap gap-2">

198
discord_rss_bot/webhook.py Normal file
View file

@ -0,0 +1,198 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from typing import cast
type JsonValue = bool | int | float | str | list[JsonValue] | dict[str, JsonValue] | None
type JsonObject = dict[str, JsonValue]
@dataclass(frozen=True)
class WebhookFile:
"""A file uploaded with a Discord webhook request."""
filename: str
content: bytes
class DiscordEmbed:
"""Small Discord embed payload builder used by the webhook sender."""
def __init__(self) -> None: # noqa: D107
self._payload: JsonObject = {}
def to_dict(self) -> JsonObject:
"""Return the JSON payload for this embed."""
return cast("JsonObject", dict(self._payload))
def set_description(self, description: str) -> None:
self._payload["description"] = description
def set_title(self, title: str) -> None:
self._payload["title"] = title
def set_url(self, url: str) -> None:
self._payload["url"] = url
def set_color(self, color: int | str) -> None:
if isinstance(color, int):
self._payload["color"] = color
return
normalized_color: str = color.removeprefix("#")
self._payload["color"] = int(normalized_color, 16)
def set_author(self, *, name: str, url: str | None = None, icon_url: str | None = None) -> None:
author: JsonObject = {"name": name}
if url:
author["url"] = url
if icon_url:
author["icon_url"] = icon_url
self._payload["author"] = author
def set_thumbnail(self, *, url: str) -> None:
self._payload["thumbnail"] = {"url": url}
def set_image(self, *, url: str, **_ignored: Any) -> None: # noqa: ANN401
self._payload["image"] = {"url": url}
def set_footer(self, *, text: str, icon_url: str | None = None) -> None:
footer: JsonObject = {"text": text}
if icon_url:
footer["icon_url"] = icon_url
self._payload["footer"] = footer
def add_embed_field(self, *, name: str, value: str, inline: bool | None = None) -> None:
fields = self._payload.setdefault("fields", [])
if not isinstance(fields, list):
fields = []
self._payload["fields"] = fields
field: JsonObject = {"name": name, "value": value}
if inline is not None:
field["inline"] = inline
fields.append(field)
def set_timestamp(self, *, timestamp: str) -> None:
self._payload["timestamp"] = timestamp
class DiscordWebhook:
"""Discord webhook request data.
This intentionally mirrors the subset of `discord-webhook` used by the app
while leaving the actual HTTP transport to `httpx`.
"""
def __init__( # noqa: D107
self,
url: str,
*,
content: str | None = None,
username: str | None = None,
avatar_url: str | None = None,
tts: bool | None = None,
allowed_mentions: JsonObject | None = None,
flags: int | None = None,
components: list[JsonValue] | None = None,
thread_id: str | None = None,
timeout: float | None = None,
rate_limit_retry: bool = False,
**_ignored: Any, # noqa: ANN401
) -> None:
self.url: str = url
self.thread_id: str | None = thread_id
self.timeout: int | float = timeout or 30.0
self.rate_limit_retry: bool = rate_limit_retry
self.files: list[WebhookFile] = []
self._payload: JsonObject = {}
if content is not None:
self._payload["content"] = content
if username:
self._payload["username"] = username
if avatar_url:
self._payload["avatar_url"] = avatar_url
if tts is not None:
self._payload["tts"] = tts
if allowed_mentions is not None:
self._payload["allowed_mentions"] = allowed_mentions
if flags is not None:
self._payload["flags"] = flags
if components is not None:
self._payload["components"] = components
@property
def json(self) -> JsonObject:
return self._payload
@property
def content(self) -> str | None:
value = self._payload.get("content")
return value if isinstance(value, str) else None
@content.setter
def content(self, value: str | None) -> None:
if value is None:
self._payload.pop("content", None)
else:
self._payload["content"] = value
@property
def username(self) -> str | None:
value = self._payload.get("username")
return value if isinstance(value, str) else None
@username.setter
def username(self, value: str | None) -> None:
if value:
self._payload["username"] = value
else:
self._payload.pop("username", None)
@property
def avatar_url(self) -> str | None:
value = self._payload.get("avatar_url")
return value if isinstance(value, str) else None
@avatar_url.setter
def avatar_url(self, value: str | None) -> None:
if value:
self._payload["avatar_url"] = value
else:
self._payload.pop("avatar_url", None)
@property
def components(self) -> list[JsonValue]:
value = self._payload.get("components")
return cast("list[JsonValue]", value) if isinstance(value, list) else []
@components.setter
def components(self, value: list[JsonValue]) -> None:
self._payload["components"] = value
@property
def flags(self) -> int | None:
value = self._payload.get("flags")
return value if isinstance(value, int) else None
@flags.setter
def flags(self, value: int | None) -> None:
if value is None:
self._payload.pop("flags", None)
else:
self._payload["flags"] = value
def add_file(self, *, file: bytes, filename: str) -> None:
self.files.append(WebhookFile(filename=filename, content=file))
def add_embed(self, embed: DiscordEmbed) -> None:
embeds = self._payload.setdefault("embeds", [])
if not isinstance(embeds, list):
embeds = []
self._payload["embeds"] = embeds
embeds.append(embed.to_dict())
def remove_embeds(self) -> None:
self._payload.pop("embeds", None)

View file

@ -6,7 +6,6 @@ readme = "README.md"
requires-python = ">=3.12" requires-python = ">=3.12"
dependencies = [ dependencies = [
"apscheduler>=3.11.0", "apscheduler>=3.11.0",
"discord-webhook",
"fastapi", "fastapi",
"httpx", "httpx",
"jinja2", "jinja2",

View file

@ -13,6 +13,7 @@ from discord_rss_bot.custom_message import get_custom_message
from discord_rss_bot.custom_message import get_embed from discord_rss_bot.custom_message import get_embed
from discord_rss_bot.custom_message import get_embed_data from discord_rss_bot.custom_message import get_embed_data
from discord_rss_bot.custom_message import get_first_image from discord_rss_bot.custom_message import get_first_image
from discord_rss_bot.custom_message import get_image_urls
from discord_rss_bot.custom_message import replace_tags_in_embed from discord_rss_bot.custom_message import replace_tags_in_embed
from discord_rss_bot.custom_message import replace_tags_in_text_message from discord_rss_bot.custom_message import replace_tags_in_text_message
from discord_rss_bot.custom_message import save_embed from discord_rss_bot.custom_message import save_embed
@ -203,6 +204,34 @@ def test_get_first_image_uses_summary_when_content_image_is_invalid() -> None:
assert image == "https://example.com/from-summary.jpg" assert image == "https://example.com/from-summary.jpg"
def test_get_image_urls_returns_all_valid_images_in_order_without_duplicates() -> None:
summary = (
'<p><img src="https://example.com/from-summary.jpg" /><img src="https://example.com/from-content-1.jpg" /></p>'
)
content = (
'<p><img src="https://example.com/from-content-1.jpg" />'
'<img src="javascript:alert(1)" />'
'<img src="https://example.com/from-content-2.jpg" /></p>'
)
images = get_image_urls(summary, content)
assert images == [
"https://example.com/from-content-1.jpg",
"https://example.com/from-content-2.jpg",
"https://example.com/from-summary.jpg",
]
def test_get_image_urls_respects_limit() -> None:
summary = '<img src="https://example.com/summary.jpg" />'
content = '<img src="https://example.com/one.jpg" /><img src="https://example.com/two.jpg" />'
images = get_image_urls(summary, content, limit=2)
assert images == ["https://example.com/one.jpg", "https://example.com/two.jpg"]
def test_get_first_image_returns_empty_when_images_have_no_src() -> None: def test_get_first_image_returns_empty_when_images_have_no_src() -> None:
summary = "<p></p>" summary = "<p></p>"
content = '<p><img alt="missing source" /></p>' content = '<p><img alt="missing source" /></p>'

View file

@ -39,6 +39,12 @@ from discord_rss_bot.feeds import should_send_embed_check
from discord_rss_bot.feeds import truncate_webhook_message from discord_rss_bot.feeds import truncate_webhook_message
def get_test_webhook_components(webhook: feeds.DiscordWebhook) -> list[feeds.JsonValue]:
components = webhook.json.get("components")
assert isinstance(components, list)
return components
def test_send_to_discord() -> None: def test_send_to_discord() -> None:
"""Test sending to Discord.""" """Test sending to Discord."""
# Skip early if no webhook URL is configured to avoid a real network request. # Skip early if no webhook URL is configured to avoid a real network request.
@ -332,6 +338,45 @@ def test_get_screenshot_layout_defaults_to_desktop() -> None:
assert result == "desktop" assert result == "desktop"
@pytest.mark.parametrize(
("tag_value", "expected_limit"),
[
(0, 0),
(1, 1),
(7, 7),
(-1, 0),
(99, 10),
("first", 1),
("off", 0),
("8", 8),
("unknown", 1),
],
)
def test_get_feed_media_gallery_image_limit_normalizes_stored_tag(
tag_value: feeds.JsonValue,
expected_limit: int,
) -> None:
reader = MagicMock()
feed = MagicMock()
feed.url = "https://example.com/feed.xml"
reader.get_tag.return_value = tag_value
result = feeds.get_feed_media_gallery_image_limit(reader, feed)
assert result == expected_limit
def test_get_feed_media_gallery_image_limit_defaults_to_first_image() -> None:
reader = MagicMock()
feed = MagicMock()
feed.url = "https://example.com/feed.xml"
reader.get_tag.side_effect = lambda resource, key, default=None: default # noqa: ARG005
result = feeds.get_feed_media_gallery_image_limit(reader, feed)
assert result == 1
def test_create_feed_inherits_global_screenshot_layout() -> None: def test_create_feed_inherits_global_screenshot_layout() -> None:
reader = MagicMock() reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005 reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
@ -368,7 +413,24 @@ def test_create_feed_enables_sent_webhook_tracking_by_default() -> None:
create_feed(reader, "https://example.com/feed.xml", "Main") create_feed(reader, "https://example.com/feed.xml", "Main")
reader.set_tag.assert_any_call("https://example.com/feed.xml", feeds.SAVE_SENT_WEBHOOKS_TAG, True) reader.set_tag.assert_any_call("https://example.com/feed.xml", "save_sent_webhooks", True)
def test_create_feed_sets_default_media_gallery_image_limit() -> None:
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
"screenshot_layout": "desktop",
"delivery_mode": "embed",
}.get(key, default)
create_feed(reader, "https://example.com/feed.xml", "Main")
reader.set_tag.assert_any_call(
"https://example.com/feed.xml",
"media_gallery_image_limit",
1,
)
def test_create_feed_falls_back_to_embed_when_global_delivery_mode_is_invalid() -> None: def test_create_feed_falls_back_to_embed_when_global_delivery_mode_is_invalid() -> None:
@ -568,6 +630,195 @@ def test_create_screenshot_webhook_falls_back_to_text_on_failure(
) )
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items", return_value=[])
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
def test_create_embed_webhook_uses_media_gallery_for_entry_images(
mock_replace_tags_in_embed: MagicMock,
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
) -> None:
reader = MagicMock()
reader.get_tag.return_value = 10
entry = MagicMock()
entry.id = "entry-1"
entry.title = "Entry title"
entry.link = "https://example.com/entry"
entry.summary = '<img src="https://example.com/summary.jpg" />'
entry.content = [
MagicMock(value='<img src="https://example.com/content-1.jpg" />'),
MagicMock(value='<img src="https://example.com/content-2.jpg" />'),
]
entry.feed.url = "https://example.com/feed.xml"
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(
description="Entry body",
author_name="Entry title",
author_url="https://example.com/entry",
)
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
assert webhook.flags == 1 << 15
components = get_test_webhook_components(webhook)
assert components[0] == {
"type": 10,
"content": "## [Entry title](https://example.com/entry)\n\nEntry body",
}
gallery = components[1]
assert isinstance(gallery, dict)
assert gallery["type"] == 12
mock_fetch_ttvdrops_campaign_media_items.assert_called_once_with(entry)
assert gallery["items"] == [
{"media": {"url": "https://example.com/content-1.jpg"}, "description": "Entry title"},
{"media": {"url": "https://example.com/content-2.jpg"}, "description": "Entry title"},
{"media": {"url": "https://example.com/summary.jpg"}, "description": "Entry title"},
]
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items", return_value=[])
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
def test_create_embed_webhook_can_limit_media_gallery_to_first_image(
mock_replace_tags_in_embed: MagicMock,
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
) -> None:
reader = MagicMock()
reader.get_tag.return_value = 1
entry = MagicMock()
entry.id = "entry-1"
entry.title = "Entry title"
entry.link = "https://example.com/entry"
entry.summary = '<img src="https://example.com/summary.jpg" />'
entry.content = [
MagicMock(value='<img src="https://example.com/content-1.jpg" />'),
MagicMock(value='<img src="https://example.com/content-2.jpg" />'),
]
entry.feed.url = "https://example.com/feed.xml"
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(description="Entry body")
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
gallery = get_test_webhook_components(webhook)[1]
assert isinstance(gallery, dict)
mock_fetch_ttvdrops_campaign_media_items.assert_called_once_with(entry)
assert gallery["items"] == [
{"media": {"url": "https://example.com/content-1.jpg"}, "description": "Entry title"},
]
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items", return_value=[])
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
def test_create_embed_webhook_can_disable_media_images(
mock_replace_tags_in_embed: MagicMock,
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
) -> None:
reader = MagicMock()
reader.get_tag.return_value = 0
entry = MagicMock()
entry.id = "entry-1"
entry.title = "Entry title"
entry.link = "https://example.com/entry"
entry.summary = '<img src="https://example.com/summary.jpg" />'
entry.content = [MagicMock(value='<img src="https://example.com/content-1.jpg" />')]
entry.feed.url = "https://example.com/feed.xml"
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(
description="Entry body",
image_url="https://example.com/custom-image.jpg",
thumbnail_url="https://example.com/custom-thumbnail.jpg",
)
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
assert "components" not in webhook.json
assert "embeds" in webhook.json
embeds = webhook.json["embeds"]
assert isinstance(embeds, list)
assert isinstance(embeds[0], dict)
assert "image" not in embeds[0]
assert "thumbnail" not in embeds[0]
mock_fetch_ttvdrops_campaign_media_items.assert_not_called()
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items")
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
def test_create_embed_webhook_prefers_ttvdrops_reward_images_and_alt_text(
mock_replace_tags_in_embed: MagicMock,
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
) -> None:
reader = MagicMock()
entry = MagicMock()
entry.id = "entry-2"
entry.title = "Drop campaign"
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
entry.summary = '<img src="https://example.com/feed-image.jpg" />'
entry.content = []
entry.feed.url = "https://ttvdrops.lovinator.space/feed.xml"
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(description="Campaign body")
mock_fetch_ttvdrops_campaign_media_items.return_value = [
{
"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png",
"description": "120 minutes watched: Skulbladi",
},
]
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
gallery = get_test_webhook_components(webhook)[1]
assert isinstance(gallery, dict)
assert gallery["items"] == [
{
"media": {"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png"},
"description": "120 minutes watched: Skulbladi",
},
]
def test_get_ttvdrops_campaign_api_url_from_campaign_page() -> None:
entry = MagicMock()
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
entry.id = "entry-3"
entry.feed.url = "https://example.com/feed.xml"
api_url = feeds.get_ttvdrops_campaign_api_url(entry)
assert api_url == "https://ttvdrops.lovinator.space/twitch/api/v1/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
@patch("discord_rss_bot.feeds.httpx.get")
def test_fetch_ttvdrops_campaign_media_items_extracts_reward_alt_text(mock_get: MagicMock) -> None:
response = MagicMock()
response.status_code = 200
response.json.return_value = {
"image_url": "/media/campaigns/images/campaign.png",
"drops": [
{
"name": "Drop",
"required_minutes_watched": 120,
"benefits": [
{"name": "Skulbladi", "image_url": "/media/benefits/images/reward.png"},
{"image_url": "javascript:alert(1)"},
],
},
],
}
mock_get.return_value = response
entry = MagicMock()
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
entry.id = "entry-4"
entry.feed.url = "https://example.com/feed.xml"
media_items = feeds.fetch_ttvdrops_campaign_media_items(entry)
assert media_items == [
{
"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png",
"description": "120 minutes watched: Skulbladi",
},
]
mock_get.assert_called_once_with(
"https://ttvdrops.lovinator.space/twitch/api/v1/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/",
follow_redirects=True,
timeout=10.0,
)
def test_capture_full_page_screenshot_uses_thread_when_loop_running() -> None: def test_capture_full_page_screenshot_uses_thread_when_loop_running() -> None:
"""Capture should offload sync Playwright work when called from an active event loop.""" """Capture should offload sync Playwright work when called from an active event loop."""
with patch("discord_rss_bot.feeds._capture_full_page_screenshot_sync", return_value=b"png") as mock_capture_sync: with patch("discord_rss_bot.feeds._capture_full_page_screenshot_sync", return_value=b"png") as mock_capture_sync:
@ -871,10 +1122,14 @@ def test_execute_webhook_skips_when_feed_missing() -> None:
@patch.object(feeds, "logger") @patch.object(feeds, "logger")
def test_execute_webhook_logs_error_on_bad_status(mock_logger: MagicMock) -> None: @patch("discord_rss_bot.feeds.send_webhook_message")
def test_execute_webhook_logs_error_on_bad_status(
mock_send_webhook_message: MagicMock,
mock_logger: MagicMock,
) -> None:
webhook = MagicMock() webhook = MagicMock()
webhook.json = {"content": "test"} webhook.json = {"content": "test"}
webhook.execute.return_value = MagicMock(status_code=500, text="fail") mock_send_webhook_message.return_value = MagicMock(status_code=500, text="fail")
reader = MagicMock() reader = MagicMock()
entry = MagicMock() entry = MagicMock()
entry.id = "entry-8" entry.id = "entry-8"
@ -887,9 +1142,13 @@ def test_execute_webhook_logs_error_on_bad_status(mock_logger: MagicMock) -> Non
@patch.object(feeds, "logger") @patch.object(feeds, "logger")
def test_execute_webhook_logs_info_on_success(mock_logger: MagicMock) -> None: @patch("discord_rss_bot.feeds.send_webhook_message")
def test_execute_webhook_logs_info_on_success(
mock_send_webhook_message: MagicMock,
mock_logger: MagicMock,
) -> None:
webhook = MagicMock() webhook = MagicMock()
webhook.execute.return_value = MagicMock(status_code=204, text="") mock_send_webhook_message.return_value = MagicMock(status_code=204, text="")
reader = MagicMock() reader = MagicMock()
entry = MagicMock() entry = MagicMock()
entry.id = "entry-9" entry.id = "entry-9"
@ -901,14 +1160,15 @@ def test_execute_webhook_logs_info_on_success(mock_logger: MagicMock) -> None:
mock_logger.info.assert_called_once_with("Sent entry to Discord: %s", "entry-9") mock_logger.info.assert_called_once_with("Sent entry to Discord: %s", "entry-9")
def test_execute_webhook_records_sent_webhook_message() -> None: @patch("discord_rss_bot.feeds.send_webhook_message")
def test_execute_webhook_records_sent_webhook_message(mock_send_webhook_message: MagicMock) -> None:
webhook_url = "https://discord.com/api/webhooks/123/abc" webhook_url = "https://discord.com/api/webhooks/123/abc"
state: dict[str, feeds.JsonValue] = {} state: dict[str, feeds.JsonValue] = {}
def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue: def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue:
if key == feeds.SENT_WEBHOOKS_TAG: if key == "sent_webhooks":
return state.get(feeds.SENT_WEBHOOKS_TAG, default) return state.get("sent_webhooks", default)
if key == feeds.SAVE_SENT_WEBHOOKS_TAG: if key == "save_sent_webhooks":
return True return True
if key == "webhook": if key == "webhook":
return webhook_url return webhook_url
@ -939,11 +1199,11 @@ def test_execute_webhook_records_sent_webhook_message() -> None:
response.status_code = 200 response.status_code = 200
response.text = '{"id": "message-1"}' response.text = '{"id": "message-1"}'
response.json.return_value = {"id": "message-1"} response.json.return_value = {"id": "message-1"}
webhook.execute.return_value = response mock_send_webhook_message.return_value = response
execute_webhook(webhook, entry, reader) execute_webhook(webhook, entry, reader)
records = state[feeds.SENT_WEBHOOKS_TAG] records = state["sent_webhooks"]
assert isinstance(records, list) assert isinstance(records, list)
assert len(records) == 1 assert len(records) == 1
assert isinstance(records[0], dict) assert isinstance(records[0], dict)
@ -959,11 +1219,12 @@ def test_execute_webhook_records_sent_webhook_message() -> None:
assert records[0]["payload"]["content"] == "Entry title" assert records[0]["payload"]["content"] == "Entry title"
def test_execute_webhook_does_not_record_when_feed_tracking_disabled() -> None: @patch("discord_rss_bot.feeds.send_webhook_message")
def test_execute_webhook_does_not_record_when_feed_tracking_disabled(mock_send_webhook_message: MagicMock) -> None:
webhook_url = "https://discord.com/api/webhooks/123/abc" webhook_url = "https://discord.com/api/webhooks/123/abc"
reader = MagicMock() reader = MagicMock()
reader.get_tag.side_effect = lambda _resource, key, default=None: { reader.get_tag.side_effect = lambda _resource, key, default=None: {
feeds.SAVE_SENT_WEBHOOKS_TAG: False, "save_sent_webhooks": False,
"webhook": webhook_url, "webhook": webhook_url,
}.get(key, default) }.get(key, default)
@ -979,13 +1240,62 @@ def test_execute_webhook_does_not_record_when_feed_tracking_disabled() -> None:
response.status_code = 200 response.status_code = 200
response.text = '{"id": "message-2"}' response.text = '{"id": "message-2"}'
response.json.return_value = {"id": "message-2"} response.json.return_value = {"id": "message-2"}
webhook.execute.return_value = response mock_send_webhook_message.return_value = response
execute_webhook(webhook, entry, reader) execute_webhook(webhook, entry, reader)
reader.set_tag.assert_not_called() reader.set_tag.assert_not_called()
@patch("discord_rss_bot.feeds.httpx.request")
def test_send_webhook_message_posts_components_with_httpx(mock_request: MagicMock) -> None:
response = MagicMock(status_code=200, text='{"id": "message-1"}')
mock_request.return_value = response
components: list[feeds.JsonValue] = [
{
"type": 10,
"content": "# Component update",
},
]
webhook = feeds.DiscordWebhook(
url="https://discord.com/api/webhooks/123/abc?thread_id=456",
flags=1 << 15,
components=components,
)
result = feeds.send_webhook_message(webhook, feeds.get_webhook_request_payload(webhook))
assert result is response
mock_request.assert_called_once()
assert mock_request.call_args.args == ("POST", "https://discord.com/api/webhooks/123/abc")
assert mock_request.call_args.kwargs["json"] == {
"components": components,
"flags": 1 << 15,
}
assert mock_request.call_args.kwargs["params"] == {
"thread_id": "456",
"wait": "true",
"with_components": "true",
}
@patch("discord_rss_bot.feeds.httpx.request")
def test_send_webhook_message_uploads_files_as_multipart(mock_request: MagicMock) -> None:
response = MagicMock(status_code=200, text='{"id": "message-2"}')
mock_request.return_value = response
webhook = feeds.DiscordWebhook(url="https://discord.com/api/webhooks/123/abc", content="Entry link")
webhook.add_file(file=b"image-bytes", filename="entry.png")
result = feeds.send_webhook_message(webhook, feeds.get_webhook_request_payload(webhook))
assert result is response
mock_request.assert_called_once()
assert mock_request.call_args.args == ("POST", "https://discord.com/api/webhooks/123/abc")
assert mock_request.call_args.kwargs["data"] == {"payload_json": '{"content": "Entry link"}'}
assert mock_request.call_args.kwargs["files"] == [("files[0]", ("entry.png", b"image-bytes"))]
assert "json" not in mock_request.call_args.kwargs
@patch("discord_rss_bot.feeds.edit_sent_webhook_message") @patch("discord_rss_bot.feeds.edit_sent_webhook_message")
@patch("discord_rss_bot.feeds.create_webhook_for_entry") @patch("discord_rss_bot.feeds.create_webhook_for_entry")
def test_update_sent_webhooks_for_modified_entries_edits_changed_payload( def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
@ -995,7 +1305,7 @@ def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
webhook_url = "https://discord.com/api/webhooks/123/abc" webhook_url = "https://discord.com/api/webhooks/123/abc"
old_payload: JsonObject = {"content": "Old title", "embeds": [], "attachments": []} old_payload: JsonObject = {"content": "Old title", "embeds": [], "attachments": []}
state: dict[str, feeds.JsonValue] = { state: dict[str, feeds.JsonValue] = {
feeds.SENT_WEBHOOKS_TAG: [ "sent_webhooks": [
{ {
"feed_url": "https://example.com/feed.xml", "feed_url": "https://example.com/feed.xml",
"entry_id": "entry-3", "entry_id": "entry-3",
@ -1009,9 +1319,9 @@ def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
} }
def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue: def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue:
if key == feeds.SENT_WEBHOOKS_TAG: if key == "sent_webhooks":
return state[feeds.SENT_WEBHOOKS_TAG] return state["sent_webhooks"]
if key == feeds.SAVE_SENT_WEBHOOKS_TAG: if key == "save_sent_webhooks":
return True return True
return default return default
@ -1050,7 +1360,7 @@ def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
mock_edit_sent_webhook_message.assert_called_once() mock_edit_sent_webhook_message.assert_called_once()
edit_payload = mock_edit_sent_webhook_message.call_args.args[3] edit_payload = mock_edit_sent_webhook_message.call_args.args[3]
assert edit_payload == {"content": "New title"} assert edit_payload == {"content": "New title"}
records = state[feeds.SENT_WEBHOOKS_TAG] records = state["sent_webhooks"]
assert isinstance(records, list) assert isinstance(records, list)
assert isinstance(records[0], dict) assert isinstance(records[0], dict)
assert isinstance(records[0]["payload"], dict) assert isinstance(records[0]["payload"], dict)

View file

@ -229,6 +229,12 @@ def test_get() -> None:
response: Response = client.get(url="/feed", params={"feed_url": encoded_feed_url(feed_url)}) response: Response = client.get(url="/feed", params={"feed_url": encoded_feed_url(feed_url)})
assert response.status_code == 200, f"/feed failed: {response.text}" assert response.status_code == 200, f"/feed failed: {response.text}"
assert "Feed Summary" in response.text
assert "This feed" in response.text
assert "Screenshot Delivery" in response.text
assert "Image Delivery" in response.text
assert 'type="range"' in response.text
assert 'max="10"' in response.text
response: Response = client.get(url="/") response: Response = client.get(url="/")
assert response.status_code == 200, f"/ failed: {response.text}" assert response.status_code == 200, f"/ failed: {response.text}"
@ -524,6 +530,8 @@ def test_c3kay_feed_delivery_mode_toggle_routes_update_stored_tags() -> None:
assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "screenshot" assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "screenshot"
assert reader.get_tag(c3kay_feed_url, "screenshot_layout") == "mobile" assert reader.get_tag(c3kay_feed_url, "screenshot_layout") == "mobile"
assert reader.get_tag(c3kay_feed_url, "should_send_embed") is False assert reader.get_tag(c3kay_feed_url, "should_send_embed") is False
assert "Disable screenshot delivery" in response.text
assert "Send embed instead of screenshot" not in response.text
response = client.post(url="/use_embed", data={"feed_url": c3kay_feed_url}) response = client.post(url="/use_embed", data={"feed_url": c3kay_feed_url})
assert response.status_code == 200, f"Failed to set embed mode: {response.text}" assert response.status_code == 200, f"Failed to set embed mode: {response.text}"
@ -561,7 +569,42 @@ def test_set_feed_save_sent_webhooks_route_updates_stored_tag() -> None:
) )
assert response.status_code == 303, f"/set_feed_save_sent_webhooks failed: {response.text}" assert response.status_code == 303, f"/set_feed_save_sent_webhooks failed: {response.text}"
assert stub_reader.tags[stub_reader.feed.url, feeds.SAVE_SENT_WEBHOOKS_TAG] is False assert stub_reader.tags[stub_reader.feed.url, "save_sent_webhooks"] is False
finally:
app.dependency_overrides = {}
def test_set_feed_media_gallery_image_limit_route_updates_stored_tag() -> None:
@dataclass(slots=True)
class DummyFeed:
url: str
title: str
class StubReader:
def __init__(self) -> None:
self.feed = DummyFeed(url="https://example.com/feed.xml", title="Example")
self.tags: dict[tuple[str, str], int] = {}
def get_feed(self, feed_url: str) -> DummyFeed:
assert feed_url == self.feed.url
return self.feed
def set_tag(self, resource: str, key: str, value: int) -> None:
self.tags[resource, key] = value
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try:
with patch("discord_rss_bot.main.commit_state_change"):
response: Response = client.post(
url="/set_feed_media_gallery_image_limit",
data={"feed_url": stub_reader.feed.url, "image_limit": "7"},
follow_redirects=False,
)
assert response.status_code == 303, f"/set_feed_media_gallery_image_limit failed: {response.text}"
assert stub_reader.tags[stub_reader.feed.url, "media_gallery_image_limit"] == 7
finally: finally:
app.dependency_overrides = {} app.dependency_overrides = {}
@ -585,7 +628,7 @@ def test_sent_webhooks_view_shows_saved_records() -> None:
key: str, key: str,
default: feeds.JsonValue = None, default: feeds.JsonValue = None,
) -> feeds.JsonValue: ) -> feeds.JsonValue:
if resource == () and key == feeds.SENT_WEBHOOKS_TAG: if resource == () and key == "sent_webhooks":
return [ return [
{ {
"feed_url": sent_feed_url, "feed_url": sent_feed_url,