Compare commits
2 commits
1db98ae161
...
3346994763
| Author | SHA1 | Date | |
|---|---|---|---|
|
3346994763 |
|||
|
a0c186559f |
13 changed files with 1417 additions and 135 deletions
11
.vscode/settings.json
vendored
11
.vscode/settings.json
vendored
|
|
@ -1,22 +1,31 @@
|
||||||
{
|
{
|
||||||
"cSpell.words": [
|
"cSpell.words": [
|
||||||
|
"argnames",
|
||||||
|
"argvalues",
|
||||||
"autoexport",
|
"autoexport",
|
||||||
|
"autoplay",
|
||||||
"botuser",
|
"botuser",
|
||||||
|
"DISCORDTIMESTAMPPLACEHOLDER",
|
||||||
"domcontentloaded",
|
"domcontentloaded",
|
||||||
"Genshins",
|
"Genshins",
|
||||||
"healthcheck",
|
"healthcheck",
|
||||||
"Hoyolab",
|
"Hoyolab",
|
||||||
|
"HTMX",
|
||||||
"KHTML",
|
"KHTML",
|
||||||
"levelname",
|
"levelname",
|
||||||
"Lovinator",
|
"Lovinator",
|
||||||
"markdownified",
|
"markdownified",
|
||||||
"markdownify",
|
"markdownify",
|
||||||
"networkidle",
|
"networkidle",
|
||||||
|
"overwritable",
|
||||||
"pipx",
|
"pipx",
|
||||||
"pyproject",
|
"pyproject",
|
||||||
|
"Skulbladi",
|
||||||
"thead",
|
"thead",
|
||||||
"thelovinator",
|
"thelovinator",
|
||||||
"uvicorn"
|
"ttvdrops",
|
||||||
|
"uvicorn",
|
||||||
|
"youtu"
|
||||||
],
|
],
|
||||||
"python.analysis.typeCheckingMode": "basic"
|
"python.analysis.typeCheckingMode": "basic"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -196,39 +196,52 @@ def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str:
|
||||||
return custom_message.replace("\\n", "\n")
|
return custom_message.replace("\\n", "\n")
|
||||||
|
|
||||||
|
|
||||||
def get_first_image(summary: str | None, content: str | Sequence[Content] | None) -> str: # noqa: C901
|
def _extract_entry_text(data: str | list | tuple | Sequence[Content] | None) -> str | None:
|
||||||
"""Get image from summary or content.
|
"""Extract text from a reader summary/content value.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Extracted text, or None when the input is empty.
|
||||||
|
"""
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
if isinstance(data, str):
|
||||||
|
return data
|
||||||
|
if isinstance(data, (list, tuple)):
|
||||||
|
extracted: list[str] = []
|
||||||
|
for item in data:
|
||||||
|
if hasattr(item, "value"):
|
||||||
|
extracted.append(item.value)
|
||||||
|
elif isinstance(item, dict) and "value" in item:
|
||||||
|
extracted.append(item.get("value", ""))
|
||||||
|
else:
|
||||||
|
extracted.append(str(item))
|
||||||
|
return "".join(extracted)
|
||||||
|
return str(data)
|
||||||
|
|
||||||
|
|
||||||
|
def get_image_urls(
|
||||||
|
summary: str | None,
|
||||||
|
content: str | Sequence[Content] | None,
|
||||||
|
*,
|
||||||
|
limit: int | None = None,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Get valid image URLs from content, then summary.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
summary: The summary from the entry (string, or tuple/list of objects)
|
summary: The summary from the entry (string, or tuple/list of objects)
|
||||||
content: The content from the entry (string, or tuple/list of objects)
|
content: The content from the entry (string, or tuple/list of objects)
|
||||||
|
limit: Optional maximum number of URLs to return.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The first image
|
Valid, de-duplicated image URLs.
|
||||||
"""
|
"""
|
||||||
|
image_urls: list[str] = []
|
||||||
|
seen_urls: set[str] = set()
|
||||||
|
|
||||||
def extract_string(data: str | list | tuple | Sequence[Content] | None) -> str | None:
|
def add_images_from_text(text: str | None) -> None:
|
||||||
if not data:
|
if not text:
|
||||||
return None
|
return
|
||||||
if isinstance(data, str):
|
images = BeautifulSoup(text, features="lxml").find_all("img")
|
||||||
return data
|
|
||||||
if isinstance(data, (list, tuple)):
|
|
||||||
extracted: list[str] = []
|
|
||||||
for item in data:
|
|
||||||
if hasattr(item, "value"):
|
|
||||||
extracted.append(item.value)
|
|
||||||
elif isinstance(item, dict) and "value" in item:
|
|
||||||
extracted.append(item.get("value", ""))
|
|
||||||
else:
|
|
||||||
extracted.append(str(item))
|
|
||||||
return "".join(extracted)
|
|
||||||
return str(data)
|
|
||||||
|
|
||||||
# Convert potentially complex objects into strings
|
|
||||||
content_str: str | None = extract_string(content)
|
|
||||||
summary_str: str | None = extract_string(summary)
|
|
||||||
|
|
||||||
if content_str and (images := BeautifulSoup(content_str, features="lxml").find_all("img")):
|
|
||||||
for image in images:
|
for image in images:
|
||||||
if not isinstance(image, Tag) or "src" not in image.attrs:
|
if not isinstance(image, Tag) or "src" not in image.attrs:
|
||||||
logger.error("Image is not a Tag or does not have a src attribute.")
|
logger.error("Image is not a Tag or does not have a src attribute.")
|
||||||
|
|
@ -239,21 +252,29 @@ def get_first_image(summary: str | None, content: str | Sequence[Content] | None
|
||||||
logger.warning("Invalid URL: %s", src)
|
logger.warning("Invalid URL: %s", src)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
return src
|
if src in seen_urls:
|
||||||
|
|
||||||
if summary_str and (images := BeautifulSoup(summary_str, features="lxml").find_all("img")):
|
|
||||||
for image in images:
|
|
||||||
if not isinstance(image, Tag) or "src" not in image.attrs:
|
|
||||||
logger.error("Image is not a Tag or does not have a src attribute.")
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not is_url_valid(str(image.attrs["src"])):
|
image_urls.append(src)
|
||||||
logger.warning("Invalid URL: %s", image.attrs["src"])
|
seen_urls.add(src)
|
||||||
continue
|
if limit is not None and len(image_urls) >= limit:
|
||||||
|
return
|
||||||
|
|
||||||
return str(image.attrs["src"])
|
add_images_from_text(_extract_entry_text(content))
|
||||||
|
if limit is None or len(image_urls) < limit:
|
||||||
|
add_images_from_text(_extract_entry_text(summary))
|
||||||
|
|
||||||
return ""
|
return image_urls
|
||||||
|
|
||||||
|
|
||||||
|
def get_first_image(summary: str | None, content: str | Sequence[Content] | None) -> str:
|
||||||
|
"""Get the first image from summary or content.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
First valid image URL, or an empty string.
|
||||||
|
"""
|
||||||
|
image_urls: list[str] = get_image_urls(summary, content, limit=1)
|
||||||
|
return image_urls[0] if image_urls else ""
|
||||||
|
|
||||||
|
|
||||||
def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed:
|
def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed:
|
||||||
|
|
|
||||||
|
|
@ -9,20 +9,21 @@ import logging
|
||||||
import os
|
import os
|
||||||
import pprint
|
import pprint
|
||||||
import re
|
import re
|
||||||
|
import time
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from contextlib import suppress
|
from contextlib import suppress
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
from typing import Any
|
||||||
from typing import Literal
|
from typing import Literal
|
||||||
from typing import Protocol
|
from typing import Protocol
|
||||||
from typing import cast
|
from typing import cast
|
||||||
from urllib.parse import ParseResult
|
from urllib.parse import ParseResult
|
||||||
from urllib.parse import parse_qs
|
from urllib.parse import parse_qs
|
||||||
|
from urllib.parse import urljoin
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
import tldextract
|
import tldextract
|
||||||
from discord_webhook import DiscordEmbed
|
|
||||||
from discord_webhook import DiscordWebhook
|
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
from markdownify import markdownify
|
from markdownify import markdownify
|
||||||
from playwright.sync_api import Browser
|
from playwright.sync_api import Browser
|
||||||
|
|
@ -43,6 +44,7 @@ from requests import RequestException
|
||||||
|
|
||||||
from discord_rss_bot.custom_message import CustomEmbed
|
from discord_rss_bot.custom_message import CustomEmbed
|
||||||
from discord_rss_bot.custom_message import get_custom_message
|
from discord_rss_bot.custom_message import get_custom_message
|
||||||
|
from discord_rss_bot.custom_message import get_image_urls
|
||||||
from discord_rss_bot.custom_message import replace_tags_in_embed
|
from discord_rss_bot.custom_message import replace_tags_in_embed
|
||||||
from discord_rss_bot.custom_message import replace_tags_in_text_message
|
from discord_rss_bot.custom_message import replace_tags_in_text_message
|
||||||
from discord_rss_bot.filter.evaluator import get_entry_filter_decision_from_reader
|
from discord_rss_bot.filter.evaluator import get_entry_filter_decision_from_reader
|
||||||
|
|
@ -54,12 +56,15 @@ from discord_rss_bot.is_url_valid import is_url_valid
|
||||||
from discord_rss_bot.settings import default_custom_embed
|
from discord_rss_bot.settings import default_custom_embed
|
||||||
from discord_rss_bot.settings import default_custom_message
|
from discord_rss_bot.settings import default_custom_message
|
||||||
from discord_rss_bot.settings import get_reader
|
from discord_rss_bot.settings import get_reader
|
||||||
|
from discord_rss_bot.webhook import DiscordEmbed
|
||||||
|
from discord_rss_bot.webhook import DiscordWebhook
|
||||||
|
from discord_rss_bot.webhook import WebhookFile
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from collections.abc import Iterable
|
from collections.abc import Iterable
|
||||||
|
|
||||||
from reader._types import EntryData
|
from reader._types import EntryData
|
||||||
from requests import Response
|
from reader.types import JSONType
|
||||||
|
|
||||||
logger: logging.Logger = logging.getLogger(__name__)
|
logger: logging.Logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -99,16 +104,17 @@ class JsonResponseLike(Protocol):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
MAX_DISCORD_UPLOAD_BYTES: int = 8 * 1024 * 1024
|
|
||||||
SENT_WEBHOOKS_TAG: str = "sent_webhooks"
|
|
||||||
SAVE_SENT_WEBHOOKS_TAG: str = "save_sent_webhooks"
|
|
||||||
MESSAGE_PAYLOAD_KEYS: tuple[str, ...] = (
|
MESSAGE_PAYLOAD_KEYS: tuple[str, ...] = (
|
||||||
"allowed_mentions",
|
"allowed_mentions",
|
||||||
|
"applied_tags",
|
||||||
"attachments",
|
"attachments",
|
||||||
"avatar_url",
|
"avatar_url",
|
||||||
|
"components",
|
||||||
"content",
|
"content",
|
||||||
"embeds",
|
"embeds",
|
||||||
"flags",
|
"flags",
|
||||||
|
"poll",
|
||||||
|
"thread_name",
|
||||||
"tts",
|
"tts",
|
||||||
"username",
|
"username",
|
||||||
)
|
)
|
||||||
|
|
@ -278,6 +284,41 @@ def get_screenshot_layout(reader: Reader, feed: Feed) -> ScreenshotLayout:
|
||||||
return "desktop"
|
return "desktop"
|
||||||
|
|
||||||
|
|
||||||
|
def coerce_media_gallery_image_limit(value: JsonValue) -> int: # noqa: PLR0911
|
||||||
|
"""Return the supported media gallery image limit for a stored tag value."""
|
||||||
|
if isinstance(value, bool):
|
||||||
|
return 1
|
||||||
|
if isinstance(value, int):
|
||||||
|
return min(max(value, 0), 10)
|
||||||
|
if isinstance(value, str) and value.strip().lower() in {"1", "first", "first_image", "first-only"}:
|
||||||
|
return 1
|
||||||
|
if isinstance(value, str) and value.strip().lower() in {"0", "none", "no_images", "off", "disabled"}:
|
||||||
|
return 0
|
||||||
|
if isinstance(value, str):
|
||||||
|
try:
|
||||||
|
parsed_value: int = int(value.strip())
|
||||||
|
except ValueError:
|
||||||
|
return 1
|
||||||
|
return min(max(parsed_value, 0), 10)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
def get_feed_media_gallery_image_limit(reader: Reader, feed: Feed | str) -> int:
|
||||||
|
"""Resolve how many feed images should be sent in Discord media galleries.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The configured image limit, normalized to a supported Discord gallery size.
|
||||||
|
"""
|
||||||
|
feed_url: str = str(getattr(feed, "url", feed))
|
||||||
|
try:
|
||||||
|
value = cast("JsonValue", reader.get_tag(feed, "media_gallery_image_limit", 1))
|
||||||
|
except ReaderError:
|
||||||
|
logger.exception("Error getting %s tag for feed: %s", "media_gallery_image_limit", feed_url)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
return coerce_media_gallery_image_limit(value)
|
||||||
|
|
||||||
|
|
||||||
def feed_saves_sent_webhooks(reader: Reader, feed: Feed | str) -> bool:
|
def feed_saves_sent_webhooks(reader: Reader, feed: Feed | str) -> bool:
|
||||||
"""Return whether sent Discord webhook messages should be stored for a feed.
|
"""Return whether sent Discord webhook messages should be stored for a feed.
|
||||||
|
|
||||||
|
|
@ -285,9 +326,9 @@ def feed_saves_sent_webhooks(reader: Reader, feed: Feed | str) -> bool:
|
||||||
"""
|
"""
|
||||||
feed_url: str = feed.url if isinstance(feed, Feed) else str(feed)
|
feed_url: str = feed.url if isinstance(feed, Feed) else str(feed)
|
||||||
try:
|
try:
|
||||||
value = cast("JsonValue", reader.get_tag(feed, SAVE_SENT_WEBHOOKS_TAG, True))
|
value = cast("JsonValue", reader.get_tag(feed, "save_sent_webhooks", True))
|
||||||
except ReaderError:
|
except ReaderError:
|
||||||
logger.exception("Error getting %s tag for feed: %s", SAVE_SENT_WEBHOOKS_TAG, feed_url)
|
logger.exception("Error getting %s tag for feed: %s", "save_sent_webhooks", feed_url)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if isinstance(value, bool):
|
if isinstance(value, bool):
|
||||||
|
|
@ -305,7 +346,7 @@ def get_sent_webhook_records(reader: Reader) -> list[SentWebhookRecord]:
|
||||||
Returns:
|
Returns:
|
||||||
list[SentWebhookRecord]: Saved sent webhook records.
|
list[SentWebhookRecord]: Saved sent webhook records.
|
||||||
"""
|
"""
|
||||||
raw_records = cast("JsonValue", reader.get_tag((), SENT_WEBHOOKS_TAG, []))
|
raw_records = cast("JsonValue", reader.get_tag((), "sent_webhooks", []))
|
||||||
if not isinstance(raw_records, list):
|
if not isinstance(raw_records, list):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
@ -317,22 +358,22 @@ def get_sent_webhook_records(reader: Reader) -> list[SentWebhookRecord]:
|
||||||
|
|
||||||
def save_sent_webhook_records(reader: Reader, records: list[SentWebhookRecord]) -> None:
|
def save_sent_webhook_records(reader: Reader, records: list[SentWebhookRecord]) -> None:
|
||||||
"""Save sent webhook records to the global reader tag."""
|
"""Save sent webhook records to the global reader tag."""
|
||||||
reader.set_tag((), SENT_WEBHOOKS_TAG, records) # pyright: ignore[reportArgumentType]
|
reader.set_tag((), "sent_webhooks", records) # pyright: ignore[reportArgumentType]
|
||||||
|
|
||||||
|
|
||||||
def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject:
|
def get_webhook_request_payload(webhook: DiscordWebhook) -> JsonObject:
|
||||||
"""Return the Discord message payload used to compare saved messages.
|
"""Return the Discord message payload sent to Discord.
|
||||||
|
|
||||||
The discord-webhook object also includes client/runtime fields in `json`; only fields that affect the Discord
|
Runtime fields on the webhook object are intentionally excluded. Unlike
|
||||||
message itself are persisted. Empty `content`, `embeds`, and `attachments` are kept so message edits can clear
|
`get_webhook_message_payload`, this does not add empty defaults because
|
||||||
stale content when a feed changes delivery mode.
|
Components V2 messages reject otherwise-empty `content` and `embeds` fields.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
JsonObject: Normalized Discord message payload.
|
JsonObject: Discord request payload.
|
||||||
"""
|
"""
|
||||||
raw_payload = cast("JsonValue", webhook.json)
|
raw_payload = cast("JsonValue", webhook.json)
|
||||||
if not isinstance(raw_payload, dict):
|
if not isinstance(raw_payload, dict):
|
||||||
return {"content": "", "embeds": [], "attachments": []}
|
return {}
|
||||||
|
|
||||||
payload: JsonObject = {}
|
payload: JsonObject = {}
|
||||||
webhook_payload = cast("JsonObject", raw_payload)
|
webhook_payload = cast("JsonObject", raw_payload)
|
||||||
|
|
@ -340,6 +381,19 @@ def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject:
|
||||||
if key in webhook_payload:
|
if key in webhook_payload:
|
||||||
payload[key] = webhook_payload[key]
|
payload[key] = webhook_payload[key]
|
||||||
|
|
||||||
|
return cast("JsonObject", json.loads(json.dumps(payload, default=str)))
|
||||||
|
|
||||||
|
|
||||||
|
def get_webhook_message_payload(webhook: DiscordWebhook) -> JsonObject:
|
||||||
|
"""Return the normalized Discord message payload used to compare saved messages.
|
||||||
|
|
||||||
|
Empty `content`, `embeds`, and `attachments` are kept here so message edits can clear stale content when a feed
|
||||||
|
changes delivery mode. Use `get_webhook_request_payload` for the payload sent to Discord.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JsonObject: Normalized Discord message payload.
|
||||||
|
"""
|
||||||
|
payload: JsonObject = get_webhook_request_payload(webhook)
|
||||||
payload.setdefault("content", "")
|
payload.setdefault("content", "")
|
||||||
payload.setdefault("embeds", [])
|
payload.setdefault("embeds", [])
|
||||||
payload.setdefault("attachments", [])
|
payload.setdefault("attachments", [])
|
||||||
|
|
@ -416,6 +470,13 @@ def get_webhook_message_edit_payload(payload: JsonObject, record: SentWebhookRec
|
||||||
if edit_payload.get("attachments") == [] and not previous_attachments:
|
if edit_payload.get("attachments") == [] and not previous_attachments:
|
||||||
edit_payload.pop("attachments", None)
|
edit_payload.pop("attachments", None)
|
||||||
|
|
||||||
|
if json_value_to_int(edit_payload.get("flags")) & 1 << 15:
|
||||||
|
edit_payload.pop("content", None)
|
||||||
|
edit_payload.pop("embeds", None)
|
||||||
|
edit_payload.pop("poll", None)
|
||||||
|
if edit_payload.get("attachments") == []:
|
||||||
|
edit_payload.pop("attachments", None)
|
||||||
|
|
||||||
return edit_payload
|
return edit_payload
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -469,7 +530,8 @@ def get_discord_message_id_from_response(response_json: JsonObject, webhook: Dis
|
||||||
if isinstance(message_id, str) and message_id:
|
if isinstance(message_id, str) and message_id:
|
||||||
return message_id
|
return message_id
|
||||||
|
|
||||||
webhook_id: str | None = webhook.id if isinstance(webhook.id, str) else None
|
raw_webhook_id = getattr(webhook, "id", None)
|
||||||
|
webhook_id: str | None = raw_webhook_id if isinstance(raw_webhook_id, str) else None
|
||||||
return webhook_id if isinstance(webhook_id, str) else ""
|
return webhook_id if isinstance(webhook_id, str) else ""
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -570,36 +632,166 @@ def split_webhook_url_for_message_endpoint(webhook_url: str) -> tuple[str, str |
|
||||||
return clean_url, thread_id
|
return clean_url, thread_id
|
||||||
|
|
||||||
|
|
||||||
|
def payload_has_components(payload: JsonObject) -> bool:
|
||||||
|
"""Return whether a Discord payload includes message components."""
|
||||||
|
components: JsonValue = payload.get("components")
|
||||||
|
return isinstance(components, list) and bool(components)
|
||||||
|
|
||||||
|
|
||||||
|
def get_webhook_query_params(
|
||||||
|
webhook_url: str,
|
||||||
|
payload: JsonObject,
|
||||||
|
*,
|
||||||
|
webhook: DiscordWebhook | None = None,
|
||||||
|
wait: bool = True,
|
||||||
|
) -> tuple[str, dict[str, str]]:
|
||||||
|
"""Return a clean webhook URL and query params for a Discord webhook request."""
|
||||||
|
clean_webhook_url, thread_id = split_webhook_url_for_message_endpoint(webhook_url)
|
||||||
|
webhook_thread_id = getattr(webhook, "thread_id", None) if webhook is not None else None
|
||||||
|
if isinstance(webhook_thread_id, str) and webhook_thread_id.strip():
|
||||||
|
thread_id = webhook_thread_id.strip()
|
||||||
|
|
||||||
|
params: dict[str, str] = {}
|
||||||
|
if wait:
|
||||||
|
params["wait"] = "true"
|
||||||
|
if thread_id:
|
||||||
|
params["thread_id"] = thread_id
|
||||||
|
if payload_has_components(payload):
|
||||||
|
params["with_components"] = "true"
|
||||||
|
|
||||||
|
return clean_webhook_url, params
|
||||||
|
|
||||||
|
|
||||||
|
def get_webhook_files(webhook: DiscordWebhook) -> list[WebhookFile]: # noqa: C901
|
||||||
|
"""Return files attached to a webhook object in a normalized shape."""
|
||||||
|
raw_files = getattr(webhook, "files", None)
|
||||||
|
files: list[WebhookFile] = []
|
||||||
|
|
||||||
|
if isinstance(raw_files, dict):
|
||||||
|
for filename, content in raw_files.items():
|
||||||
|
if isinstance(filename, str) and isinstance(content, bytes):
|
||||||
|
files.append(WebhookFile(filename=filename, content=content))
|
||||||
|
return files
|
||||||
|
|
||||||
|
if not isinstance(raw_files, list | tuple):
|
||||||
|
return []
|
||||||
|
|
||||||
|
for index, file_value in enumerate(raw_files):
|
||||||
|
if isinstance(file_value, WebhookFile):
|
||||||
|
files.append(file_value)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not isinstance(file_value, tuple) or len(file_value) < 2: # noqa: PLR2004
|
||||||
|
continue
|
||||||
|
|
||||||
|
first, second = file_value[0], file_value[1]
|
||||||
|
if isinstance(first, str) and isinstance(second, bytes):
|
||||||
|
files.append(WebhookFile(filename=first, content=second))
|
||||||
|
continue
|
||||||
|
|
||||||
|
if isinstance(second, tuple) and len(second) >= 2: # noqa: PLR2004
|
||||||
|
nested_file = cast("tuple[object, ...]", second)
|
||||||
|
nested_filename, nested_content = nested_file[0], nested_file[1]
|
||||||
|
if isinstance(nested_filename, str) and isinstance(nested_content, bytes):
|
||||||
|
files.append(WebhookFile(filename=nested_filename, content=nested_content))
|
||||||
|
continue
|
||||||
|
|
||||||
|
if isinstance(second, bytes):
|
||||||
|
files.append(WebhookFile(filename=f"file-{index}", content=second))
|
||||||
|
|
||||||
|
return files
|
||||||
|
|
||||||
|
|
||||||
|
def get_retry_after_seconds(response: httpx.Response) -> float | None:
|
||||||
|
"""Return Discord's retry delay for a rate-limited response when available."""
|
||||||
|
response_json: JsonObject = get_response_json(response)
|
||||||
|
retry_after: JsonValue = response_json.get("retry_after")
|
||||||
|
if isinstance(retry_after, int | float | str):
|
||||||
|
with suppress(TypeError, ValueError):
|
||||||
|
return float(retry_after)
|
||||||
|
|
||||||
|
retry_after_header: str | None = response.headers.get("retry-after")
|
||||||
|
if retry_after_header:
|
||||||
|
with suppress(TypeError, ValueError):
|
||||||
|
return float(retry_after_header)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def request_discord_webhook(
|
||||||
|
method: str,
|
||||||
|
url: str,
|
||||||
|
*,
|
||||||
|
payload: JsonObject,
|
||||||
|
params: dict[str, str],
|
||||||
|
files: list[WebhookFile] | None,
|
||||||
|
timeout: float,
|
||||||
|
rate_limit_retry: bool,
|
||||||
|
) -> httpx.Response:
|
||||||
|
"""Send a Discord webhook request with optional multipart files.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Discord API response.
|
||||||
|
"""
|
||||||
|
request_kwargs: dict[str, Any] = {"params": params, "timeout": timeout}
|
||||||
|
if files:
|
||||||
|
request_kwargs["data"] = {"payload_json": json.dumps(payload, default=str)}
|
||||||
|
request_kwargs["files"] = [
|
||||||
|
(f"files[{index}]", (file.filename, file.content)) for index, file in enumerate(files)
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
request_kwargs["json"] = payload
|
||||||
|
|
||||||
|
response: httpx.Response = httpx.request(method, url, **request_kwargs)
|
||||||
|
if not rate_limit_retry or response.status_code != 429: # noqa: PLR2004
|
||||||
|
return response
|
||||||
|
|
||||||
|
retry_after: float | None = get_retry_after_seconds(response)
|
||||||
|
if retry_after is None:
|
||||||
|
return response
|
||||||
|
|
||||||
|
time.sleep(max(0.0, retry_after))
|
||||||
|
return httpx.request(method, url, **request_kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def send_webhook_message(webhook: DiscordWebhook, payload: JsonObject) -> httpx.Response:
|
||||||
|
"""Execute a Discord webhook message create request using httpx.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Discord API response.
|
||||||
|
"""
|
||||||
|
clean_webhook_url, params = get_webhook_query_params(webhook.url, payload, webhook=webhook, wait=True)
|
||||||
|
return request_discord_webhook(
|
||||||
|
"POST",
|
||||||
|
clean_webhook_url,
|
||||||
|
payload=payload,
|
||||||
|
params=params,
|
||||||
|
files=get_webhook_files(webhook),
|
||||||
|
timeout=cast("int | float", getattr(webhook, "timeout", None) or 30.0),
|
||||||
|
rate_limit_retry=bool(getattr(webhook, "rate_limit_retry", False)),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def edit_sent_webhook_message(
|
def edit_sent_webhook_message(
|
||||||
webhook_url: str,
|
webhook_url: str,
|
||||||
message_id: str,
|
message_id: str,
|
||||||
webhook: DiscordWebhook,
|
webhook: DiscordWebhook,
|
||||||
payload: JsonObject,
|
payload: JsonObject,
|
||||||
) -> Response | httpx.Response:
|
) -> httpx.Response:
|
||||||
"""Edit an already-sent Discord webhook message.
|
"""Edit an already-sent Discord webhook message.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Response | httpx.Response: Discord API response.
|
httpx.Response: Discord API response.
|
||||||
"""
|
"""
|
||||||
clean_webhook_url, thread_id = split_webhook_url_for_message_endpoint(webhook_url)
|
clean_webhook_url, params = get_webhook_query_params(webhook_url, payload, webhook=webhook, wait=True)
|
||||||
|
return request_discord_webhook(
|
||||||
if getattr(webhook, "files", None):
|
"PATCH",
|
||||||
webhook.url = clean_webhook_url
|
|
||||||
webhook.id = message_id
|
|
||||||
if thread_id:
|
|
||||||
webhook.thread_id = thread_id
|
|
||||||
return webhook.edit()
|
|
||||||
|
|
||||||
params: dict[str, str] = {"wait": "true"}
|
|
||||||
if thread_id:
|
|
||||||
params["thread_id"] = thread_id
|
|
||||||
|
|
||||||
timeout: int | float = cast("int | float", getattr(webhook, "timeout", None) or 30.0)
|
|
||||||
return httpx.patch(
|
|
||||||
f"{clean_webhook_url}/messages/{message_id}",
|
f"{clean_webhook_url}/messages/{message_id}",
|
||||||
json=payload,
|
payload=payload,
|
||||||
params=params,
|
params=params,
|
||||||
timeout=timeout,
|
files=get_webhook_files(webhook),
|
||||||
|
timeout=cast("int | float", getattr(webhook, "timeout", None) or 30.0),
|
||||||
|
rate_limit_retry=bool(getattr(webhook, "rate_limit_retry", False)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -908,7 +1100,7 @@ def create_screenshot_webhook(webhook_url: str, entry: Entry, reader: Reader) ->
|
||||||
)
|
)
|
||||||
screenshot_extension: str = "png"
|
screenshot_extension: str = "png"
|
||||||
|
|
||||||
if screenshot_bytes and len(screenshot_bytes) > MAX_DISCORD_UPLOAD_BYTES:
|
if screenshot_bytes and len(screenshot_bytes) > 8 * 1024 * 1024:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Screenshot for entry %s is too large as PNG (%d bytes). Trying JPEG compression.",
|
"Screenshot for entry %s is too large as PNG (%d bytes). Trying JPEG compression.",
|
||||||
entry.id,
|
entry.id,
|
||||||
|
|
@ -934,7 +1126,7 @@ def create_screenshot_webhook(webhook_url: str, entry: Entry, reader: Reader) ->
|
||||||
screenshot_bytes = jpeg_bytes
|
screenshot_bytes = jpeg_bytes
|
||||||
screenshot_extension = "jpg"
|
screenshot_extension = "jpg"
|
||||||
|
|
||||||
if len(screenshot_bytes) <= MAX_DISCORD_UPLOAD_BYTES:
|
if len(screenshot_bytes) <= 8 * 1024 * 1024:
|
||||||
break
|
break
|
||||||
|
|
||||||
if screenshot_bytes is None:
|
if screenshot_bytes is None:
|
||||||
|
|
@ -945,7 +1137,7 @@ def create_screenshot_webhook(webhook_url: str, entry: Entry, reader: Reader) ->
|
||||||
)
|
)
|
||||||
return create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=True)
|
return create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=True)
|
||||||
|
|
||||||
if len(screenshot_bytes) > MAX_DISCORD_UPLOAD_BYTES:
|
if len(screenshot_bytes) > 8 * 1024 * 1024:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Screenshot for entry %s is still too large after compression (%d bytes). Falling back to text message.",
|
"Screenshot for entry %s is still too large after compression (%d bytes). Falling back to text message.",
|
||||||
entry.id,
|
entry.id,
|
||||||
|
|
@ -1163,7 +1355,266 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
|
||||||
discord_embed.set_title(embed_title) if embed_title else None
|
discord_embed.set_title(embed_title) if embed_title else None
|
||||||
|
|
||||||
|
|
||||||
def create_embed_webhook( # noqa: C901
|
def add_unique_media_gallery_item(
|
||||||
|
media_items: list[JsonObject],
|
||||||
|
image_url: str,
|
||||||
|
*,
|
||||||
|
description: str,
|
||||||
|
limit: int = 10,
|
||||||
|
) -> None:
|
||||||
|
"""Append a valid media gallery item while preserving order and uniqueness."""
|
||||||
|
clean_image_url: str = image_url.strip()
|
||||||
|
if (
|
||||||
|
len(media_items) >= limit
|
||||||
|
or not clean_image_url
|
||||||
|
or any(item.get("url") == clean_image_url for item in media_items)
|
||||||
|
):
|
||||||
|
return
|
||||||
|
if not is_url_valid(clean_image_url):
|
||||||
|
logger.warning("Invalid media gallery URL: %s", clean_image_url)
|
||||||
|
return
|
||||||
|
media_items.append({"url": clean_image_url, "description": description[:1024]})
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_ttvdrops_media_url(image_url: str) -> str:
|
||||||
|
"""Return an absolute ttvdrops media URL."""
|
||||||
|
clean_image_url: str = image_url.strip()
|
||||||
|
if not clean_image_url:
|
||||||
|
return ""
|
||||||
|
return urljoin("https://ttvdrops.lovinator.space/", clean_image_url)
|
||||||
|
|
||||||
|
|
||||||
|
def get_ttvdrops_campaign_api_url(entry: Entry) -> str:
|
||||||
|
"""Return the ttvdrops campaign API URL for an entry when it can be inferred."""
|
||||||
|
candidate_urls: tuple[str | None, ...] = (
|
||||||
|
entry.link,
|
||||||
|
entry.id,
|
||||||
|
entry.feed.url,
|
||||||
|
)
|
||||||
|
|
||||||
|
for candidate_url in candidate_urls:
|
||||||
|
if not candidate_url:
|
||||||
|
continue
|
||||||
|
|
||||||
|
parsed_url = urlparse(str(candidate_url))
|
||||||
|
if parsed_url.netloc.lower() != "ttvdrops.lovinator.space":
|
||||||
|
continue
|
||||||
|
|
||||||
|
if re.fullmatch(r"/twitch/api/v1/campaigns/[^/]+/?", parsed_url.path):
|
||||||
|
return parsed_url._replace(query="", fragment="").geturl()
|
||||||
|
|
||||||
|
campaign_match = re.fullmatch(r"/twitch/campaigns/([^/]+)/?", parsed_url.path)
|
||||||
|
if campaign_match:
|
||||||
|
campaign_id: str = campaign_match.group(1)
|
||||||
|
return parsed_url._replace(
|
||||||
|
path=f"/twitch/api/v1/campaigns/{campaign_id}/",
|
||||||
|
query="",
|
||||||
|
fragment="",
|
||||||
|
).geturl()
|
||||||
|
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def get_ttvdrops_reward_description(drop: JsonObject, reward: JsonObject) -> str:
|
||||||
|
"""Return alt text for a ttvdrops reward image.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Reward alt text suitable for a Media Gallery description.
|
||||||
|
"""
|
||||||
|
reward_name: str = str(reward.get("name") or drop.get("name") or "Reward")
|
||||||
|
required_minutes: int = json_value_to_int(drop.get("required_minutes_watched"))
|
||||||
|
required_subs: int = json_value_to_int(drop.get("required_subs"))
|
||||||
|
|
||||||
|
if required_minutes:
|
||||||
|
return f"{required_minutes} minutes watched: {reward_name}"
|
||||||
|
if required_subs:
|
||||||
|
return f"{required_subs} subscriptions: {reward_name}"
|
||||||
|
return reward_name
|
||||||
|
|
||||||
|
|
||||||
|
def extract_ttvdrops_media_gallery_items(value: JsonValue) -> list[JsonObject]: # noqa: C901
|
||||||
|
"""Extract benefit/reward media gallery items from a ttvdrops API response.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Media Gallery items with absolute URLs and reward descriptions.
|
||||||
|
"""
|
||||||
|
media_items: list[JsonObject] = []
|
||||||
|
|
||||||
|
def add_reward_image(drop: JsonObject, reward: JsonObject) -> None:
|
||||||
|
image_url = reward.get("image_url")
|
||||||
|
if isinstance(image_url, str):
|
||||||
|
add_unique_media_gallery_item(
|
||||||
|
media_items,
|
||||||
|
normalize_ttvdrops_media_url(image_url),
|
||||||
|
description=get_ttvdrops_reward_description(drop, reward),
|
||||||
|
)
|
||||||
|
|
||||||
|
def collect_benefit_images(current_value: JsonValue) -> None:
|
||||||
|
if isinstance(current_value, dict):
|
||||||
|
for key, child_value in current_value.items():
|
||||||
|
if key in {"benefits", "rewards"} and isinstance(child_value, list):
|
||||||
|
for item in child_value:
|
||||||
|
if isinstance(item, dict):
|
||||||
|
add_reward_image(cast("JsonObject", current_value), cast("JsonObject", item))
|
||||||
|
collect_benefit_images(item)
|
||||||
|
continue
|
||||||
|
|
||||||
|
collect_benefit_images(child_value)
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(current_value, list):
|
||||||
|
for item in current_value:
|
||||||
|
collect_benefit_images(item)
|
||||||
|
|
||||||
|
collect_benefit_images(value)
|
||||||
|
return media_items
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_ttvdrops_campaign_media_items(entry: Entry) -> list[JsonObject]:
|
||||||
|
"""Fetch extra campaign media gallery items for ttvdrops entries.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Media Gallery items for ttvdrops rewards, or an empty list.
|
||||||
|
"""
|
||||||
|
api_url: str = get_ttvdrops_campaign_api_url(entry)
|
||||||
|
if not api_url:
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
response: httpx.Response = httpx.get(api_url, follow_redirects=True, timeout=10.0)
|
||||||
|
if response.status_code != 200: # noqa: PLR2004
|
||||||
|
logger.warning("Failed to fetch ttvdrops campaign data from %s: %s", api_url, response.text[:500])
|
||||||
|
return []
|
||||||
|
|
||||||
|
response_json = cast("JsonValue", response.json())
|
||||||
|
except (httpx.HTTPError, ValueError, TypeError):
|
||||||
|
logger.exception("Failed to fetch ttvdrops campaign data from %s", api_url)
|
||||||
|
return []
|
||||||
|
|
||||||
|
return extract_ttvdrops_media_gallery_items(response_json)
|
||||||
|
|
||||||
|
|
||||||
|
def get_entry_media_gallery_items(
|
||||||
|
entry: Entry,
|
||||||
|
custom_embed: CustomEmbed,
|
||||||
|
*,
|
||||||
|
image_limit: int = 10,
|
||||||
|
) -> list[JsonObject]:
|
||||||
|
"""Return items for a Discord Media Gallery component.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Media Gallery items capped to Discord's item limit.
|
||||||
|
"""
|
||||||
|
image_limit = coerce_media_gallery_image_limit(image_limit)
|
||||||
|
if image_limit == 0:
|
||||||
|
return []
|
||||||
|
|
||||||
|
media_items: list[JsonObject] = []
|
||||||
|
ttvdrops_media_items: list[JsonObject] = fetch_ttvdrops_campaign_media_items(entry)
|
||||||
|
if ttvdrops_media_items:
|
||||||
|
return ttvdrops_media_items[:image_limit]
|
||||||
|
|
||||||
|
description: str = entry.title or entry.id
|
||||||
|
for image_url in get_image_urls(entry.summary, entry.content, limit=image_limit):
|
||||||
|
add_unique_media_gallery_item(media_items, image_url, description=description)
|
||||||
|
|
||||||
|
add_unique_media_gallery_item(media_items, custom_embed.image_url, description=description)
|
||||||
|
add_unique_media_gallery_item(media_items, custom_embed.thumbnail_url, description=description)
|
||||||
|
|
||||||
|
return media_items[:image_limit]
|
||||||
|
|
||||||
|
|
||||||
|
def truncate_component_text(content: str) -> str:
|
||||||
|
"""Trim a Text Display component to a conservative Discord-safe length.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Original or truncated component text.
|
||||||
|
"""
|
||||||
|
max_text_display_length: int = 4000
|
||||||
|
if len(content) <= max_text_display_length:
|
||||||
|
return content
|
||||||
|
return f"{content[: max_text_display_length - 3]}..."
|
||||||
|
|
||||||
|
|
||||||
|
def get_component_text_display_content(custom_embed: CustomEmbed, entry: Entry) -> str:
|
||||||
|
"""Build markdown text for a Components V2 Text Display.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Markdown content for a Text Display component.
|
||||||
|
"""
|
||||||
|
parts: list[str] = []
|
||||||
|
|
||||||
|
if custom_embed.title:
|
||||||
|
parts.append(f"# {custom_embed.title}")
|
||||||
|
|
||||||
|
if custom_embed.author_name and custom_embed.author_url:
|
||||||
|
parts.append(f"## [{custom_embed.author_name}]({custom_embed.author_url})")
|
||||||
|
elif custom_embed.author_name:
|
||||||
|
parts.append(f"## {custom_embed.author_name}")
|
||||||
|
elif custom_embed.author_url:
|
||||||
|
parts.append(f"<{custom_embed.author_url}>")
|
||||||
|
|
||||||
|
if custom_embed.description:
|
||||||
|
parts.append(custom_embed.description)
|
||||||
|
|
||||||
|
if custom_embed.footer_text:
|
||||||
|
parts.append(f"-# {custom_embed.footer_text}")
|
||||||
|
|
||||||
|
if not parts:
|
||||||
|
fallback_text: str = entry.title or entry.link or entry.id
|
||||||
|
if entry.link and fallback_text != entry.link:
|
||||||
|
fallback_text = f"[{fallback_text}]({entry.link})"
|
||||||
|
parts.append(fallback_text)
|
||||||
|
|
||||||
|
return truncate_component_text("\n\n".join(parts))
|
||||||
|
|
||||||
|
|
||||||
|
def create_media_gallery_component(media_items: list[JsonObject]) -> JsonObject:
|
||||||
|
"""Build a Discord Media Gallery component.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Discord Media Gallery component payload.
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
"type": 12,
|
||||||
|
"items": [
|
||||||
|
{
|
||||||
|
"media": {"url": media_item["url"]},
|
||||||
|
"description": media_item["description"],
|
||||||
|
}
|
||||||
|
for media_item in media_items[:10]
|
||||||
|
if isinstance(media_item.get("url"), str) and isinstance(media_item.get("description"), str)
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def create_components_v2_webhook(
|
||||||
|
webhook_url: str,
|
||||||
|
entry: Entry,
|
||||||
|
custom_embed: CustomEmbed,
|
||||||
|
media_items: list[JsonObject],
|
||||||
|
) -> DiscordWebhook:
|
||||||
|
"""Create a Components V2 webhook with text and a media gallery.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Webhook payload configured for Components V2.
|
||||||
|
"""
|
||||||
|
components: list[JsonValue] = [
|
||||||
|
{
|
||||||
|
"type": 10,
|
||||||
|
"content": get_component_text_display_content(custom_embed, entry),
|
||||||
|
},
|
||||||
|
create_media_gallery_component(media_items),
|
||||||
|
]
|
||||||
|
return DiscordWebhook(
|
||||||
|
url=webhook_url,
|
||||||
|
flags=1 << 15,
|
||||||
|
components=components,
|
||||||
|
rate_limit_retry=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def create_embed_webhook( # noqa: C901, PLR0912
|
||||||
webhook_url: str,
|
webhook_url: str,
|
||||||
entry: Entry,
|
entry: Entry,
|
||||||
reader: Reader,
|
reader: Reader,
|
||||||
|
|
@ -1183,6 +1634,18 @@ def create_embed_webhook( # noqa: C901
|
||||||
|
|
||||||
# Get the embed data from the database.
|
# Get the embed data from the database.
|
||||||
custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader)
|
custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader)
|
||||||
|
media_gallery_image_limit: int = get_feed_media_gallery_image_limit(reader, feed)
|
||||||
|
if media_gallery_image_limit == 0:
|
||||||
|
custom_embed.image_url = ""
|
||||||
|
custom_embed.thumbnail_url = ""
|
||||||
|
|
||||||
|
media_gallery_items: list[JsonObject] = get_entry_media_gallery_items(
|
||||||
|
entry,
|
||||||
|
custom_embed,
|
||||||
|
image_limit=media_gallery_image_limit,
|
||||||
|
)
|
||||||
|
if media_gallery_items:
|
||||||
|
return create_components_v2_webhook(webhook_url, entry, custom_embed, media_gallery_items)
|
||||||
|
|
||||||
discord_embed: DiscordEmbed = DiscordEmbed()
|
discord_embed: DiscordEmbed = DiscordEmbed()
|
||||||
|
|
||||||
|
|
@ -1357,11 +1820,12 @@ def execute_webhook(
|
||||||
logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url)
|
logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
request_payload: JsonObject = get_webhook_request_payload(webhook)
|
||||||
payload: JsonObject = get_webhook_message_payload(webhook)
|
payload: JsonObject = get_webhook_message_payload(webhook)
|
||||||
response: Response = webhook.execute()
|
response: httpx.Response = send_webhook_message(webhook, request_payload)
|
||||||
logger.debug("Discord webhook response for entry %s: status=%s", entry.id, response.status_code)
|
logger.debug("Discord webhook response for entry %s: status=%s", entry.id, response.status_code)
|
||||||
if response.status_code not in {200, 204}:
|
if response.status_code not in {200, 204}:
|
||||||
msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(webhook.json)}"
|
msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(request_payload)}"
|
||||||
if entry:
|
if entry:
|
||||||
msg += f"\n{entry}"
|
msg += f"\n{entry}"
|
||||||
|
|
||||||
|
|
@ -1469,7 +1933,14 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
||||||
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
|
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
|
||||||
|
|
||||||
# Store sent Discord message ids by default so modified feed entries can edit the original webhook message.
|
# Store sent Discord message ids by default so modified feed entries can edit the original webhook message.
|
||||||
reader.set_tag(clean_feed_url, SAVE_SENT_WEBHOOKS_TAG, True) # pyright: ignore[reportArgumentType]
|
reader.set_tag(clean_feed_url, "save_sent_webhooks", True) # pyright: ignore[reportArgumentType]
|
||||||
|
|
||||||
|
# Keep the existing delivery behavior for new feeds unless changed from the feed page.
|
||||||
|
reader.set_tag(
|
||||||
|
clean_feed_url,
|
||||||
|
"media_gallery_image_limit",
|
||||||
|
cast("JSONType", 1),
|
||||||
|
)
|
||||||
|
|
||||||
# This is the default message that will be sent to Discord.
|
# This is the default message that will be sent to Discord.
|
||||||
reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType]
|
reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType]
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,9 @@ from typing import TYPE_CHECKING
|
||||||
from typing import cast
|
from typing import cast
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from discord_webhook import DiscordEmbed
|
|
||||||
from discord_webhook import DiscordWebhook
|
from discord_rss_bot.webhook import DiscordEmbed
|
||||||
|
from discord_rss_bot.webhook import DiscordWebhook
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from reader import Entry
|
from reader import Entry
|
||||||
|
|
|
||||||
|
|
@ -51,12 +51,13 @@ from discord_rss_bot.custom_message import get_embed
|
||||||
from discord_rss_bot.custom_message import get_first_image
|
from discord_rss_bot.custom_message import get_first_image
|
||||||
from discord_rss_bot.custom_message import replace_tags_in_text_message
|
from discord_rss_bot.custom_message import replace_tags_in_text_message
|
||||||
from discord_rss_bot.custom_message import save_embed
|
from discord_rss_bot.custom_message import save_embed
|
||||||
from discord_rss_bot.feeds import SAVE_SENT_WEBHOOKS_TAG
|
|
||||||
from discord_rss_bot.feeds import SentWebhookRecord
|
from discord_rss_bot.feeds import SentWebhookRecord
|
||||||
|
from discord_rss_bot.feeds import coerce_media_gallery_image_limit
|
||||||
from discord_rss_bot.feeds import create_feed
|
from discord_rss_bot.feeds import create_feed
|
||||||
from discord_rss_bot.feeds import extract_domain
|
from discord_rss_bot.feeds import extract_domain
|
||||||
from discord_rss_bot.feeds import feed_saves_sent_webhooks
|
from discord_rss_bot.feeds import feed_saves_sent_webhooks
|
||||||
from discord_rss_bot.feeds import get_feed_delivery_mode
|
from discord_rss_bot.feeds import get_feed_delivery_mode
|
||||||
|
from discord_rss_bot.feeds import get_feed_media_gallery_image_limit
|
||||||
from discord_rss_bot.feeds import get_screenshot_layout
|
from discord_rss_bot.feeds import get_screenshot_layout
|
||||||
from discord_rss_bot.feeds import get_sent_webhook_records
|
from discord_rss_bot.feeds import get_sent_webhook_records
|
||||||
from discord_rss_bot.feeds import send_entry_to_discord
|
from discord_rss_bot.feeds import send_entry_to_discord
|
||||||
|
|
@ -71,6 +72,7 @@ from discord_rss_bot.filter.evaluator import evaluate_entry_filters
|
||||||
from discord_rss_bot.filter.evaluator import get_entry_decision_key
|
from discord_rss_bot.filter.evaluator import get_entry_decision_key
|
||||||
from discord_rss_bot.filter.evaluator import get_entry_fields
|
from discord_rss_bot.filter.evaluator import get_entry_fields
|
||||||
from discord_rss_bot.filter.evaluator import get_filter_values_from_reader
|
from discord_rss_bot.filter.evaluator import get_filter_values_from_reader
|
||||||
|
from discord_rss_bot.filter.evaluator import has_filter_values
|
||||||
from discord_rss_bot.git_backup import commit_state_change
|
from discord_rss_bot.git_backup import commit_state_change
|
||||||
from discord_rss_bot.git_backup import get_backup_path
|
from discord_rss_bot.git_backup import get_backup_path
|
||||||
from discord_rss_bot.is_url_valid import is_url_valid
|
from discord_rss_bot.is_url_valid import is_url_valid
|
||||||
|
|
@ -1346,12 +1348,44 @@ async def post_set_feed_save_sent_webhooks(
|
||||||
except FeedNotFoundError as e:
|
except FeedNotFoundError as e:
|
||||||
raise HTTPException(status_code=404, detail="Feed not found") from e
|
raise HTTPException(status_code=404, detail="Feed not found") from e
|
||||||
|
|
||||||
reader.set_tag(clean_feed_url, SAVE_SENT_WEBHOOKS_TAG, should_save) # pyright: ignore[reportArgumentType]
|
reader.set_tag(clean_feed_url, "save_sent_webhooks", should_save) # pyright: ignore[reportArgumentType]
|
||||||
action: str = "Enable" if should_save else "Disable"
|
action: str = "Enable" if should_save else "Disable"
|
||||||
commit_state_change(reader, f"{action} sent webhook storage for {clean_feed_url}")
|
commit_state_change(reader, f"{action} sent webhook storage for {clean_feed_url}")
|
||||||
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
|
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/set_feed_media_gallery_image_limit")
|
||||||
|
async def post_set_feed_media_gallery_image_limit(
|
||||||
|
feed_url: Annotated[str, Form()],
|
||||||
|
image_limit: Annotated[int, Form()],
|
||||||
|
reader: Annotated[Reader, Depends(get_reader_dependency)],
|
||||||
|
) -> RedirectResponse:
|
||||||
|
"""Set whether a feed sends one image or a full media gallery.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
RedirectResponse: Redirect to the feed page.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: If the feed does not exist.
|
||||||
|
"""
|
||||||
|
clean_feed_url: str = feed_url.strip()
|
||||||
|
clean_image_limit: int = coerce_media_gallery_image_limit(image_limit)
|
||||||
|
clean_image_limit_json: JSONType = cast("JSONType", clean_image_limit)
|
||||||
|
|
||||||
|
try:
|
||||||
|
reader.get_feed(clean_feed_url)
|
||||||
|
except FeedNotFoundError as e:
|
||||||
|
raise HTTPException(status_code=404, detail="Feed not found") from e
|
||||||
|
|
||||||
|
reader.set_tag(
|
||||||
|
clean_feed_url,
|
||||||
|
"media_gallery_image_limit",
|
||||||
|
clean_image_limit_json,
|
||||||
|
)
|
||||||
|
commit_state_change(reader, f"Set media gallery image limit to {clean_image_limit} for {clean_feed_url}")
|
||||||
|
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
|
||||||
|
|
||||||
|
|
||||||
@app.post("/set_update_interval")
|
@app.post("/set_update_interval")
|
||||||
async def post_set_update_interval(
|
async def post_set_update_interval(
|
||||||
feed_url: Annotated[str, Form()],
|
feed_url: Annotated[str, Form()],
|
||||||
|
|
@ -1620,6 +1654,9 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
|
||||||
current_webhook_name = hook.get("name", "").strip()
|
current_webhook_name = hook.get("name", "").strip()
|
||||||
break
|
break
|
||||||
|
|
||||||
|
has_blacklist_filters: bool = has_filter_values(get_filter_values_from_reader(reader, feed, "blacklist"))
|
||||||
|
has_whitelist_filters: bool = has_filter_values(get_filter_values_from_reader(reader, feed, "whitelist"))
|
||||||
|
|
||||||
# Only show button if more than 10 entries.
|
# Only show button if more than 10 entries.
|
||||||
total_entries: int = reader.get_entry_counts(feed=feed).total or 0
|
total_entries: int = reader.get_entry_counts(feed=feed).total or 0
|
||||||
is_show_more_entries_button_visible: bool = total_entries > entries_per_page
|
is_show_more_entries_button_visible: bool = total_entries > entries_per_page
|
||||||
|
|
@ -1668,6 +1705,10 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
|
||||||
"webhooks": webhooks,
|
"webhooks": webhooks,
|
||||||
"current_webhook_url": current_webhook_url,
|
"current_webhook_url": current_webhook_url,
|
||||||
"current_webhook_name": current_webhook_name,
|
"current_webhook_name": current_webhook_name,
|
||||||
|
"has_blacklist_filters": has_blacklist_filters,
|
||||||
|
"has_whitelist_filters": has_whitelist_filters,
|
||||||
|
"media_gallery_image_limit": get_feed_media_gallery_image_limit(reader, feed),
|
||||||
|
"max_media_gallery_items": 10,
|
||||||
"save_sent_webhooks": feed_saves_sent_webhooks(reader, feed),
|
"save_sent_webhooks": feed_saves_sent_webhooks(reader, feed),
|
||||||
}
|
}
|
||||||
return templates.TemplateResponse(request=request, name="feed.html", context=context)
|
return templates.TemplateResponse(request=request, name="feed.html", context=context)
|
||||||
|
|
@ -1728,6 +1769,10 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
|
||||||
"webhooks": webhooks,
|
"webhooks": webhooks,
|
||||||
"current_webhook_url": current_webhook_url,
|
"current_webhook_url": current_webhook_url,
|
||||||
"current_webhook_name": current_webhook_name,
|
"current_webhook_name": current_webhook_name,
|
||||||
|
"has_blacklist_filters": has_blacklist_filters,
|
||||||
|
"has_whitelist_filters": has_whitelist_filters,
|
||||||
|
"media_gallery_image_limit": get_feed_media_gallery_image_limit(reader, feed),
|
||||||
|
"max_media_gallery_items": 10,
|
||||||
"save_sent_webhooks": feed_saves_sent_webhooks(reader, feed),
|
"save_sent_webhooks": feed_saves_sent_webhooks(reader, feed),
|
||||||
}
|
}
|
||||||
return templates.TemplateResponse(request=request, name="feed.html", context=context)
|
return templates.TemplateResponse(request=request, name="feed.html", context=context)
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,11 @@ body {
|
||||||
max-width: 120px;
|
max-width: 120px;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
.image-limit-control {
|
||||||
|
min-width: 12rem;
|
||||||
|
max-width: 28rem;
|
||||||
|
}
|
||||||
|
|
||||||
.screenshot-requirement {
|
.screenshot-requirement {
|
||||||
color: #9c9c9c;
|
color: #9c9c9c;
|
||||||
font-size: 0.9rem;
|
font-size: 0.9rem;
|
||||||
|
|
@ -65,6 +70,10 @@ body {
|
||||||
word-break: break-word;
|
word-break: break-word;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
.feed-summary-list {
|
||||||
|
padding-left: 1.15rem;
|
||||||
|
}
|
||||||
|
|
||||||
.sent-webhooks__entry,
|
.sent-webhooks__entry,
|
||||||
.sent-webhooks__preview {
|
.sent-webhooks__preview {
|
||||||
min-width: 14rem;
|
min-width: 14rem;
|
||||||
|
|
|
||||||
|
|
@ -236,10 +236,12 @@
|
||||||
<label for="author_icon_url" class="col-sm-6 col-form-label">Author icon URL</label>
|
<label for="author_icon_url" class="col-sm-6 col-form-label">Author icon URL</label>
|
||||||
<input name="author_icon_url" type="text" class="form-control bg-dark border-dark text-muted"
|
<input name="author_icon_url" type="text" class="form-control bg-dark border-dark text-muted"
|
||||||
id="author_icon_url" {% if author_icon_url %} value="{{- author_icon_url -}}" {% endif %} />
|
id="author_icon_url" {% if author_icon_url %} value="{{- author_icon_url -}}" {% endif %} />
|
||||||
<label for="image_url" class="col-sm-6 col-form-label">Image URL - Add {% raw %}{{image_1}}{% endraw %}
|
<label for="image_url" class="col-sm-6 col-form-label">Image URL</label>
|
||||||
for first image</label>
|
|
||||||
<input name="image_url" type="text" class="form-control bg-dark border-dark text-muted" id="image_url"
|
<input name="image_url" type="text" class="form-control bg-dark border-dark text-muted" id="image_url"
|
||||||
{% if image_url %} value="{{- image_url -}}" {% endif %} />
|
{% if image_url %} value="{{- image_url -}}" {% endif %} />
|
||||||
|
<div class="form-text">
|
||||||
|
Use {% raw %}{{image_1}}{% endraw %} to use the first image URL found in the entry. You can also configure how many images gets sent via the feed page.
|
||||||
|
</div>
|
||||||
<label for="thumbnail_url" class="col-sm-6 col-form-label">Thumbnail</label>
|
<label for="thumbnail_url" class="col-sm-6 col-form-label">Thumbnail</label>
|
||||||
<input name="thumbnail_url" type="text" class="form-control bg-dark border-dark text-muted"
|
<input name="thumbnail_url" type="text" class="form-control bg-dark border-dark text-muted"
|
||||||
id="thumbnail_url" {% if thumbnail_url %} value="{{- thumbnail_url -}}" {% endif %} />
|
id="thumbnail_url" {% if thumbnail_url %} value="{{- thumbnail_url -}}" {% endif %} />
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,18 @@
|
||||||
Text
|
Text
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</span>
|
</span>
|
||||||
|
{% if not "youtube.com/feeds/videos.xml" in feed.url %}
|
||||||
|
<span class="badge status-chip bg-secondary">
|
||||||
|
Images:
|
||||||
|
{% if media_gallery_image_limit == 0 %}
|
||||||
|
No images
|
||||||
|
{% elif media_gallery_image_limit == 1 %}
|
||||||
|
First image only
|
||||||
|
{% else %}
|
||||||
|
Up to {{ media_gallery_image_limit }} images
|
||||||
|
{% endif %}
|
||||||
|
</span>
|
||||||
|
{% endif %}
|
||||||
{% if delivery_mode == "screenshot" %}
|
{% if delivery_mode == "screenshot" %}
|
||||||
<span class="badge status-chip bg-secondary">
|
<span class="badge status-chip bg-secondary">
|
||||||
Screenshot layout:
|
Screenshot layout:
|
||||||
|
|
@ -42,6 +54,72 @@
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
<section class="mt-4 pt-3 border-top border-secondary-subtle">
|
||||||
|
<h3 class="h6 text-uppercase text-muted mb-2">Feed Summary</h3>
|
||||||
|
<p class="text-muted mb-2">
|
||||||
|
This feed
|
||||||
|
{% if feed.updates_enabled %}
|
||||||
|
will send new entries
|
||||||
|
{% else %}
|
||||||
|
is paused and will not send new entries
|
||||||
|
{% endif %}
|
||||||
|
{% if current_webhook_name %}
|
||||||
|
to <strong>{{ current_webhook_name }}</strong>
|
||||||
|
{% elif current_webhook_url %}
|
||||||
|
to a webhook that is no longer saved
|
||||||
|
{% else %}
|
||||||
|
after a webhook is attached
|
||||||
|
{% endif %}
|
||||||
|
as
|
||||||
|
{% if delivery_mode == "embed" %}
|
||||||
|
an embed.
|
||||||
|
{% elif delivery_mode == "screenshot" %}
|
||||||
|
a screenshot of the entry link page in {{ screenshot_layout }} mode.
|
||||||
|
{% else %}
|
||||||
|
a text message.
|
||||||
|
{% endif %}
|
||||||
|
</p>
|
||||||
|
<ul class="text-muted small mb-0 feed-summary-list">
|
||||||
|
<li>
|
||||||
|
Update interval:
|
||||||
|
{% if feed_interval %}
|
||||||
|
{{ feed_interval }} minutes.
|
||||||
|
{% else %}
|
||||||
|
{{ global_interval }} minutes because of the global default.
|
||||||
|
{% endif %}
|
||||||
|
</li>
|
||||||
|
{% if delivery_mode == "embed" %}
|
||||||
|
<li>
|
||||||
|
Embed images:
|
||||||
|
{% if media_gallery_image_limit == 0 %}
|
||||||
|
none.
|
||||||
|
{% elif media_gallery_image_limit == 1 %}
|
||||||
|
first image only.
|
||||||
|
{% else %}
|
||||||
|
up to {{ media_gallery_image_limit }} images.
|
||||||
|
{% endif %}
|
||||||
|
</li>
|
||||||
|
{% elif delivery_mode == "screenshot" %}
|
||||||
|
<li>Screenshot layout: {{ screenshot_layout }}.</li>
|
||||||
|
{% endif %}
|
||||||
|
<li>
|
||||||
|
Filters:
|
||||||
|
{% if has_blacklist_filters and has_whitelist_filters %}
|
||||||
|
whitelist and blacklist are active; blacklist wins when both match.
|
||||||
|
{% elif has_blacklist_filters %}
|
||||||
|
blacklist is active.
|
||||||
|
{% elif has_whitelist_filters %}
|
||||||
|
whitelist is active.
|
||||||
|
{% else %}
|
||||||
|
no whitelist or blacklist filters are configured.
|
||||||
|
{% endif %}
|
||||||
|
</li>
|
||||||
|
<li>
|
||||||
|
Updating the Discord webhook when the feed entry changes is
|
||||||
|
{{ 'enabled.' if save_sent_webhooks else 'disabled.' }}
|
||||||
|
</li>
|
||||||
|
</ul>
|
||||||
|
</section>
|
||||||
{% if feed.last_exception %}
|
{% if feed.last_exception %}
|
||||||
<div class="alert alert-danger mt-4 mb-0" role="alert">
|
<div class="alert alert-danger mt-4 mb-0" role="alert">
|
||||||
<h5 class="alert-heading mb-2">{{ feed.last_exception.type_name }}</h5>
|
<h5 class="alert-heading mb-2">{{ feed.last_exception.type_name }}</h5>
|
||||||
|
|
@ -88,24 +166,56 @@
|
||||||
name="feed_url"
|
name="feed_url"
|
||||||
value="{{ feed.url }}">Send text message instead of embed</button>
|
value="{{ feed.url }}">Send text message instead of embed</button>
|
||||||
</form>
|
</form>
|
||||||
<form action="/use_screenshot" method="post" class="d-inline">
|
|
||||||
<button class="btn btn-outline-light btn-sm"
|
|
||||||
name="feed_url"
|
|
||||||
value="{{ feed.url }}">
|
|
||||||
Send full-page screenshot instead of embed
|
|
||||||
</button>
|
|
||||||
</form>
|
|
||||||
{% elif delivery_mode == "screenshot" %}
|
{% elif delivery_mode == "screenshot" %}
|
||||||
<form action="/use_embed" method="post" class="d-inline">
|
|
||||||
<button class="btn btn-outline-light btn-sm"
|
|
||||||
name="feed_url"
|
|
||||||
value="{{ feed.url }}">Send embed instead of screenshot</button>
|
|
||||||
</form>
|
|
||||||
<form action="/use_text" method="post" class="d-inline">
|
<form action="/use_text" method="post" class="d-inline">
|
||||||
<button class="btn btn-outline-light btn-sm"
|
<button class="btn btn-outline-light btn-sm"
|
||||||
name="feed_url"
|
name="feed_url"
|
||||||
value="{{ feed.url }}">Send text message instead of screenshot</button>
|
value="{{ feed.url }}">Send text message instead of screenshot</button>
|
||||||
</form>
|
</form>
|
||||||
|
{% else %}
|
||||||
|
<form action="/use_embed" method="post" class="d-inline">
|
||||||
|
<button class="btn btn-outline-light btn-sm"
|
||||||
|
name="feed_url"
|
||||||
|
value="{{ feed.url }}">Send embed instead of text message</button>
|
||||||
|
</form>
|
||||||
|
{% endif %}
|
||||||
|
{% endif %}
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
{% if not "youtube.com/feeds/videos.xml" in feed.url %}
|
||||||
|
<section class="mt-4 pt-3 border-top border-secondary-subtle">
|
||||||
|
<div class="d-flex flex-wrap align-items-center justify-content-between gap-2 mb-2">
|
||||||
|
<h3 class="h6 text-uppercase text-muted mb-0">Screenshot Delivery</h3>
|
||||||
|
<span class="badge {{ 'bg-info' if delivery_mode == 'screenshot' else 'bg-secondary' }}">
|
||||||
|
{% if delivery_mode == "screenshot" %}
|
||||||
|
Active:
|
||||||
|
{% if screenshot_layout == "mobile" %}
|
||||||
|
Mobile
|
||||||
|
{% else %}
|
||||||
|
Desktop
|
||||||
|
{% endif %}
|
||||||
|
{% else %}
|
||||||
|
Inactive
|
||||||
|
{% endif %}
|
||||||
|
</span>
|
||||||
|
</div>
|
||||||
|
<p class="text-muted mb-3">
|
||||||
|
Screenshot delivery sends a full-page screenshot of the entry link instead of the normal
|
||||||
|
embed or text message.
|
||||||
|
</p>
|
||||||
|
<div class="d-flex flex-wrap gap-2">
|
||||||
|
{% if delivery_mode != "screenshot" %}
|
||||||
|
<form action="/use_screenshot" method="post" class="d-inline">
|
||||||
|
<button class="btn btn-outline-light btn-sm"
|
||||||
|
name="feed_url"
|
||||||
|
value="{{ feed.url }}">Use screenshot delivery</button>
|
||||||
|
</form>
|
||||||
|
{% else %}
|
||||||
|
<form action="/use_embed" method="post" class="d-inline">
|
||||||
|
<button class="btn btn-outline-light btn-sm"
|
||||||
|
name="feed_url"
|
||||||
|
value="{{ feed.url }}">Disable screenshot delivery</button>
|
||||||
|
</form>
|
||||||
{% if screenshot_layout == "mobile" %}
|
{% if screenshot_layout == "mobile" %}
|
||||||
<form action="/use_screenshot_desktop" method="post" class="d-inline">
|
<form action="/use_screenshot_desktop" method="post" class="d-inline">
|
||||||
<button class="btn btn-outline-light btn-sm"
|
<button class="btn btn-outline-light btn-sm"
|
||||||
|
|
@ -119,26 +229,61 @@
|
||||||
value="{{ feed.url }}">Use mobile screenshot layout</button>
|
value="{{ feed.url }}">Use mobile screenshot layout</button>
|
||||||
</form>
|
</form>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
{% else %}
|
|
||||||
<form action="/use_embed" method="post" class="d-inline">
|
|
||||||
<button class="btn btn-outline-light btn-sm"
|
|
||||||
name="feed_url"
|
|
||||||
value="{{ feed.url }}">Send embed instead of text message</button>
|
|
||||||
</form>
|
|
||||||
<form action="/use_screenshot" method="post" class="d-inline">
|
|
||||||
<button class="btn btn-outline-light btn-sm"
|
|
||||||
name="feed_url"
|
|
||||||
value="{{ feed.url }}">
|
|
||||||
Send full-page screenshot instead of text message
|
|
||||||
</button>
|
|
||||||
</form>
|
|
||||||
{% endif %}
|
{% endif %}
|
||||||
<div class="w-100 mt-1 screenshot-requirement">
|
</div>
|
||||||
Screenshot mode requires Chromium to be installed for Playwright. Run <code>uv run playwright install chromium</code> once on this machine.
|
<div class="mt-2 screenshot-requirement">
|
||||||
|
Screenshot mode requires Chromium to be installed for Playwright. Run
|
||||||
|
<code>uv run playwright install chromium</code> once on this machine.
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
<section class="mt-4 pt-3 border-top border-secondary-subtle">
|
||||||
|
<div class="d-flex flex-wrap align-items-center justify-content-between gap-2 mb-2">
|
||||||
|
<h3 class="h6 text-uppercase text-muted mb-0">Image Delivery</h3>
|
||||||
|
<span class="badge {{ 'bg-info' if media_gallery_image_limit < max_media_gallery_items else 'bg-secondary' }}">
|
||||||
|
{% if media_gallery_image_limit == 0 %}
|
||||||
|
No images
|
||||||
|
{% elif media_gallery_image_limit == 1 %}
|
||||||
|
First image only
|
||||||
|
{% else %}
|
||||||
|
Up to {{ media_gallery_image_limit }} images
|
||||||
|
{% endif %}
|
||||||
|
</span>
|
||||||
|
</div>
|
||||||
|
<p id="imageDeliveryHelp" class="text-muted mb-3">
|
||||||
|
Choose 0 to send no entry images, 1 for the first image only,
|
||||||
|
or up to {{ max_media_gallery_items }} for a Discord media gallery.
|
||||||
|
This only affects embed delivery.
|
||||||
|
</p>
|
||||||
|
<form action="/set_feed_media_gallery_image_limit"
|
||||||
|
method="post"
|
||||||
|
class="mb-0">
|
||||||
|
<input type="hidden" name="feed_url" value="{{ feed.url }}" />
|
||||||
|
<label class="form-label small text-muted mb-2" for="image_limit">
|
||||||
|
Images per entry:
|
||||||
|
<output name="image_limit_value" for="image_limit">{{ media_gallery_image_limit }}</output>
|
||||||
|
</label>
|
||||||
|
<div class="d-flex flex-wrap align-items-center gap-3">
|
||||||
|
<div class="flex-grow-1 image-limit-control">
|
||||||
|
<input id="image_limit"
|
||||||
|
class="form-range"
|
||||||
|
type="range"
|
||||||
|
name="image_limit"
|
||||||
|
min="0"
|
||||||
|
max="{{ max_media_gallery_items }}"
|
||||||
|
step="1"
|
||||||
|
value="{{ media_gallery_image_limit }}"
|
||||||
|
aria-describedby="imageDeliveryHelp"
|
||||||
|
oninput="this.form.elements.image_limit_value.value = this.value" />
|
||||||
|
<div class="d-flex justify-content-between text-muted small">
|
||||||
|
<span>0</span>
|
||||||
|
<span>{{ max_media_gallery_items }}</span>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<button class="btn btn-outline-light btn-sm" type="submit">Save image limit</button>
|
||||||
</div>
|
</div>
|
||||||
{% endif %}
|
</form>
|
||||||
</div>
|
</section>
|
||||||
</section>
|
{% endif %}
|
||||||
<section class="mt-4 pt-3 border-top border-secondary-subtle">
|
<section class="mt-4 pt-3 border-top border-secondary-subtle">
|
||||||
<h3 class="h6 text-uppercase text-muted mb-3">Customization</h3>
|
<h3 class="h6 text-uppercase text-muted mb-3">Customization</h3>
|
||||||
<div class="d-flex flex-wrap gap-2">
|
<div class="d-flex flex-wrap gap-2">
|
||||||
|
|
|
||||||
198
discord_rss_bot/webhook.py
Normal file
198
discord_rss_bot/webhook.py
Normal file
|
|
@ -0,0 +1,198 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any
|
||||||
|
from typing import cast
|
||||||
|
|
||||||
|
type JsonValue = bool | int | float | str | list[JsonValue] | dict[str, JsonValue] | None
|
||||||
|
type JsonObject = dict[str, JsonValue]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class WebhookFile:
|
||||||
|
"""A file uploaded with a Discord webhook request."""
|
||||||
|
|
||||||
|
filename: str
|
||||||
|
content: bytes
|
||||||
|
|
||||||
|
|
||||||
|
class DiscordEmbed:
|
||||||
|
"""Small Discord embed payload builder used by the webhook sender."""
|
||||||
|
|
||||||
|
def __init__(self) -> None: # noqa: D107
|
||||||
|
self._payload: JsonObject = {}
|
||||||
|
|
||||||
|
def to_dict(self) -> JsonObject:
|
||||||
|
"""Return the JSON payload for this embed."""
|
||||||
|
return cast("JsonObject", dict(self._payload))
|
||||||
|
|
||||||
|
def set_description(self, description: str) -> None:
|
||||||
|
self._payload["description"] = description
|
||||||
|
|
||||||
|
def set_title(self, title: str) -> None:
|
||||||
|
self._payload["title"] = title
|
||||||
|
|
||||||
|
def set_url(self, url: str) -> None:
|
||||||
|
self._payload["url"] = url
|
||||||
|
|
||||||
|
def set_color(self, color: int | str) -> None:
|
||||||
|
if isinstance(color, int):
|
||||||
|
self._payload["color"] = color
|
||||||
|
return
|
||||||
|
|
||||||
|
normalized_color: str = color.removeprefix("#")
|
||||||
|
self._payload["color"] = int(normalized_color, 16)
|
||||||
|
|
||||||
|
def set_author(self, *, name: str, url: str | None = None, icon_url: str | None = None) -> None:
|
||||||
|
author: JsonObject = {"name": name}
|
||||||
|
if url:
|
||||||
|
author["url"] = url
|
||||||
|
if icon_url:
|
||||||
|
author["icon_url"] = icon_url
|
||||||
|
self._payload["author"] = author
|
||||||
|
|
||||||
|
def set_thumbnail(self, *, url: str) -> None:
|
||||||
|
self._payload["thumbnail"] = {"url": url}
|
||||||
|
|
||||||
|
def set_image(self, *, url: str, **_ignored: Any) -> None: # noqa: ANN401
|
||||||
|
self._payload["image"] = {"url": url}
|
||||||
|
|
||||||
|
def set_footer(self, *, text: str, icon_url: str | None = None) -> None:
|
||||||
|
footer: JsonObject = {"text": text}
|
||||||
|
if icon_url:
|
||||||
|
footer["icon_url"] = icon_url
|
||||||
|
self._payload["footer"] = footer
|
||||||
|
|
||||||
|
def add_embed_field(self, *, name: str, value: str, inline: bool | None = None) -> None:
|
||||||
|
fields = self._payload.setdefault("fields", [])
|
||||||
|
if not isinstance(fields, list):
|
||||||
|
fields = []
|
||||||
|
self._payload["fields"] = fields
|
||||||
|
|
||||||
|
field: JsonObject = {"name": name, "value": value}
|
||||||
|
if inline is not None:
|
||||||
|
field["inline"] = inline
|
||||||
|
fields.append(field)
|
||||||
|
|
||||||
|
def set_timestamp(self, *, timestamp: str) -> None:
|
||||||
|
self._payload["timestamp"] = timestamp
|
||||||
|
|
||||||
|
|
||||||
|
class DiscordWebhook:
|
||||||
|
"""Discord webhook request data.
|
||||||
|
|
||||||
|
This intentionally mirrors the subset of `discord-webhook` used by the app
|
||||||
|
while leaving the actual HTTP transport to `httpx`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__( # noqa: D107
|
||||||
|
self,
|
||||||
|
url: str,
|
||||||
|
*,
|
||||||
|
content: str | None = None,
|
||||||
|
username: str | None = None,
|
||||||
|
avatar_url: str | None = None,
|
||||||
|
tts: bool | None = None,
|
||||||
|
allowed_mentions: JsonObject | None = None,
|
||||||
|
flags: int | None = None,
|
||||||
|
components: list[JsonValue] | None = None,
|
||||||
|
thread_id: str | None = None,
|
||||||
|
timeout: float | None = None,
|
||||||
|
rate_limit_retry: bool = False,
|
||||||
|
**_ignored: Any, # noqa: ANN401
|
||||||
|
) -> None:
|
||||||
|
self.url: str = url
|
||||||
|
self.thread_id: str | None = thread_id
|
||||||
|
self.timeout: int | float = timeout or 30.0
|
||||||
|
self.rate_limit_retry: bool = rate_limit_retry
|
||||||
|
self.files: list[WebhookFile] = []
|
||||||
|
self._payload: JsonObject = {}
|
||||||
|
|
||||||
|
if content is not None:
|
||||||
|
self._payload["content"] = content
|
||||||
|
if username:
|
||||||
|
self._payload["username"] = username
|
||||||
|
if avatar_url:
|
||||||
|
self._payload["avatar_url"] = avatar_url
|
||||||
|
if tts is not None:
|
||||||
|
self._payload["tts"] = tts
|
||||||
|
if allowed_mentions is not None:
|
||||||
|
self._payload["allowed_mentions"] = allowed_mentions
|
||||||
|
if flags is not None:
|
||||||
|
self._payload["flags"] = flags
|
||||||
|
if components is not None:
|
||||||
|
self._payload["components"] = components
|
||||||
|
|
||||||
|
@property
|
||||||
|
def json(self) -> JsonObject:
|
||||||
|
return self._payload
|
||||||
|
|
||||||
|
@property
|
||||||
|
def content(self) -> str | None:
|
||||||
|
value = self._payload.get("content")
|
||||||
|
return value if isinstance(value, str) else None
|
||||||
|
|
||||||
|
@content.setter
|
||||||
|
def content(self, value: str | None) -> None:
|
||||||
|
if value is None:
|
||||||
|
self._payload.pop("content", None)
|
||||||
|
else:
|
||||||
|
self._payload["content"] = value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def username(self) -> str | None:
|
||||||
|
value = self._payload.get("username")
|
||||||
|
return value if isinstance(value, str) else None
|
||||||
|
|
||||||
|
@username.setter
|
||||||
|
def username(self, value: str | None) -> None:
|
||||||
|
if value:
|
||||||
|
self._payload["username"] = value
|
||||||
|
else:
|
||||||
|
self._payload.pop("username", None)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def avatar_url(self) -> str | None:
|
||||||
|
value = self._payload.get("avatar_url")
|
||||||
|
return value if isinstance(value, str) else None
|
||||||
|
|
||||||
|
@avatar_url.setter
|
||||||
|
def avatar_url(self, value: str | None) -> None:
|
||||||
|
if value:
|
||||||
|
self._payload["avatar_url"] = value
|
||||||
|
else:
|
||||||
|
self._payload.pop("avatar_url", None)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def components(self) -> list[JsonValue]:
|
||||||
|
value = self._payload.get("components")
|
||||||
|
return cast("list[JsonValue]", value) if isinstance(value, list) else []
|
||||||
|
|
||||||
|
@components.setter
|
||||||
|
def components(self, value: list[JsonValue]) -> None:
|
||||||
|
self._payload["components"] = value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def flags(self) -> int | None:
|
||||||
|
value = self._payload.get("flags")
|
||||||
|
return value if isinstance(value, int) else None
|
||||||
|
|
||||||
|
@flags.setter
|
||||||
|
def flags(self, value: int | None) -> None:
|
||||||
|
if value is None:
|
||||||
|
self._payload.pop("flags", None)
|
||||||
|
else:
|
||||||
|
self._payload["flags"] = value
|
||||||
|
|
||||||
|
def add_file(self, *, file: bytes, filename: str) -> None:
|
||||||
|
self.files.append(WebhookFile(filename=filename, content=file))
|
||||||
|
|
||||||
|
def add_embed(self, embed: DiscordEmbed) -> None:
|
||||||
|
embeds = self._payload.setdefault("embeds", [])
|
||||||
|
if not isinstance(embeds, list):
|
||||||
|
embeds = []
|
||||||
|
self._payload["embeds"] = embeds
|
||||||
|
embeds.append(embed.to_dict())
|
||||||
|
|
||||||
|
def remove_embeds(self) -> None:
|
||||||
|
self._payload.pop("embeds", None)
|
||||||
|
|
@ -6,7 +6,6 @@ readme = "README.md"
|
||||||
requires-python = ">=3.12"
|
requires-python = ">=3.12"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"apscheduler>=3.11.0",
|
"apscheduler>=3.11.0",
|
||||||
"discord-webhook",
|
|
||||||
"fastapi",
|
"fastapi",
|
||||||
"httpx",
|
"httpx",
|
||||||
"jinja2",
|
"jinja2",
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ from discord_rss_bot.custom_message import get_custom_message
|
||||||
from discord_rss_bot.custom_message import get_embed
|
from discord_rss_bot.custom_message import get_embed
|
||||||
from discord_rss_bot.custom_message import get_embed_data
|
from discord_rss_bot.custom_message import get_embed_data
|
||||||
from discord_rss_bot.custom_message import get_first_image
|
from discord_rss_bot.custom_message import get_first_image
|
||||||
|
from discord_rss_bot.custom_message import get_image_urls
|
||||||
from discord_rss_bot.custom_message import replace_tags_in_embed
|
from discord_rss_bot.custom_message import replace_tags_in_embed
|
||||||
from discord_rss_bot.custom_message import replace_tags_in_text_message
|
from discord_rss_bot.custom_message import replace_tags_in_text_message
|
||||||
from discord_rss_bot.custom_message import save_embed
|
from discord_rss_bot.custom_message import save_embed
|
||||||
|
|
@ -203,6 +204,34 @@ def test_get_first_image_uses_summary_when_content_image_is_invalid() -> None:
|
||||||
assert image == "https://example.com/from-summary.jpg"
|
assert image == "https://example.com/from-summary.jpg"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_image_urls_returns_all_valid_images_in_order_without_duplicates() -> None:
|
||||||
|
summary = (
|
||||||
|
'<p><img src="https://example.com/from-summary.jpg" /><img src="https://example.com/from-content-1.jpg" /></p>'
|
||||||
|
)
|
||||||
|
content = (
|
||||||
|
'<p><img src="https://example.com/from-content-1.jpg" />'
|
||||||
|
'<img src="javascript:alert(1)" />'
|
||||||
|
'<img src="https://example.com/from-content-2.jpg" /></p>'
|
||||||
|
)
|
||||||
|
|
||||||
|
images = get_image_urls(summary, content)
|
||||||
|
|
||||||
|
assert images == [
|
||||||
|
"https://example.com/from-content-1.jpg",
|
||||||
|
"https://example.com/from-content-2.jpg",
|
||||||
|
"https://example.com/from-summary.jpg",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_image_urls_respects_limit() -> None:
|
||||||
|
summary = '<img src="https://example.com/summary.jpg" />'
|
||||||
|
content = '<img src="https://example.com/one.jpg" /><img src="https://example.com/two.jpg" />'
|
||||||
|
|
||||||
|
images = get_image_urls(summary, content, limit=2)
|
||||||
|
|
||||||
|
assert images == ["https://example.com/one.jpg", "https://example.com/two.jpg"]
|
||||||
|
|
||||||
|
|
||||||
def test_get_first_image_returns_empty_when_images_have_no_src() -> None:
|
def test_get_first_image_returns_empty_when_images_have_no_src() -> None:
|
||||||
summary = "<p></p>"
|
summary = "<p></p>"
|
||||||
content = '<p><img alt="missing source" /></p>'
|
content = '<p><img alt="missing source" /></p>'
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,12 @@ from discord_rss_bot.feeds import should_send_embed_check
|
||||||
from discord_rss_bot.feeds import truncate_webhook_message
|
from discord_rss_bot.feeds import truncate_webhook_message
|
||||||
|
|
||||||
|
|
||||||
|
def get_test_webhook_components(webhook: feeds.DiscordWebhook) -> list[feeds.JsonValue]:
|
||||||
|
components = webhook.json.get("components")
|
||||||
|
assert isinstance(components, list)
|
||||||
|
return components
|
||||||
|
|
||||||
|
|
||||||
def test_send_to_discord() -> None:
|
def test_send_to_discord() -> None:
|
||||||
"""Test sending to Discord."""
|
"""Test sending to Discord."""
|
||||||
# Skip early if no webhook URL is configured to avoid a real network request.
|
# Skip early if no webhook URL is configured to avoid a real network request.
|
||||||
|
|
@ -332,6 +338,45 @@ def test_get_screenshot_layout_defaults_to_desktop() -> None:
|
||||||
assert result == "desktop"
|
assert result == "desktop"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("tag_value", "expected_limit"),
|
||||||
|
[
|
||||||
|
(0, 0),
|
||||||
|
(1, 1),
|
||||||
|
(7, 7),
|
||||||
|
(-1, 0),
|
||||||
|
(99, 10),
|
||||||
|
("first", 1),
|
||||||
|
("off", 0),
|
||||||
|
("8", 8),
|
||||||
|
("unknown", 1),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_get_feed_media_gallery_image_limit_normalizes_stored_tag(
|
||||||
|
tag_value: feeds.JsonValue,
|
||||||
|
expected_limit: int,
|
||||||
|
) -> None:
|
||||||
|
reader = MagicMock()
|
||||||
|
feed = MagicMock()
|
||||||
|
feed.url = "https://example.com/feed.xml"
|
||||||
|
reader.get_tag.return_value = tag_value
|
||||||
|
|
||||||
|
result = feeds.get_feed_media_gallery_image_limit(reader, feed)
|
||||||
|
|
||||||
|
assert result == expected_limit
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_feed_media_gallery_image_limit_defaults_to_first_image() -> None:
|
||||||
|
reader = MagicMock()
|
||||||
|
feed = MagicMock()
|
||||||
|
feed.url = "https://example.com/feed.xml"
|
||||||
|
reader.get_tag.side_effect = lambda resource, key, default=None: default # noqa: ARG005
|
||||||
|
|
||||||
|
result = feeds.get_feed_media_gallery_image_limit(reader, feed)
|
||||||
|
|
||||||
|
assert result == 1
|
||||||
|
|
||||||
|
|
||||||
def test_create_feed_inherits_global_screenshot_layout() -> None:
|
def test_create_feed_inherits_global_screenshot_layout() -> None:
|
||||||
reader = MagicMock()
|
reader = MagicMock()
|
||||||
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
|
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
|
||||||
|
|
@ -368,7 +413,24 @@ def test_create_feed_enables_sent_webhook_tracking_by_default() -> None:
|
||||||
|
|
||||||
create_feed(reader, "https://example.com/feed.xml", "Main")
|
create_feed(reader, "https://example.com/feed.xml", "Main")
|
||||||
|
|
||||||
reader.set_tag.assert_any_call("https://example.com/feed.xml", feeds.SAVE_SENT_WEBHOOKS_TAG, True)
|
reader.set_tag.assert_any_call("https://example.com/feed.xml", "save_sent_webhooks", True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_feed_sets_default_media_gallery_image_limit() -> None:
|
||||||
|
reader = MagicMock()
|
||||||
|
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
|
||||||
|
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
|
||||||
|
"screenshot_layout": "desktop",
|
||||||
|
"delivery_mode": "embed",
|
||||||
|
}.get(key, default)
|
||||||
|
|
||||||
|
create_feed(reader, "https://example.com/feed.xml", "Main")
|
||||||
|
|
||||||
|
reader.set_tag.assert_any_call(
|
||||||
|
"https://example.com/feed.xml",
|
||||||
|
"media_gallery_image_limit",
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_create_feed_falls_back_to_embed_when_global_delivery_mode_is_invalid() -> None:
|
def test_create_feed_falls_back_to_embed_when_global_delivery_mode_is_invalid() -> None:
|
||||||
|
|
@ -568,6 +630,195 @@ def test_create_screenshot_webhook_falls_back_to_text_on_failure(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items", return_value=[])
|
||||||
|
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
|
||||||
|
def test_create_embed_webhook_uses_media_gallery_for_entry_images(
|
||||||
|
mock_replace_tags_in_embed: MagicMock,
|
||||||
|
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
reader = MagicMock()
|
||||||
|
reader.get_tag.return_value = 10
|
||||||
|
entry = MagicMock()
|
||||||
|
entry.id = "entry-1"
|
||||||
|
entry.title = "Entry title"
|
||||||
|
entry.link = "https://example.com/entry"
|
||||||
|
entry.summary = '<img src="https://example.com/summary.jpg" />'
|
||||||
|
entry.content = [
|
||||||
|
MagicMock(value='<img src="https://example.com/content-1.jpg" />'),
|
||||||
|
MagicMock(value='<img src="https://example.com/content-2.jpg" />'),
|
||||||
|
]
|
||||||
|
entry.feed.url = "https://example.com/feed.xml"
|
||||||
|
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(
|
||||||
|
description="Entry body",
|
||||||
|
author_name="Entry title",
|
||||||
|
author_url="https://example.com/entry",
|
||||||
|
)
|
||||||
|
|
||||||
|
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
|
||||||
|
|
||||||
|
assert webhook.flags == 1 << 15
|
||||||
|
components = get_test_webhook_components(webhook)
|
||||||
|
assert components[0] == {
|
||||||
|
"type": 10,
|
||||||
|
"content": "## [Entry title](https://example.com/entry)\n\nEntry body",
|
||||||
|
}
|
||||||
|
gallery = components[1]
|
||||||
|
assert isinstance(gallery, dict)
|
||||||
|
assert gallery["type"] == 12
|
||||||
|
mock_fetch_ttvdrops_campaign_media_items.assert_called_once_with(entry)
|
||||||
|
assert gallery["items"] == [
|
||||||
|
{"media": {"url": "https://example.com/content-1.jpg"}, "description": "Entry title"},
|
||||||
|
{"media": {"url": "https://example.com/content-2.jpg"}, "description": "Entry title"},
|
||||||
|
{"media": {"url": "https://example.com/summary.jpg"}, "description": "Entry title"},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items", return_value=[])
|
||||||
|
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
|
||||||
|
def test_create_embed_webhook_can_limit_media_gallery_to_first_image(
|
||||||
|
mock_replace_tags_in_embed: MagicMock,
|
||||||
|
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
reader = MagicMock()
|
||||||
|
reader.get_tag.return_value = 1
|
||||||
|
entry = MagicMock()
|
||||||
|
entry.id = "entry-1"
|
||||||
|
entry.title = "Entry title"
|
||||||
|
entry.link = "https://example.com/entry"
|
||||||
|
entry.summary = '<img src="https://example.com/summary.jpg" />'
|
||||||
|
entry.content = [
|
||||||
|
MagicMock(value='<img src="https://example.com/content-1.jpg" />'),
|
||||||
|
MagicMock(value='<img src="https://example.com/content-2.jpg" />'),
|
||||||
|
]
|
||||||
|
entry.feed.url = "https://example.com/feed.xml"
|
||||||
|
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(description="Entry body")
|
||||||
|
|
||||||
|
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
|
||||||
|
|
||||||
|
gallery = get_test_webhook_components(webhook)[1]
|
||||||
|
assert isinstance(gallery, dict)
|
||||||
|
mock_fetch_ttvdrops_campaign_media_items.assert_called_once_with(entry)
|
||||||
|
assert gallery["items"] == [
|
||||||
|
{"media": {"url": "https://example.com/content-1.jpg"}, "description": "Entry title"},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items", return_value=[])
|
||||||
|
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
|
||||||
|
def test_create_embed_webhook_can_disable_media_images(
|
||||||
|
mock_replace_tags_in_embed: MagicMock,
|
||||||
|
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
reader = MagicMock()
|
||||||
|
reader.get_tag.return_value = 0
|
||||||
|
entry = MagicMock()
|
||||||
|
entry.id = "entry-1"
|
||||||
|
entry.title = "Entry title"
|
||||||
|
entry.link = "https://example.com/entry"
|
||||||
|
entry.summary = '<img src="https://example.com/summary.jpg" />'
|
||||||
|
entry.content = [MagicMock(value='<img src="https://example.com/content-1.jpg" />')]
|
||||||
|
entry.feed.url = "https://example.com/feed.xml"
|
||||||
|
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(
|
||||||
|
description="Entry body",
|
||||||
|
image_url="https://example.com/custom-image.jpg",
|
||||||
|
thumbnail_url="https://example.com/custom-thumbnail.jpg",
|
||||||
|
)
|
||||||
|
|
||||||
|
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
|
||||||
|
|
||||||
|
assert "components" not in webhook.json
|
||||||
|
assert "embeds" in webhook.json
|
||||||
|
embeds = webhook.json["embeds"]
|
||||||
|
assert isinstance(embeds, list)
|
||||||
|
assert isinstance(embeds[0], dict)
|
||||||
|
assert "image" not in embeds[0]
|
||||||
|
assert "thumbnail" not in embeds[0]
|
||||||
|
mock_fetch_ttvdrops_campaign_media_items.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("discord_rss_bot.feeds.fetch_ttvdrops_campaign_media_items")
|
||||||
|
@patch("discord_rss_bot.feeds.replace_tags_in_embed")
|
||||||
|
def test_create_embed_webhook_prefers_ttvdrops_reward_images_and_alt_text(
|
||||||
|
mock_replace_tags_in_embed: MagicMock,
|
||||||
|
mock_fetch_ttvdrops_campaign_media_items: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
reader = MagicMock()
|
||||||
|
entry = MagicMock()
|
||||||
|
entry.id = "entry-2"
|
||||||
|
entry.title = "Drop campaign"
|
||||||
|
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
|
||||||
|
entry.summary = '<img src="https://example.com/feed-image.jpg" />'
|
||||||
|
entry.content = []
|
||||||
|
entry.feed.url = "https://ttvdrops.lovinator.space/feed.xml"
|
||||||
|
mock_replace_tags_in_embed.return_value = feeds.CustomEmbed(description="Campaign body")
|
||||||
|
mock_fetch_ttvdrops_campaign_media_items.return_value = [
|
||||||
|
{
|
||||||
|
"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png",
|
||||||
|
"description": "120 minutes watched: Skulbladi",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
webhook = feeds.create_embed_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
|
||||||
|
|
||||||
|
gallery = get_test_webhook_components(webhook)[1]
|
||||||
|
assert isinstance(gallery, dict)
|
||||||
|
assert gallery["items"] == [
|
||||||
|
{
|
||||||
|
"media": {"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png"},
|
||||||
|
"description": "120 minutes watched: Skulbladi",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_ttvdrops_campaign_api_url_from_campaign_page() -> None:
|
||||||
|
entry = MagicMock()
|
||||||
|
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
|
||||||
|
entry.id = "entry-3"
|
||||||
|
entry.feed.url = "https://example.com/feed.xml"
|
||||||
|
|
||||||
|
api_url = feeds.get_ttvdrops_campaign_api_url(entry)
|
||||||
|
|
||||||
|
assert api_url == "https://ttvdrops.lovinator.space/twitch/api/v1/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
|
||||||
|
|
||||||
|
|
||||||
|
@patch("discord_rss_bot.feeds.httpx.get")
|
||||||
|
def test_fetch_ttvdrops_campaign_media_items_extracts_reward_alt_text(mock_get: MagicMock) -> None:
|
||||||
|
response = MagicMock()
|
||||||
|
response.status_code = 200
|
||||||
|
response.json.return_value = {
|
||||||
|
"image_url": "/media/campaigns/images/campaign.png",
|
||||||
|
"drops": [
|
||||||
|
{
|
||||||
|
"name": "Drop",
|
||||||
|
"required_minutes_watched": 120,
|
||||||
|
"benefits": [
|
||||||
|
{"name": "Skulbladi", "image_url": "/media/benefits/images/reward.png"},
|
||||||
|
{"image_url": "javascript:alert(1)"},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
mock_get.return_value = response
|
||||||
|
entry = MagicMock()
|
||||||
|
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
|
||||||
|
entry.id = "entry-4"
|
||||||
|
entry.feed.url = "https://example.com/feed.xml"
|
||||||
|
|
||||||
|
media_items = feeds.fetch_ttvdrops_campaign_media_items(entry)
|
||||||
|
|
||||||
|
assert media_items == [
|
||||||
|
{
|
||||||
|
"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png",
|
||||||
|
"description": "120 minutes watched: Skulbladi",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
mock_get.assert_called_once_with(
|
||||||
|
"https://ttvdrops.lovinator.space/twitch/api/v1/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/",
|
||||||
|
follow_redirects=True,
|
||||||
|
timeout=10.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_capture_full_page_screenshot_uses_thread_when_loop_running() -> None:
|
def test_capture_full_page_screenshot_uses_thread_when_loop_running() -> None:
|
||||||
"""Capture should offload sync Playwright work when called from an active event loop."""
|
"""Capture should offload sync Playwright work when called from an active event loop."""
|
||||||
with patch("discord_rss_bot.feeds._capture_full_page_screenshot_sync", return_value=b"png") as mock_capture_sync:
|
with patch("discord_rss_bot.feeds._capture_full_page_screenshot_sync", return_value=b"png") as mock_capture_sync:
|
||||||
|
|
@ -871,10 +1122,14 @@ def test_execute_webhook_skips_when_feed_missing() -> None:
|
||||||
|
|
||||||
|
|
||||||
@patch.object(feeds, "logger")
|
@patch.object(feeds, "logger")
|
||||||
def test_execute_webhook_logs_error_on_bad_status(mock_logger: MagicMock) -> None:
|
@patch("discord_rss_bot.feeds.send_webhook_message")
|
||||||
|
def test_execute_webhook_logs_error_on_bad_status(
|
||||||
|
mock_send_webhook_message: MagicMock,
|
||||||
|
mock_logger: MagicMock,
|
||||||
|
) -> None:
|
||||||
webhook = MagicMock()
|
webhook = MagicMock()
|
||||||
webhook.json = {"content": "test"}
|
webhook.json = {"content": "test"}
|
||||||
webhook.execute.return_value = MagicMock(status_code=500, text="fail")
|
mock_send_webhook_message.return_value = MagicMock(status_code=500, text="fail")
|
||||||
reader = MagicMock()
|
reader = MagicMock()
|
||||||
entry = MagicMock()
|
entry = MagicMock()
|
||||||
entry.id = "entry-8"
|
entry.id = "entry-8"
|
||||||
|
|
@ -887,9 +1142,13 @@ def test_execute_webhook_logs_error_on_bad_status(mock_logger: MagicMock) -> Non
|
||||||
|
|
||||||
|
|
||||||
@patch.object(feeds, "logger")
|
@patch.object(feeds, "logger")
|
||||||
def test_execute_webhook_logs_info_on_success(mock_logger: MagicMock) -> None:
|
@patch("discord_rss_bot.feeds.send_webhook_message")
|
||||||
|
def test_execute_webhook_logs_info_on_success(
|
||||||
|
mock_send_webhook_message: MagicMock,
|
||||||
|
mock_logger: MagicMock,
|
||||||
|
) -> None:
|
||||||
webhook = MagicMock()
|
webhook = MagicMock()
|
||||||
webhook.execute.return_value = MagicMock(status_code=204, text="")
|
mock_send_webhook_message.return_value = MagicMock(status_code=204, text="")
|
||||||
reader = MagicMock()
|
reader = MagicMock()
|
||||||
entry = MagicMock()
|
entry = MagicMock()
|
||||||
entry.id = "entry-9"
|
entry.id = "entry-9"
|
||||||
|
|
@ -901,14 +1160,15 @@ def test_execute_webhook_logs_info_on_success(mock_logger: MagicMock) -> None:
|
||||||
mock_logger.info.assert_called_once_with("Sent entry to Discord: %s", "entry-9")
|
mock_logger.info.assert_called_once_with("Sent entry to Discord: %s", "entry-9")
|
||||||
|
|
||||||
|
|
||||||
def test_execute_webhook_records_sent_webhook_message() -> None:
|
@patch("discord_rss_bot.feeds.send_webhook_message")
|
||||||
|
def test_execute_webhook_records_sent_webhook_message(mock_send_webhook_message: MagicMock) -> None:
|
||||||
webhook_url = "https://discord.com/api/webhooks/123/abc"
|
webhook_url = "https://discord.com/api/webhooks/123/abc"
|
||||||
state: dict[str, feeds.JsonValue] = {}
|
state: dict[str, feeds.JsonValue] = {}
|
||||||
|
|
||||||
def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue:
|
def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue:
|
||||||
if key == feeds.SENT_WEBHOOKS_TAG:
|
if key == "sent_webhooks":
|
||||||
return state.get(feeds.SENT_WEBHOOKS_TAG, default)
|
return state.get("sent_webhooks", default)
|
||||||
if key == feeds.SAVE_SENT_WEBHOOKS_TAG:
|
if key == "save_sent_webhooks":
|
||||||
return True
|
return True
|
||||||
if key == "webhook":
|
if key == "webhook":
|
||||||
return webhook_url
|
return webhook_url
|
||||||
|
|
@ -939,11 +1199,11 @@ def test_execute_webhook_records_sent_webhook_message() -> None:
|
||||||
response.status_code = 200
|
response.status_code = 200
|
||||||
response.text = '{"id": "message-1"}'
|
response.text = '{"id": "message-1"}'
|
||||||
response.json.return_value = {"id": "message-1"}
|
response.json.return_value = {"id": "message-1"}
|
||||||
webhook.execute.return_value = response
|
mock_send_webhook_message.return_value = response
|
||||||
|
|
||||||
execute_webhook(webhook, entry, reader)
|
execute_webhook(webhook, entry, reader)
|
||||||
|
|
||||||
records = state[feeds.SENT_WEBHOOKS_TAG]
|
records = state["sent_webhooks"]
|
||||||
assert isinstance(records, list)
|
assert isinstance(records, list)
|
||||||
assert len(records) == 1
|
assert len(records) == 1
|
||||||
assert isinstance(records[0], dict)
|
assert isinstance(records[0], dict)
|
||||||
|
|
@ -959,11 +1219,12 @@ def test_execute_webhook_records_sent_webhook_message() -> None:
|
||||||
assert records[0]["payload"]["content"] == "Entry title"
|
assert records[0]["payload"]["content"] == "Entry title"
|
||||||
|
|
||||||
|
|
||||||
def test_execute_webhook_does_not_record_when_feed_tracking_disabled() -> None:
|
@patch("discord_rss_bot.feeds.send_webhook_message")
|
||||||
|
def test_execute_webhook_does_not_record_when_feed_tracking_disabled(mock_send_webhook_message: MagicMock) -> None:
|
||||||
webhook_url = "https://discord.com/api/webhooks/123/abc"
|
webhook_url = "https://discord.com/api/webhooks/123/abc"
|
||||||
reader = MagicMock()
|
reader = MagicMock()
|
||||||
reader.get_tag.side_effect = lambda _resource, key, default=None: {
|
reader.get_tag.side_effect = lambda _resource, key, default=None: {
|
||||||
feeds.SAVE_SENT_WEBHOOKS_TAG: False,
|
"save_sent_webhooks": False,
|
||||||
"webhook": webhook_url,
|
"webhook": webhook_url,
|
||||||
}.get(key, default)
|
}.get(key, default)
|
||||||
|
|
||||||
|
|
@ -979,13 +1240,62 @@ def test_execute_webhook_does_not_record_when_feed_tracking_disabled() -> None:
|
||||||
response.status_code = 200
|
response.status_code = 200
|
||||||
response.text = '{"id": "message-2"}'
|
response.text = '{"id": "message-2"}'
|
||||||
response.json.return_value = {"id": "message-2"}
|
response.json.return_value = {"id": "message-2"}
|
||||||
webhook.execute.return_value = response
|
mock_send_webhook_message.return_value = response
|
||||||
|
|
||||||
execute_webhook(webhook, entry, reader)
|
execute_webhook(webhook, entry, reader)
|
||||||
|
|
||||||
reader.set_tag.assert_not_called()
|
reader.set_tag.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("discord_rss_bot.feeds.httpx.request")
|
||||||
|
def test_send_webhook_message_posts_components_with_httpx(mock_request: MagicMock) -> None:
|
||||||
|
response = MagicMock(status_code=200, text='{"id": "message-1"}')
|
||||||
|
mock_request.return_value = response
|
||||||
|
components: list[feeds.JsonValue] = [
|
||||||
|
{
|
||||||
|
"type": 10,
|
||||||
|
"content": "# Component update",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
webhook = feeds.DiscordWebhook(
|
||||||
|
url="https://discord.com/api/webhooks/123/abc?thread_id=456",
|
||||||
|
flags=1 << 15,
|
||||||
|
components=components,
|
||||||
|
)
|
||||||
|
|
||||||
|
result = feeds.send_webhook_message(webhook, feeds.get_webhook_request_payload(webhook))
|
||||||
|
|
||||||
|
assert result is response
|
||||||
|
mock_request.assert_called_once()
|
||||||
|
assert mock_request.call_args.args == ("POST", "https://discord.com/api/webhooks/123/abc")
|
||||||
|
assert mock_request.call_args.kwargs["json"] == {
|
||||||
|
"components": components,
|
||||||
|
"flags": 1 << 15,
|
||||||
|
}
|
||||||
|
assert mock_request.call_args.kwargs["params"] == {
|
||||||
|
"thread_id": "456",
|
||||||
|
"wait": "true",
|
||||||
|
"with_components": "true",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@patch("discord_rss_bot.feeds.httpx.request")
|
||||||
|
def test_send_webhook_message_uploads_files_as_multipart(mock_request: MagicMock) -> None:
|
||||||
|
response = MagicMock(status_code=200, text='{"id": "message-2"}')
|
||||||
|
mock_request.return_value = response
|
||||||
|
webhook = feeds.DiscordWebhook(url="https://discord.com/api/webhooks/123/abc", content="Entry link")
|
||||||
|
webhook.add_file(file=b"image-bytes", filename="entry.png")
|
||||||
|
|
||||||
|
result = feeds.send_webhook_message(webhook, feeds.get_webhook_request_payload(webhook))
|
||||||
|
|
||||||
|
assert result is response
|
||||||
|
mock_request.assert_called_once()
|
||||||
|
assert mock_request.call_args.args == ("POST", "https://discord.com/api/webhooks/123/abc")
|
||||||
|
assert mock_request.call_args.kwargs["data"] == {"payload_json": '{"content": "Entry link"}'}
|
||||||
|
assert mock_request.call_args.kwargs["files"] == [("files[0]", ("entry.png", b"image-bytes"))]
|
||||||
|
assert "json" not in mock_request.call_args.kwargs
|
||||||
|
|
||||||
|
|
||||||
@patch("discord_rss_bot.feeds.edit_sent_webhook_message")
|
@patch("discord_rss_bot.feeds.edit_sent_webhook_message")
|
||||||
@patch("discord_rss_bot.feeds.create_webhook_for_entry")
|
@patch("discord_rss_bot.feeds.create_webhook_for_entry")
|
||||||
def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
|
def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
|
||||||
|
|
@ -995,7 +1305,7 @@ def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
|
||||||
webhook_url = "https://discord.com/api/webhooks/123/abc"
|
webhook_url = "https://discord.com/api/webhooks/123/abc"
|
||||||
old_payload: JsonObject = {"content": "Old title", "embeds": [], "attachments": []}
|
old_payload: JsonObject = {"content": "Old title", "embeds": [], "attachments": []}
|
||||||
state: dict[str, feeds.JsonValue] = {
|
state: dict[str, feeds.JsonValue] = {
|
||||||
feeds.SENT_WEBHOOKS_TAG: [
|
"sent_webhooks": [
|
||||||
{
|
{
|
||||||
"feed_url": "https://example.com/feed.xml",
|
"feed_url": "https://example.com/feed.xml",
|
||||||
"entry_id": "entry-3",
|
"entry_id": "entry-3",
|
||||||
|
|
@ -1009,9 +1319,9 @@ def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue:
|
def get_tag(_resource: str | tuple[()], key: str, default: feeds.JsonValue = None) -> feeds.JsonValue:
|
||||||
if key == feeds.SENT_WEBHOOKS_TAG:
|
if key == "sent_webhooks":
|
||||||
return state[feeds.SENT_WEBHOOKS_TAG]
|
return state["sent_webhooks"]
|
||||||
if key == feeds.SAVE_SENT_WEBHOOKS_TAG:
|
if key == "save_sent_webhooks":
|
||||||
return True
|
return True
|
||||||
return default
|
return default
|
||||||
|
|
||||||
|
|
@ -1050,7 +1360,7 @@ def test_update_sent_webhooks_for_modified_entries_edits_changed_payload(
|
||||||
mock_edit_sent_webhook_message.assert_called_once()
|
mock_edit_sent_webhook_message.assert_called_once()
|
||||||
edit_payload = mock_edit_sent_webhook_message.call_args.args[3]
|
edit_payload = mock_edit_sent_webhook_message.call_args.args[3]
|
||||||
assert edit_payload == {"content": "New title"}
|
assert edit_payload == {"content": "New title"}
|
||||||
records = state[feeds.SENT_WEBHOOKS_TAG]
|
records = state["sent_webhooks"]
|
||||||
assert isinstance(records, list)
|
assert isinstance(records, list)
|
||||||
assert isinstance(records[0], dict)
|
assert isinstance(records[0], dict)
|
||||||
assert isinstance(records[0]["payload"], dict)
|
assert isinstance(records[0]["payload"], dict)
|
||||||
|
|
|
||||||
|
|
@ -229,6 +229,12 @@ def test_get() -> None:
|
||||||
|
|
||||||
response: Response = client.get(url="/feed", params={"feed_url": encoded_feed_url(feed_url)})
|
response: Response = client.get(url="/feed", params={"feed_url": encoded_feed_url(feed_url)})
|
||||||
assert response.status_code == 200, f"/feed failed: {response.text}"
|
assert response.status_code == 200, f"/feed failed: {response.text}"
|
||||||
|
assert "Feed Summary" in response.text
|
||||||
|
assert "This feed" in response.text
|
||||||
|
assert "Screenshot Delivery" in response.text
|
||||||
|
assert "Image Delivery" in response.text
|
||||||
|
assert 'type="range"' in response.text
|
||||||
|
assert 'max="10"' in response.text
|
||||||
|
|
||||||
response: Response = client.get(url="/")
|
response: Response = client.get(url="/")
|
||||||
assert response.status_code == 200, f"/ failed: {response.text}"
|
assert response.status_code == 200, f"/ failed: {response.text}"
|
||||||
|
|
@ -524,6 +530,8 @@ def test_c3kay_feed_delivery_mode_toggle_routes_update_stored_tags() -> None:
|
||||||
assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "screenshot"
|
assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "screenshot"
|
||||||
assert reader.get_tag(c3kay_feed_url, "screenshot_layout") == "mobile"
|
assert reader.get_tag(c3kay_feed_url, "screenshot_layout") == "mobile"
|
||||||
assert reader.get_tag(c3kay_feed_url, "should_send_embed") is False
|
assert reader.get_tag(c3kay_feed_url, "should_send_embed") is False
|
||||||
|
assert "Disable screenshot delivery" in response.text
|
||||||
|
assert "Send embed instead of screenshot" not in response.text
|
||||||
|
|
||||||
response = client.post(url="/use_embed", data={"feed_url": c3kay_feed_url})
|
response = client.post(url="/use_embed", data={"feed_url": c3kay_feed_url})
|
||||||
assert response.status_code == 200, f"Failed to set embed mode: {response.text}"
|
assert response.status_code == 200, f"Failed to set embed mode: {response.text}"
|
||||||
|
|
@ -561,7 +569,42 @@ def test_set_feed_save_sent_webhooks_route_updates_stored_tag() -> None:
|
||||||
)
|
)
|
||||||
|
|
||||||
assert response.status_code == 303, f"/set_feed_save_sent_webhooks failed: {response.text}"
|
assert response.status_code == 303, f"/set_feed_save_sent_webhooks failed: {response.text}"
|
||||||
assert stub_reader.tags[stub_reader.feed.url, feeds.SAVE_SENT_WEBHOOKS_TAG] is False
|
assert stub_reader.tags[stub_reader.feed.url, "save_sent_webhooks"] is False
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides = {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_feed_media_gallery_image_limit_route_updates_stored_tag() -> None:
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class DummyFeed:
|
||||||
|
url: str
|
||||||
|
title: str
|
||||||
|
|
||||||
|
class StubReader:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.feed = DummyFeed(url="https://example.com/feed.xml", title="Example")
|
||||||
|
self.tags: dict[tuple[str, str], int] = {}
|
||||||
|
|
||||||
|
def get_feed(self, feed_url: str) -> DummyFeed:
|
||||||
|
assert feed_url == self.feed.url
|
||||||
|
return self.feed
|
||||||
|
|
||||||
|
def set_tag(self, resource: str, key: str, value: int) -> None:
|
||||||
|
self.tags[resource, key] = value
|
||||||
|
|
||||||
|
stub_reader = StubReader()
|
||||||
|
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
|
||||||
|
|
||||||
|
try:
|
||||||
|
with patch("discord_rss_bot.main.commit_state_change"):
|
||||||
|
response: Response = client.post(
|
||||||
|
url="/set_feed_media_gallery_image_limit",
|
||||||
|
data={"feed_url": stub_reader.feed.url, "image_limit": "7"},
|
||||||
|
follow_redirects=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 303, f"/set_feed_media_gallery_image_limit failed: {response.text}"
|
||||||
|
assert stub_reader.tags[stub_reader.feed.url, "media_gallery_image_limit"] == 7
|
||||||
finally:
|
finally:
|
||||||
app.dependency_overrides = {}
|
app.dependency_overrides = {}
|
||||||
|
|
||||||
|
|
@ -585,7 +628,7 @@ def test_sent_webhooks_view_shows_saved_records() -> None:
|
||||||
key: str,
|
key: str,
|
||||||
default: feeds.JsonValue = None,
|
default: feeds.JsonValue = None,
|
||||||
) -> feeds.JsonValue:
|
) -> feeds.JsonValue:
|
||||||
if resource == () and key == feeds.SENT_WEBHOOKS_TAG:
|
if resource == () and key == "sent_webhooks":
|
||||||
return [
|
return [
|
||||||
{
|
{
|
||||||
"feed_url": sent_feed_url,
|
"feed_url": sent_feed_url,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue