Use Discord webhook components to send 10 images

This commit is contained in:
Joakim Hellsén 2026-05-12 06:09:21 +02:00
commit a0c186559f
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
8 changed files with 939 additions and 79 deletions

View file

@ -1,7 +1,10 @@
{
"cSpell.words": [
"argnames",
"argvalues",
"autoexport",
"botuser",
"DISCORDTIMESTAMPPLACEHOLDER",
"domcontentloaded",
"Genshins",
"healthcheck",
@ -14,8 +17,10 @@
"networkidle",
"pipx",
"pyproject",
"Skulbladi",
"thead",
"thelovinator",
"ttvdrops",
"uvicorn"
],
"python.analysis.typeCheckingMode": "basic"

View file

@ -196,39 +196,52 @@ def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str:
return custom_message.replace("\\n", "\n")
def get_first_image(summary: str | None, content: str | Sequence[Content] | None) -> str: # noqa: C901
"""Get image from summary or content.
def _extract_entry_text(data: str | list | tuple | Sequence[Content] | None) -> str | None:
"""Extract text from a reader summary/content value.
Returns:
Extracted text, or None when the input is empty.
"""
if not data:
return None
if isinstance(data, str):
return data
if isinstance(data, (list, tuple)):
extracted: list[str] = []
for item in data:
if hasattr(item, "value"):
extracted.append(item.value)
elif isinstance(item, dict) and "value" in item:
extracted.append(item.get("value", ""))
else:
extracted.append(str(item))
return "".join(extracted)
return str(data)
def get_image_urls(
summary: str | None,
content: str | Sequence[Content] | None,
*,
limit: int | None = None,
) -> list[str]:
"""Get valid image URLs from content, then summary.
Args:
summary: The summary from the entry (string, or tuple/list of objects)
content: The content from the entry (string, or tuple/list of objects)
limit: Optional maximum number of URLs to return.
Returns:
The first image
Valid, de-duplicated image URLs.
"""
image_urls: list[str] = []
seen_urls: set[str] = set()
def extract_string(data: str | list | tuple | Sequence[Content] | None) -> str | None:
if not data:
return None
if isinstance(data, str):
return data
if isinstance(data, (list, tuple)):
extracted: list[str] = []
for item in data:
if hasattr(item, "value"):
extracted.append(item.value)
elif isinstance(item, dict) and "value" in item:
extracted.append(item.get("value", ""))
else:
extracted.append(str(item))
return "".join(extracted)
return str(data)
# Convert potentially complex objects into strings
content_str: str | None = extract_string(content)
summary_str: str | None = extract_string(summary)
if content_str and (images := BeautifulSoup(content_str, features="lxml").find_all("img")):
def add_images_from_text(text: str | None) -> None:
if not text:
return
images = BeautifulSoup(text, features="lxml").find_all("img")
for image in images:
if not isinstance(image, Tag) or "src" not in image.attrs:
logger.error("Image is not a Tag or does not have a src attribute.")
@ -239,21 +252,29 @@ def get_first_image(summary: str | None, content: str | Sequence[Content] | None
logger.warning("Invalid URL: %s", src)
continue
return src
if summary_str and (images := BeautifulSoup(summary_str, features="lxml").find_all("img")):
for image in images:
if not isinstance(image, Tag) or "src" not in image.attrs:
logger.error("Image is not a Tag or does not have a src attribute.")
if src in seen_urls:
continue
if not is_url_valid(str(image.attrs["src"])):
logger.warning("Invalid URL: %s", image.attrs["src"])
continue
image_urls.append(src)
seen_urls.add(src)
if limit is not None and len(image_urls) >= limit:
return
return str(image.attrs["src"])
add_images_from_text(_extract_entry_text(content))
if limit is None or len(image_urls) < limit:
add_images_from_text(_extract_entry_text(summary))
return ""
return image_urls
def get_first_image(summary: str | None, content: str | Sequence[Content] | None) -> str:
"""Get the first image from summary or content.
Returns:
First valid image URL, or an empty string.
"""
image_urls: list[str] = get_image_urls(summary, content, limit=1)
return image_urls[0] if image_urls else ""
def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed:

View file

@ -9,20 +9,21 @@ import logging
import os
import pprint
import re
import time
from collections.abc import Callable
from contextlib import suppress
from typing import TYPE_CHECKING
from typing import Any
from typing import Literal
from typing import Protocol
from typing import cast
from urllib.parse import ParseResult
from urllib.parse import parse_qs
from urllib.parse import urljoin
from urllib.parse import urlparse
import httpx
import tldextract
from discord_webhook import DiscordEmbed
from discord_webhook import DiscordWebhook
from fastapi import HTTPException
from markdownify import markdownify
from playwright.sync_api import Browser
@ -43,6 +44,7 @@ from requests import RequestException
from discord_rss_bot.custom_message import CustomEmbed
from discord_rss_bot.custom_message import get_custom_message
from discord_rss_bot.custom_message import get_image_urls
from discord_rss_bot.custom_message import replace_tags_in_embed
from discord_rss_bot.custom_message import replace_tags_in_text_message
from discord_rss_bot.filter.evaluator import get_entry_filter_decision_from_reader
@ -54,12 +56,14 @@ from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.settings import default_custom_embed
from discord_rss_bot.settings import default_custom_message
from discord_rss_bot.settings import get_reader
from discord_rss_bot.webhook import DiscordEmbed
from discord_rss_bot.webhook import DiscordWebhook
from discord_rss_bot.webhook import WebhookFile
if TYPE_CHECKING:
from collections.abc import Iterable
from reader._types import EntryData
from requests import Response
logger: logging.Logger = logging.getLogger(__name__)
@ -100,15 +104,23 @@ class JsonResponseLike(Protocol):
MAX_DISCORD_UPLOAD_BYTES: int = 8 * 1024 * 1024
MAX_MEDIA_GALLERY_ITEMS: int = 10
MESSAGE_FLAG_IS_COMPONENTS_V2: int = 1 << 15
TTVDROPS_HOST: str = "ttvdrops.lovinator.space"
TTVDROPS_BASE_URL: str = f"https://{TTVDROPS_HOST}"
SENT_WEBHOOKS_TAG: str = "sent_webhooks"
SAVE_SENT_WEBHOOKS_TAG: str = "save_sent_webhooks"
MESSAGE_PAYLOAD_KEYS: tuple[str, ...] = (
"allowed_mentions",
"applied_tags",
"attachments",
"avatar_url",
"components",
"content",
"embeds",
"flags",
"poll",
"thread_name",
"tts",
"username",
)
@ -320,19 +332,19 @@ def save_sent_webhook_records(reader: Reader, records: list[SentWebhookRecord])
reader.set_tag((), SENT_WEBHOOKS_TAG, records) # pyright: ignore[reportArgumentType]
def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject:
"""Return the Discord message payload used to compare saved messages.
def get_webhook_request_payload(webhook: DiscordWebhook) -> JsonObject:
"""Return the Discord message payload sent to Discord.
The discord-webhook object also includes client/runtime fields in `json`; only fields that affect the Discord
message itself are persisted. Empty `content`, `embeds`, and `attachments` are kept so message edits can clear
stale content when a feed changes delivery mode.
Runtime fields on the webhook object are intentionally excluded. Unlike
`get_webhook_message_payload`, this does not add empty defaults because
Components V2 messages reject otherwise-empty `content` and `embeds` fields.
Returns:
JsonObject: Normalized Discord message payload.
JsonObject: Discord request payload.
"""
raw_payload = cast("JsonValue", webhook.json)
if not isinstance(raw_payload, dict):
return {"content": "", "embeds": [], "attachments": []}
return {}
payload: JsonObject = {}
webhook_payload = cast("JsonObject", raw_payload)
@ -340,6 +352,19 @@ def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject:
if key in webhook_payload:
payload[key] = webhook_payload[key]
return cast("JsonObject", json.loads(json.dumps(payload, default=str)))
def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject:
"""Return the normalized Discord message payload used to compare saved messages.
Empty `content`, `embeds`, and `attachments` are kept here so message edits can clear stale content when a feed
changes delivery mode. Use `get_webhook_request_payload` for the payload sent to Discord.
Returns:
JsonObject: Normalized Discord message payload.
"""
payload: JsonObject = get_webhook_request_payload(webhook)
payload.setdefault("content", "")
payload.setdefault("embeds", [])
payload.setdefault("attachments", [])
@ -416,6 +441,13 @@ def get_webhook_message_edit_payload(payload: JsonObject, record: SentWebhookRec
if edit_payload.get("attachments") == [] and not previous_attachments:
edit_payload.pop("attachments", None)
if json_value_to_int(edit_payload.get("flags")) & MESSAGE_FLAG_IS_COMPONENTS_V2:
edit_payload.pop("content", None)
edit_payload.pop("embeds", None)
edit_payload.pop("poll", None)
if edit_payload.get("attachments") == []:
edit_payload.pop("attachments", None)
return edit_payload
@ -469,7 +501,8 @@ def get_discord_message_id_from_response(response_json: JsonObject, webhook: Dis
if isinstance(message_id, str) and message_id:
return message_id
webhook_id: str | None = webhook.id if isinstance(webhook.id, str) else None
raw_webhook_id = getattr(webhook, "id", None)
webhook_id: str | None = raw_webhook_id if isinstance(raw_webhook_id, str) else None
return webhook_id if isinstance(webhook_id, str) else ""
@ -570,36 +603,166 @@ def split_webhook_url_for_message_endpoint(webhook_url: str) -> tuple[str, str |
return clean_url, thread_id
def payload_has_components(payload: JsonObject) -> bool:
"""Return whether a Discord payload includes message components."""
components: JsonValue = payload.get("components")
return isinstance(components, list) and bool(components)
def get_webhook_query_params(
webhook_url: str,
payload: JsonObject,
*,
webhook: DiscordWebhook | None = None,
wait: bool = True,
) -> tuple[str, dict[str, str]]:
"""Return a clean webhook URL and query params for a Discord webhook request."""
clean_webhook_url, thread_id = split_webhook_url_for_message_endpoint(webhook_url)
webhook_thread_id = getattr(webhook, "thread_id", None) if webhook is not None else None
if isinstance(webhook_thread_id, str) and webhook_thread_id.strip():
thread_id = webhook_thread_id.strip()
params: dict[str, str] = {}
if wait:
params["wait"] = "true"
if thread_id:
params["thread_id"] = thread_id
if payload_has_components(payload):
params["with_components"] = "true"
return clean_webhook_url, params
def get_webhook_files(webhook: DiscordWebhook) -> list[WebhookFile]: # noqa: C901
"""Return files attached to a webhook object in a normalized shape."""
raw_files = getattr(webhook, "files", None)
files: list[WebhookFile] = []
if isinstance(raw_files, dict):
for filename, content in raw_files.items():
if isinstance(filename, str) and isinstance(content, bytes):
files.append(WebhookFile(filename=filename, content=content))
return files
if not isinstance(raw_files, list | tuple):
return []
for index, file_value in enumerate(raw_files):
if isinstance(file_value, WebhookFile):
files.append(file_value)
continue
if not isinstance(file_value, tuple) or len(file_value) < 2: # noqa: PLR2004
continue
first, second = file_value[0], file_value[1]
if isinstance(first, str) and isinstance(second, bytes):
files.append(WebhookFile(filename=first, content=second))
continue
if isinstance(second, tuple) and len(second) >= 2: # noqa: PLR2004
nested_file = cast("tuple[object, ...]", second)
nested_filename, nested_content = nested_file[0], nested_file[1]
if isinstance(nested_filename, str) and isinstance(nested_content, bytes):
files.append(WebhookFile(filename=nested_filename, content=nested_content))
continue
if isinstance(second, bytes):
files.append(WebhookFile(filename=f"file-{index}", content=second))
return files
def get_retry_after_seconds(response: httpx.Response) -> float | None:
"""Return Discord's retry delay for a rate-limited response when available."""
response_json: JsonObject = get_response_json(response)
retry_after: JsonValue = response_json.get("retry_after")
if isinstance(retry_after, int | float | str):
with suppress(TypeError, ValueError):
return float(retry_after)
retry_after_header: str | None = response.headers.get("retry-after")
if retry_after_header:
with suppress(TypeError, ValueError):
return float(retry_after_header)
return None
def request_discord_webhook(
method: str,
url: str,
*,
payload: JsonObject,
params: dict[str, str],
files: list[WebhookFile] | None,
timeout: float,
rate_limit_retry: bool,
) -> httpx.Response:
"""Send a Discord webhook request with optional multipart files.
Returns:
Discord API response.
"""
request_kwargs: dict[str, Any] = {"params": params, "timeout": timeout}
if files:
request_kwargs["data"] = {"payload_json": json.dumps(payload, default=str)}
request_kwargs["files"] = [
(f"files[{index}]", (file.filename, file.content)) for index, file in enumerate(files)
]
else:
request_kwargs["json"] = payload
response: httpx.Response = httpx.request(method, url, **request_kwargs)
if not rate_limit_retry or response.status_code != 429: # noqa: PLR2004
return response
retry_after: float | None = get_retry_after_seconds(response)
if retry_after is None:
return response
time.sleep(max(0.0, retry_after))
return httpx.request(method, url, **request_kwargs)
def send_webhook_message(webhook: DiscordWebhook, payload: JsonObject) -> httpx.Response:
"""Execute a Discord webhook message create request using httpx.
Returns:
Discord API response.
"""
clean_webhook_url, params = get_webhook_query_params(webhook.url, payload, webhook=webhook, wait=True)
return request_discord_webhook(
"POST",
clean_webhook_url,
payload=payload,
params=params,
files=get_webhook_files(webhook),
timeout=cast("int | float", getattr(webhook, "timeout", None) or 30.0),
rate_limit_retry=bool(getattr(webhook, "rate_limit_retry", False)),
)
def edit_sent_webhook_message(
webhook_url: str,
message_id: str,
webhook: DiscordWebhook,
payload: JsonObject,
) -> Response | httpx.Response:
) -> httpx.Response:
"""Edit an already-sent Discord webhook message.
Returns:
Response | httpx.Response: Discord API response.
httpx.Response: Discord API response.
"""
clean_webhook_url, thread_id = split_webhook_url_for_message_endpoint(webhook_url)
if getattr(webhook, "files", None):
webhook.url = clean_webhook_url
webhook.id = message_id
if thread_id:
webhook.thread_id = thread_id
return webhook.edit()
params: dict[str, str] = {"wait": "true"}
if thread_id:
params["thread_id"] = thread_id
timeout: int | float = cast("int | float", getattr(webhook, "timeout", None) or 30.0)
return httpx.patch(
clean_webhook_url, params = get_webhook_query_params(webhook_url, payload, webhook=webhook, wait=True)
return request_discord_webhook(
"PATCH",
f"{clean_webhook_url}/messages/{message_id}",
json=payload,
payload=payload,
params=params,
timeout=timeout,
files=get_webhook_files(webhook),
timeout=cast("int | float", getattr(webhook, "timeout", None) or 30.0),
rate_limit_retry=bool(getattr(webhook, "rate_limit_retry", False)),
)
@ -1163,6 +1326,256 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
discord_embed.set_title(embed_title) if embed_title else None
def add_unique_media_gallery_item(
media_items: list[JsonObject],
image_url: str,
*,
description: str,
limit: int = MAX_MEDIA_GALLERY_ITEMS,
) -> None:
"""Append a valid media gallery item while preserving order and uniqueness."""
clean_image_url: str = image_url.strip()
if (
len(media_items) >= limit
or not clean_image_url
or any(item.get("url") == clean_image_url for item in media_items)
):
return
if not is_url_valid(clean_image_url):
logger.warning("Invalid media gallery URL: %s", clean_image_url)
return
media_items.append({"url": clean_image_url, "description": description[:1024]})
def normalize_ttvdrops_media_url(image_url: str) -> str:
"""Return an absolute ttvdrops media URL."""
clean_image_url: str = image_url.strip()
if not clean_image_url:
return ""
return urljoin(TTVDROPS_BASE_URL, clean_image_url)
def get_ttvdrops_campaign_api_url(entry: Entry) -> str:
"""Return the ttvdrops campaign API URL for an entry when it can be inferred."""
candidate_urls: tuple[str | None, ...] = (
entry.link,
entry.id,
entry.feed.url,
)
for candidate_url in candidate_urls:
if not candidate_url:
continue
parsed_url = urlparse(str(candidate_url))
if parsed_url.netloc.lower() != TTVDROPS_HOST:
continue
if re.fullmatch(r"/twitch/api/v1/campaigns/[^/]+/?", parsed_url.path):
return parsed_url._replace(query="", fragment="").geturl()
campaign_match = re.fullmatch(r"/twitch/campaigns/([^/]+)/?", parsed_url.path)
if campaign_match:
campaign_id: str = campaign_match.group(1)
return parsed_url._replace(
path=f"/twitch/api/v1/campaigns/{campaign_id}/",
query="",
fragment="",
).geturl()
return ""
def get_ttvdrops_reward_description(drop: JsonObject, reward: JsonObject) -> str:
"""Return alt text for a ttvdrops reward image.
Returns:
Reward alt text suitable for a Media Gallery description.
"""
reward_name: str = str(reward.get("name") or drop.get("name") or "Reward")
required_minutes: int = json_value_to_int(drop.get("required_minutes_watched"))
required_subs: int = json_value_to_int(drop.get("required_subs"))
if required_minutes:
return f"{required_minutes} minutes watched: {reward_name}"
if required_subs:
return f"{required_subs} subscriptions: {reward_name}"
return reward_name
def extract_ttvdrops_media_gallery_items(value: JsonValue) -> list[JsonObject]: # noqa: C901
"""Extract benefit/reward media gallery items from a ttvdrops API response.
Returns:
Media Gallery items with absolute URLs and reward descriptions.
"""
media_items: list[JsonObject] = []
def add_reward_image(drop: JsonObject, reward: JsonObject) -> None:
image_url = reward.get("image_url")
if isinstance(image_url, str):
add_unique_media_gallery_item(
media_items,
normalize_ttvdrops_media_url(image_url),
description=get_ttvdrops_reward_description(drop, reward),
)
def collect_benefit_images(current_value: JsonValue) -> None:
if isinstance(current_value, dict):
for key, child_value in current_value.items():
if key in {"benefits", "rewards"} and isinstance(child_value, list):
for item in child_value:
if isinstance(item, dict):
add_reward_image(cast("JsonObject", current_value), cast("JsonObject", item))
collect_benefit_images(item)
continue
collect_benefit_images(child_value)
return
if isinstance(current_value, list):
for item in current_value:
collect_benefit_images(item)
collect_benefit_images(value)
return media_items
def fetch_ttvdrops_campaign_media_items(entry: Entry) -> list[JsonObject]:
"""Fetch extra campaign media gallery items for ttvdrops entries.
Returns:
Media Gallery items for ttvdrops rewards, or an empty list.
"""
api_url: str = get_ttvdrops_campaign_api_url(entry)
if not api_url:
return []
try:
response: httpx.Response = httpx.get(api_url, follow_redirects=True, timeout=10.0)
if response.status_code != 200: # noqa: PLR2004
logger.warning("Failed to fetch ttvdrops campaign data from %s: %s", api_url, response.text[:500])
return []
response_json = cast("JsonValue", response.json())
except (httpx.HTTPError, ValueError, TypeError):
logger.exception("Failed to fetch ttvdrops campaign data from %s", api_url)
return []
return extract_ttvdrops_media_gallery_items(response_json)
def get_entry_media_gallery_items(entry: Entry, custom_embed: CustomEmbed) -> list[JsonObject]:
"""Return items for a Discord Media Gallery component.
Returns:
Media Gallery items capped to Discord's item limit.
"""
media_items: list[JsonObject] = []
ttvdrops_media_items: list[JsonObject] = fetch_ttvdrops_campaign_media_items(entry)
if ttvdrops_media_items:
return ttvdrops_media_items[:MAX_MEDIA_GALLERY_ITEMS]
description: str = entry.title or entry.id
for image_url in get_image_urls(entry.summary, entry.content, limit=MAX_MEDIA_GALLERY_ITEMS):
add_unique_media_gallery_item(media_items, image_url, description=description)
add_unique_media_gallery_item(media_items, custom_embed.image_url, description=description)
add_unique_media_gallery_item(media_items, custom_embed.thumbnail_url, description=description)
return media_items[:MAX_MEDIA_GALLERY_ITEMS]
def truncate_component_text(content: str) -> str:
"""Trim a Text Display component to a conservative Discord-safe length.
Returns:
Original or truncated component text.
"""
max_text_display_length: int = 4000
if len(content) <= max_text_display_length:
return content
return f"{content[: max_text_display_length - 3]}..."
def get_component_text_display_content(custom_embed: CustomEmbed, entry: Entry) -> str:
"""Build markdown text for a Components V2 Text Display.
Returns:
Markdown content for a Text Display component.
"""
parts: list[str] = []
if custom_embed.title:
parts.append(f"# {custom_embed.title}")
if custom_embed.author_name and custom_embed.author_url:
parts.append(f"## [{custom_embed.author_name}]({custom_embed.author_url})")
elif custom_embed.author_name:
parts.append(f"## {custom_embed.author_name}")
elif custom_embed.author_url:
parts.append(f"<{custom_embed.author_url}>")
if custom_embed.description:
parts.append(custom_embed.description)
if custom_embed.footer_text:
parts.append(f"-# {custom_embed.footer_text}")
if not parts:
fallback_text: str = entry.title or entry.link or entry.id
if entry.link and fallback_text != entry.link:
fallback_text = f"[{fallback_text}]({entry.link})"
parts.append(fallback_text)
return truncate_component_text("\n\n".join(parts))
def create_media_gallery_component(media_items: list[JsonObject]) -> JsonObject:
"""Build a Discord Media Gallery component.
Returns:
Discord Media Gallery component payload.
"""
return {
"type": 12,
"items": [
{
"media": {"url": media_item["url"]},
"description": media_item["description"],
}
for media_item in media_items[:MAX_MEDIA_GALLERY_ITEMS]
if isinstance(media_item.get("url"), str) and isinstance(media_item.get("description"), str)
],
}
def create_components_v2_webhook(
webhook_url: str,
entry: Entry,
custom_embed: CustomEmbed,
media_items: list[JsonObject],
) -> DiscordWebhook:
"""Create a Components V2 webhook with text and a media gallery.
Returns:
Webhook payload configured for Components V2.
"""
components: list[JsonValue] = [
{
"type": 10,
"content": get_component_text_display_content(custom_embed, entry),
},
create_media_gallery_component(media_items),
]
return DiscordWebhook(
url=webhook_url,
flags=MESSAGE_FLAG_IS_COMPONENTS_V2,
components=components,
rate_limit_retry=True,
)
def create_embed_webhook( # noqa: C901
webhook_url: str,
entry: Entry,
@ -1183,6 +1596,9 @@ def create_embed_webhook( # noqa: C901
# Get the embed data from the database.
custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader)
media_gallery_items: list[JsonObject] = get_entry_media_gallery_items(entry, custom_embed)
if media_gallery_items:
return create_components_v2_webhook(webhook_url, entry, custom_embed, media_gallery_items)
discord_embed: DiscordEmbed = DiscordEmbed()
@ -1357,11 +1773,12 @@ def execute_webhook(
logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url)
return
request_payload: JsonObject = get_webhook_request_payload(webhook)
payload: JsonObject = get_webhook_message_payload(webhook)
response: Response = webhook.execute()
response: httpx.Response = send_webhook_message(webhook, request_payload)
logger.debug("Discord webhook response for entry %s: status=%s", entry.id, response.status_code)
if response.status_code not in {200, 204}:
msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(webhook.json)}"
msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(request_payload)}"
if entry:
msg += f"\n{entry}"

View file

@ -8,8 +8,9 @@ from typing import TYPE_CHECKING
from typing import cast
import requests
from discord_webhook import DiscordEmbed
from discord_webhook import DiscordWebhook
from discord_rss_bot.webhook import DiscordEmbed
from discord_rss_bot.webhook import DiscordWebhook
if TYPE_CHECKING:
from reader import Entry

198
discord_rss_bot/webhook.py Normal file
View file

@ -0,0 +1,198 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from typing import cast
type JsonValue = bool | int | float | str | list[JsonValue] | dict[str, JsonValue] | None
type JsonObject = dict[str, JsonValue]
@dataclass(frozen=True)
class WebhookFile:
"""A file uploaded with a Discord webhook request."""
filename: str
content: bytes
class DiscordEmbed:
"""Small Discord embed payload builder used by the webhook sender."""
def __init__(self) -> None: # noqa: D107
self._payload: JsonObject = {}
def to_dict(self) -> JsonObject:
"""Return the JSON payload for this embed."""
return cast("JsonObject", dict(self._payload))
def set_description(self, description: str) -> None:
self._payload["description"] = description
def set_title(self, title: str) -> None:
self._payload["title"] = title
def set_url(self, url: str) -> None:
self._payload["url"] = url
def set_color(self, color: int | str) -> None:
if isinstance(color, int):
self._payload["color"] = color
return
normalized_color: str = color.removeprefix("#")
self._payload["color"] = int(normalized_color, 16)
def set_author(self, *, name: str, url: str | None = None, icon_url: str | None = None) -> None:
author: JsonObject = {"name": name}
if url:
author["url"] = url
if icon_url:
author["icon_url"] = icon_url
self._payload["author"] = author
def set_thumbnail(self, *, url: str) -> None:
self._payload["thumbnail"] = {"url": url}
def set_image(self, *, url: str, **_ignored: Any) -> None: # noqa: ANN401
self._payload["image"] = {"url": url}
def set_footer(self, *, text: str, icon_url: str | None = None) -> None:
footer: JsonObject = {"text": text}
if icon_url:
footer["icon_url"] = icon_url
self._payload["footer"] = footer
def add_embed_field(self, *, name: str, value: str, inline: bool | None = None) -> None:
fields = self._payload.setdefault("fields", [])
if not isinstance(fields, list):
fields = []
self._payload["fields"] = fields
field: JsonObject = {"name": name, "value": value}
if inline is not None:
field["inline"] = inline
fields.append(field)
def set_timestamp(self, *, timestamp: str) -> None:
self._payload["timestamp"] = timestamp
class DiscordWebhook:
"""Discord webhook request data.
This intentionally mirrors the subset of `discord-webhook` used by the app
while leaving the actual HTTP transport to `httpx`.
"""
def __init__( # noqa: D107
self,
url: str,
*,
content: str | None = None,
username: str | None = None,
avatar_url: str | None = None,
tts: bool | None = None,
allowed_mentions: JsonObject | None = None,
flags: int | None = None,
components: list[JsonValue] | None = None,
thread_id: str | None = None,
timeout: float | None = None,
rate_limit_retry: bool = False,
**_ignored: Any, # noqa: ANN401
) -> None:
self.url: str = url
self.thread_id: str | None = thread_id
self.timeout: int | float = timeout or 30.0
self.rate_limit_retry: bool = rate_limit_retry
self.files: list[WebhookFile] = []
self._payload: JsonObject = {}
if content is not None:
self._payload["content"] = content
if username:
self._payload["username"] = username
if avatar_url:
self._payload["avatar_url"] = avatar_url
if tts is not None:
self._payload["tts"] = tts
if allowed_mentions is not None:
self._payload["allowed_mentions"] = allowed_mentions
if flags is not None:
self._payload["flags"] = flags
if components is not None:
self._payload["components"] = components
@property
def json(self) -> JsonObject:
return self._payload
@property
def content(self) -> str | None:
value = self._payload.get("content")
return value if isinstance(value, str) else None
@content.setter
def content(self, value: str | None) -> None:
if value is None:
self._payload.pop("content", None)
else:
self._payload["content"] = value
@property
def username(self) -> str | None:
value = self._payload.get("username")
return value if isinstance(value, str) else None
@username.setter
def username(self, value: str | None) -> None:
if value:
self._payload["username"] = value
else:
self._payload.pop("username", None)
@property
def avatar_url(self) -> str | None:
value = self._payload.get("avatar_url")
return value if isinstance(value, str) else None
@avatar_url.setter
def avatar_url(self, value: str | None) -> None:
if value:
self._payload["avatar_url"] = value
else:
self._payload.pop("avatar_url", None)
@property
def components(self) -> list[JsonValue]:
value = self._payload.get("components")
return cast("list[JsonValue]", value) if isinstance(value, list) else []
@components.setter
def components(self, value: list[JsonValue]) -> None:
self._payload["components"] = value
@property
def flags(self) -> int | None:
value = self._payload.get("flags")
return value if isinstance(value, int) else None
@flags.setter
def flags(self, value: int | None) -> None:
if value is None:
self._payload.pop("flags", None)
else:
self._payload["flags"] = value
def add_file(self, *, file: bytes, filename: str) -> None:
self.files.append(WebhookFile(filename=filename, content=file))
def add_embed(self, embed: DiscordEmbed) -> None:
embeds = self._payload.setdefault("embeds", [])
if not isinstance(embeds, list):
embeds = []
self._payload["embeds"] = embeds
embeds.append(embed.to_dict())
def remove_embeds(self) -> None:
self._payload.pop("embeds", None)

View file

@ -6,7 +6,6 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"apscheduler>=3.11.0",
"discord-webhook",
"fastapi",
"httpx",
"jinja2",

View file

@ -13,6 +13,7 @@ from discord_rss_bot.custom_message import get_custom_message
from discord_rss_bot.custom_message import get_embed
from discord_rss_bot.custom_message import get_embed_data
from discord_rss_bot.custom_message import get_first_image
from discord_rss_bot.custom_message import get_image_urls
from discord_rss_bot.custom_message import replace_tags_in_embed
from discord_rss_bot.custom_message import replace_tags_in_text_message
from discord_rss_bot.custom_message import save_embed
@ -203,6 +204,34 @@ def test_get_first_image_uses_summary_when_content_image_is_invalid() -> None:
assert image == "https://example.com/from-summary.jpg"
def test_get_image_urls_returns_all_valid_images_in_order_without_duplicates() -> None:
summary = (
'<p><img src="https://example.com/from-summary.jpg" /><img src="https://example.com/from-content-1.jpg" /></p>'
)
content = (
'<p><img src="https://example.com/from-content-1.jpg" />'
'<img src="javascript:alert(1)" />'
'<img src="https://example.com/from-content-2.jpg" /></p>'
)
images = get_image_urls(summary, content)
assert images == [
"https://example.com/from-content-1.jpg",
"https://example.com/from-content-2.jpg",
"https://example.com/from-summary.jpg",
]
def test_get_image_urls_respects_limit() -> None:
summary = '<img src="https://example.com/summary.jpg" />'
content = '<img src="https://example.com/one.jpg" /><img src="https://example.com/two.jpg" />'
images = get_image_urls(summary, content, limit=2)
assert images == ["https://example.com/one.jpg", "https://example.com/two.jpg"]
def test_get_first_image_returns_empty_when_images_have_no_src() -> None:
summary = "<p></p>"
content = '<p><img alt="missing source" /></p>'

View file

@ -39,6 +39,12 @@ from discord_rss_bot.feeds import should_send_embed_check
from discord_rss_bot.feeds import truncate_webhook_message
def get_test_webhook_components(webhook: feeds.DiscordWebhook) -> list[feeds.JsonValue]:
components = webhook.json.get("components")
assert isinstance(components, list)
return components
def test_send_to_discord() -> None:
"""Test sending to Discord."""
# Skip early if no webhook URL is configured to avoid a real network request.
@ -568,6 +574,131 @@ def test_create_screenshot_webhook_falls_back_to_text_on_failure(
)
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items", return_value=[])
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
def test_create_embed_webhook_uses_media_gallery_for_entry_images(
mock_replace_tags_in_embed: MagicMock,
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
) -> None:
reader = MagicMock()
entry = MagicMock()
entry.id = "entry-1"
entry.title = "Entry title"
entry.link = "https://example.com/entry"
entry.summary = '<img src="https://example.com/summary.jpg" />'
entry.content = [
MagicMock(value='<img src="https://example.com/content-1.jpg" />'),
MagicMock(value='<img src="https://example.com/content-2.jpg" />'),
]
entry.feed.url = "https://example.com/feed.xml"
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(
description="Entry body",
author_name="Entry title",
author_url="https://example.com/entry",
)
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
assert webhook.flags == feeds.MESSAGE_FLAG_IS_COMPONENTS_V2
components = get_test_webhook_components(webhook)
assert components[0] == {
"type": 10,
"content": "## [Entry title](https://example.com/entry)\n\nEntry body",
}
gallery = components[1]
assert isinstance(gallery, dict)
assert gallery["type"] == 12
mock_fetch_ttvdrops_campaign_media_items.assert_called_once_with(entry)
assert gallery["items"] == [
{"media": {"url": "https://example.com/content-1.jpg"}, "description": "Entry title"},
{"media": {"url": "https://example.com/content-2.jpg"}, "description": "Entry title"},
{"media": {"url": "https://example.com/summary.jpg"}, "description": "Entry title"},
]
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items")
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
def test_create_embed_webhook_prefers_ttvdrops_reward_images_and_alt_text(
mock_replace_tags_in_embed: MagicMock,
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
) -> None:
reader = MagicMock()
entry = MagicMock()
entry.id = "entry-2"
entry.title = "Drop campaign"
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
entry.summary = '<img src="https://example.com/feed-image.jpg" />'
entry.content = []
entry.feed.url = "https://ttvdrops.lovinator.space/feed.xml"
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(description="Campaign body")
mock_fetch_ttvdrops_campaign_media_items.return_value = [
{
"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png",
"description": "120 minutes watched: Skulbladi",
},
]
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
gallery = get_test_webhook_components(webhook)[1]
assert isinstance(gallery, dict)
assert gallery["items"] == [
{
"media": {"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png"},
"description": "120 minutes watched: Skulbladi",
},
]
def test_get_ttvdrops_campaign_api_url_from_campaign_page() -> None:
entry = MagicMock()
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
entry.id = "entry-3"
entry.feed.url = "https://example.com/feed.xml"
api_url = feeds.get_ttvdrops_campaign_api_url(entry)
assert api_url == "https://ttvdrops.lovinator.space/twitch/api/v1/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
@patch("discord_rss_bot.feeds.httpx.get")
def test_fetch_ttvdrops_campaign_media_items_extracts_reward_alt_text(mock_get: MagicMock) -> None:
response = MagicMock()
response.status_code = 200
response.json.return_value = {
"image_url": "/media/campaigns/images/campaign.png",
"drops": [
{
"name": "Drop",
"required_minutes_watched": 120,
"benefits": [
{"name": "Skulbladi", "image_url": "/media/benefits/images/reward.png"},
{"image_url": "javascript:alert(1)"},
],
},
],
}
mock_get.return_value = response
entry = MagicMock()
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
entry.id = "entry-4"
entry.feed.url = "https://example.com/feed.xml"
media_items = feeds.fetch_ttvdrops_campaign_media_items(entry)
assert media_items == [
{
"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png",
"description": "120 minutes watched: Skulbladi",
},
]
mock_get.assert_called_once_with(
"https://ttvdrops.lovinator.space/twitch/api/v1/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/",
follow_redirects=True,
timeout=10.0,
)
def test_capture_full_page_screenshot_uses_thread_when_loop_running() -> None:
"""Capture should offload sync Playwright work when called from an active event loop."""
with patch("discord_rss_bot.feeds._capture_full_page_screenshot_sync", return_value=b"png") as mock_capture_sync:
@ -871,10 +1002,14 @@ def test_execute_webhook_skips_when_feed_missing() -> None:
@patch.object(feeds, "logger")
def test_execute_webhook_logs_error_on_bad_status(mock_logger: MagicMock) -> None:
@patch("discord_rss_bot.feeds.send_webhook_message")
def test_execute_webhook_logs_error_on_bad_status(
mock_send_webhook_message: MagicMock,
mock_logger: MagicMock,
) -> None:
webhook = MagicMock()
webhook.json = {"content": "test"}
webhook.execute.return_value = MagicMock(status_code=500, text="fail")
mock_send_webhook_message.return_value = MagicMock(status_code=500, text="fail")
reader = MagicMock()
entry = MagicMock()
entry.id = "entry-8"
@ -887,9 +1022,13 @@ def test_execute_webhook_logs_error_on_bad_status(mock_logger: MagicMock) -> Non
@patch.object(feeds, "logger")
def test_execute_webhook_logs_info_on_success(mock_logger: MagicMock) -> None:
@patch("discord_rss_bot.feeds.send_webhook_message")
def test_execute_webhook_logs_info_on_success(
mock_send_webhook_message: MagicMock,
mock_logger: MagicMock,
) -> None:
webhook = MagicMock()
webhook.execute.return_value = MagicMock(status_code=204, text="")
mock_send_webhook_message.return_value = MagicMock(status_code=204, text="")
reader = MagicMock()
entry = MagicMock()
entry.id = "entry-9"
@ -901,7 +1040,8 @@ def test_execute_webhook_logs_info_on_success(mock_logger: MagicMock) -> None:
mock_logger.info.assert_called_once_with("Sent entry to Discord: %s", "entry-9")
def test_execute_webhook_records_sent_webhook_message() -> None:
@patch("discord_rss_bot.feeds.send_webhook_message")
def test_execute_webhook_records_sent_webhook_message(mock_send_webhook_message: MagicMock) -> None:
webhook_url = "https://discord.com/api/webhooks/123/abc"
state: dict[str, feeds.JsonValue] = {}
@ -939,7 +1079,7 @@ def test_execute_webhook_records_sent_webhook_message() -> None:
response.status_code = 200
response.text = '{"id": "message-1"}'
response.json.return_value = {"id": "message-1"}
webhook.execute.return_value = response
mock_send_webhook_message.return_value = response
execute_webhook(webhook, entry, reader)
@ -959,7 +1099,8 @@ def test_execute_webhook_records_sent_webhook_message() -> None:
assert records[0]["payload"]["content"] == "Entry title"
def test_execute_webhook_does_not_record_when_feed_tracking_disabled() -> None:
@patch("discord_rss_bot.feeds.send_webhook_message")
def test_execute_webhook_does_not_record_when_feed_tracking_disabled(mock_send_webhook_message: MagicMock) -> None:
webhook_url = "https://discord.com/api/webhooks/123/abc"
reader = MagicMock()
reader.get_tag.side_effect = lambda _resource, key, default=None: {
@ -979,13 +1120,62 @@ def test_execute_webhook_does_not_record_when_feed_tracking_disabled() -> None:
response.status_code = 200
response.text = '{"id": "message-2"}'
response.json.return_value = {"id": "message-2"}
webhook.execute.return_value = response
mock_send_webhook_message.return_value = response
execute_webhook(webhook, entry, reader)
reader.set_tag.assert_not_called()
@patch("discord_rss_bot.feeds.httpx.request")
def test_send_webhook_message_posts_components_with_httpx(mock_request: MagicMock) -> None:
response = MagicMock(status_code=200, text='{"id": "message-1"}')
mock_request.return_value = response
components: list[feeds.JsonValue] = [
{
"type": 10,
"content": "# Component update",
},
]
webhook = feeds.DiscordWebhook(
url="https://discord.com/api/webhooks/123/abc?thread_id=456",
flags=feeds.MESSAGE_FLAG_IS_COMPONENTS_V2,
components=components,
)
result = feeds.send_webhook_message(webhook, feeds.get_webhook_request_payload(webhook))
assert result is response
mock_request.assert_called_once()
assert mock_request.call_args.args == ("POST", "https://discord.com/api/webhooks/123/abc")
assert mock_request.call_args.kwargs["json"] == {
"components": components,
"flags": feeds.MESSAGE_FLAG_IS_COMPONENTS_V2,
}
assert mock_request.call_args.kwargs["params"] == {
"thread_id": "456",
"wait": "true",
"with_components": "true",
}
@patch("discord_rss_bot.feeds.httpx.request")
def test_send_webhook_message_uploads_files_as_multipart(mock_request: MagicMock) -> None:
response = MagicMock(status_code=200, text='{"id": "message-2"}')
mock_request.return_value = response
webhook = feeds.DiscordWebhook(url="https://discord.com/api/webhooks/123/abc", content="Entry link")
webhook.add_file(file=b"image-bytes", filename="entry.png")
result = feeds.send_webhook_message(webhook, feeds.get_webhook_request_payload(webhook))
assert result is response
mock_request.assert_called_once()
assert mock_request.call_args.args == ("POST", "https://discord.com/api/webhooks/123/abc")
assert mock_request.call_args.kwargs["data"] == {"payload_json": '{"content": "Entry link"}'}
assert mock_request.call_args.kwargs["files"] == [("files[0]", ("entry.png", b"image-bytes"))]
assert "json" not in mock_request.call_args.kwargs
@patch("discord_rss_bot.feeds.edit_sent_webhook_message")
@patch("discord_rss_bot.feeds.create_webhook_for_entry")
def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(