Instead of embed or text mode, optionally send a full-page screenshot of the entry URL as a Discord file upload
All checks were successful
Test and build Docker image / docker (push) Successful in 1m26s

This commit is contained in:
Joakim Hellsén 2026-04-10 00:32:02 +02:00
commit 9ec0166e7f
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
14 changed files with 1571 additions and 241 deletions

View file

@ -9,9 +9,9 @@ Discord: TheLovinator#9276
## Features
- Subscribe to RSS feeds and get updates to a Discord webhook.
- Web interface to manage subscriptions.
- Web interface to manage subscriptions, webhooks, and filters.
- Customizable message format for each feed.
- Choose between Discord embed or plain text.
- Choose between sending a Discord embed, plain text or full-page screenshot to the webhook.
- Regex filters for RSS feeds.
- Blacklist/whitelist words in the title/description/author/etc.
- Set different update frequencies for each feed or use a global default.
@ -54,6 +54,8 @@ or [install directly on your computer](#install-directly-on-your-computer).
- Start the bot:
- Type `uv run discord_rss_bot/main.py` into the PowerShell window.
- You can stop the bot with <kbd>Ctrl</kbd> + <kbd>c</kbd>.
- Required for screenshot mode: install browser runtime once:
- `uv run playwright install chromium`
- Bot is now running on port 3000.
- You should run this bot behind a reverse proxy like [Caddy](https://caddyserver.com/)
or [Nginx](https://www.nginx.com/) if you want to access it from the internet. Remember to add authentication.

View file

@ -1,5 +1,7 @@
from __future__ import annotations
import asyncio
import concurrent.futures
import datetime
import json
import logging
@ -8,6 +10,8 @@ import pprint
import re
from typing import TYPE_CHECKING
from typing import Any
from typing import Literal
from typing import cast
from urllib.parse import ParseResult
from urllib.parse import urlparse
@ -16,6 +20,10 @@ from discord_webhook import DiscordEmbed
from discord_webhook import DiscordWebhook
from fastapi import HTTPException
from markdownify import markdownify
from playwright.sync_api import Browser
from playwright.sync_api import Page
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
from reader import Entry
from reader import EntryNotFoundError
from reader import Feed
@ -48,6 +56,13 @@ if TYPE_CHECKING:
logger: logging.Logger = logging.getLogger(__name__)
type DeliveryMode = Literal["embed", "text", "screenshot"]
type ScreenshotLayout = Literal["desktop", "mobile"]
type ScreenshotFileType = Literal["png", "jpeg"]
MAX_DISCORD_UPLOAD_BYTES: int = 8 * 1024 * 1024
JPEG_QUALITY_STEPS: tuple[int, ...] = (85, 70, 55, 40)
def extract_domain(url: str) -> str: # noqa: PLR0911
"""Extract the domain name from a URL.
@ -98,7 +113,7 @@ def extract_domain(url: str) -> str: # noqa: PLR0911
return "Other"
def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: C901
def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None:
"""Send a single entry to Discord.
Args:
@ -116,8 +131,16 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa:
# If https://discord.com/quests/<quest_id> is in the URL, send a separate message with the URL.
send_discord_quest_notification(entry, webhook_url, reader=reader)
# Check if this is a c3kay feed
if is_c3kay_feed(entry.feed.url):
delivery_mode: DeliveryMode = get_entry_delivery_mode(reader, entry)
logger.info(
"Manual send entry %s from %s using delivery_mode=%s",
entry.id,
entry.feed.url,
delivery_mode,
)
# Hoyolab/c3kay feeds use a custom embed only when embed mode is selected.
if delivery_mode == "embed" and is_c3kay_feed(entry.feed.url):
entry_link: str | None = entry.link
if entry_link:
post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
@ -134,33 +157,332 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa:
else:
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
if delivery_mode == "embed":
webhook: DiscordWebhook = create_embed_webhook(webhook_url, entry, reader=reader)
elif delivery_mode == "screenshot":
webhook = create_screenshot_webhook(webhook_url, entry, reader=reader)
else:
webhook = create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=False)
execute_webhook(webhook, entry, reader=reader)
return None
def get_entry_delivery_mode(reader: Reader, entry: Entry) -> DeliveryMode:
"""Resolve the effective delivery mode for an entry.
Priority order:
1. YouTube feeds are forced to text mode.
2. New `delivery_mode` tag when valid.
3. Legacy `should_send_embed` flag for backwards compatibility.
Returns:
DeliveryMode: The effective delivery mode for this entry.
"""
if is_youtube_feed(entry.feed.url):
return "text"
try:
delivery_mode_raw: str = str(reader.get_tag(entry.feed, "delivery_mode", "")).strip().lower()
except ReaderError:
logger.exception("Error getting delivery_mode tag for feed: %s", entry.feed.url)
delivery_mode_raw = ""
if delivery_mode_raw in {"embed", "text", "screenshot"}:
return cast("DeliveryMode", delivery_mode_raw)
try:
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True))
except ReaderError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True
return "embed" if should_send_embed else "text"
def get_feed_delivery_mode(reader: Reader, feed: Feed) -> DeliveryMode:
"""Resolve the effective delivery mode for a feed.
This mirrors `get_entry_delivery_mode` and is used by the web UI.
Returns:
DeliveryMode: The effective delivery mode for this feed.
"""
if is_youtube_feed(feed.url):
return "text"
try:
delivery_mode_raw: str = str(reader.get_tag(feed, "delivery_mode", "")).strip().lower()
except ReaderError:
logger.exception("Error getting delivery_mode tag for feed: %s", feed.url)
delivery_mode_raw = ""
if delivery_mode_raw in {"embed", "text", "screenshot"}:
return cast("DeliveryMode", delivery_mode_raw)
try:
should_send_embed = bool(reader.get_tag(feed, "should_send_embed", True))
except ReaderError:
logger.exception("Error getting should_send_embed tag for feed: %s", feed.url)
should_send_embed = True
return "embed" if should_send_embed else "text"
def get_screenshot_layout(reader: Reader, feed: Feed) -> ScreenshotLayout:
"""Resolve the screenshot layout for a feed.
Returns:
ScreenshotLayout: The screenshot layout (`desktop` or `mobile`).
"""
try:
screenshot_layout_raw: str = str(reader.get_tag(feed, "screenshot_layout", "desktop")).strip().lower()
except ReaderError:
logger.exception("Error getting screenshot_layout tag for feed: %s", feed.url)
screenshot_layout_raw = "desktop"
if screenshot_layout_raw == "mobile":
return "mobile"
return "desktop"
def create_text_webhook(
webhook_url: str,
entry: Entry,
reader: Reader,
*,
use_default_message_on_empty: bool,
) -> DiscordWebhook:
"""Create a text webhook using the configured custom message for a feed.
Returns:
DiscordWebhook: Configured webhook that sends a text message.
"""
webhook_message: str = ""
# Try to get the custom message for the feed. If the user has none, we will use the default message.
# This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()"
if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901
webhook_message: str = replace_tags_in_text_message(entry=entry, reader=reader)
webhook_message = replace_tags_in_text_message(entry=entry, reader=reader)
if not webhook_message and use_default_message_on_empty:
webhook_message = str(default_custom_message)
if not webhook_message:
webhook_message = "No message found."
# Create the webhook.
webhook_message = truncate_webhook_message(webhook_message)
return DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
def create_screenshot_webhook(webhook_url: str, entry: Entry, reader: Reader) -> DiscordWebhook:
"""Create a webhook that uploads a full-page screenshot of the entry URL.
Returns:
DiscordWebhook: Configured webhook with screenshot upload, or text fallback on failure.
"""
entry_link: str = str(entry.link or "").strip()
webhook_content: str | None = f"<{entry_link}>" if entry_link else None
webhook = DiscordWebhook(url=webhook_url, content=webhook_content, rate_limit_retry=True)
if not entry_link:
logger.warning("Entry %s has no link. Falling back to text message for screenshot mode.", entry.id)
return create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=True)
screenshot_layout: ScreenshotLayout = get_screenshot_layout(reader, entry.feed)
logger.info(
"Attempting screenshot capture for entry %s with layout=%s: %s",
entry.id,
screenshot_layout,
entry_link,
)
screenshot_bytes: bytes | None = capture_full_page_screenshot(
entry_link,
screenshot_layout=screenshot_layout,
screenshot_type="png",
)
screenshot_extension: str = "png"
if screenshot_bytes and len(screenshot_bytes) > MAX_DISCORD_UPLOAD_BYTES:
logger.info(
"Screenshot for entry %s is too large as PNG (%d bytes). Trying JPEG compression.",
entry.id,
len(screenshot_bytes),
)
for quality in JPEG_QUALITY_STEPS:
jpeg_bytes = capture_full_page_screenshot(
entry_link,
screenshot_layout=screenshot_layout,
screenshot_type="jpeg",
jpeg_quality=quality,
)
if jpeg_bytes is None:
continue
logger.info(
"JPEG quality=%d produced %d bytes for entry %s",
quality,
len(jpeg_bytes),
entry.id,
)
screenshot_bytes = jpeg_bytes
screenshot_extension = "jpg"
if len(screenshot_bytes) <= MAX_DISCORD_UPLOAD_BYTES:
break
if screenshot_bytes is None:
logger.warning(
"Screenshot capture failed for entry %s (%s). Falling back to text message.",
entry.id,
entry_link,
)
return create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=True)
if len(screenshot_bytes) > MAX_DISCORD_UPLOAD_BYTES:
logger.warning(
"Screenshot for entry %s is still too large after compression (%d bytes). Falling back to text message.",
entry.id,
len(screenshot_bytes),
)
return create_text_webhook(webhook_url, entry, reader=reader, use_default_message_on_empty=True)
filename: str = screenshot_filename_for_entry(entry, extension=screenshot_extension)
logger.info("Screenshot capture succeeded for entry %s (%d bytes)", entry.id, len(screenshot_bytes))
webhook.add_file(file=screenshot_bytes, filename=filename)
return webhook
def screenshot_filename_for_entry(entry: Entry, *, extension: str = "png") -> str:
"""Build a safe screenshot filename for Discord uploads.
Args:
entry: Entry used to derive a stable filename.
extension: File extension to use.
Returns:
str: Safe filename ending in the selected extension.
"""
base_name: str = str(entry.id or "entry").strip().lower()
safe_name: str = re.sub(r"[^a-z0-9._-]+", "_", base_name)
safe_name: str = safe_name.strip("._")
if not safe_name:
safe_name = "entry"
safe_extension: str = re.sub(r"[^a-z0-9]+", "", extension.lower())
if not safe_extension:
safe_extension = "png"
return f"{safe_name[:80]}.{safe_extension}"
def capture_full_page_screenshot(
url: str,
*,
screenshot_layout: ScreenshotLayout = "desktop",
screenshot_type: ScreenshotFileType = "png",
jpeg_quality: int = 85,
) -> bytes | None:
"""Capture a full-page PNG screenshot for a URL.
Returns:
bytes | None: PNG bytes on success, otherwise None.
"""
# Playwright sync API cannot run in an active asyncio loop.
# FastAPI manual routes run on the event loop, so offload to a worker thread.
try:
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True))
except StorageError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True
asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(
_capture_full_page_screenshot_sync,
url,
screenshot_layout=screenshot_layout,
screenshot_type=screenshot_type,
jpeg_quality=jpeg_quality,
)
return future.result()
except RuntimeError:
# No running loop in this thread (e.g. scheduler path).
return _capture_full_page_screenshot_sync(
url,
screenshot_layout=screenshot_layout,
screenshot_type=screenshot_type,
jpeg_quality=jpeg_quality,
)
# YouTube feeds should never use embeds
if is_youtube_feed(entry.feed.url):
should_send_embed = False
if should_send_embed:
webhook = create_embed_webhook(webhook_url, entry, reader=reader)
def _capture_full_page_screenshot_sync(
url: str,
*,
screenshot_layout: ScreenshotLayout = "desktop",
screenshot_type: ScreenshotFileType = "png",
jpeg_quality: int = 85,
) -> bytes | None:
"""Capture a full-page PNG screenshot for a URL.
Returns:
bytes | None: PNG bytes on success, otherwise None.
"""
try:
with sync_playwright() as playwright:
browser: Browser = playwright.chromium.launch(
headless=True,
args=["--disable-dev-shm-usage", "--no-sandbox"],
)
try:
if screenshot_layout == "mobile":
page = browser.new_page(
viewport={"width": 390, "height": 844},
is_mobile=True,
has_touch=True,
device_scale_factor=3,
color_scheme="dark",
user_agent=(
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 "
"Mobile/15E148 Safari/604.1"
),
)
else:
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
page = browser.new_page(viewport={"width": 1366, "height": 768}, color_scheme="dark")
execute_webhook(webhook, entry, reader=reader)
page = cast("Page", page)
# `networkidle` can hang on pages with long-polling/analytics;
# load DOM first and then best-effort wait for network idle.
page.goto(url, wait_until="domcontentloaded", timeout=30000)
try:
page.wait_for_load_state("networkidle", timeout=5000)
except PlaywrightTimeoutError:
logger.debug("Timed out waiting for network idle for URL: %s", url)
# Scroll through the page in viewport-sized steps to trigger
# lazy-loaded images and content before taking the screenshot.
page.evaluate(
"""
async () => {
const viewportHeight = window.innerHeight;
const totalHeight = document.body.scrollHeight;
let scrolled = 0;
while (scrolled < totalHeight) {
window.scrollBy(0, viewportHeight);
scrolled += viewportHeight;
await new Promise(r => setTimeout(r, 200));
}
window.scrollTo(0, 0);
}
""",
)
# Brief pause for any content revealed by scrolling to settle.
page.wait_for_timeout(500)
if screenshot_type == "jpeg":
clamped_quality: int = max(1, min(100, jpeg_quality))
return page.screenshot(type="jpeg", quality=clamped_quality, full_page=True)
return page.screenshot(type="png", full_page=True)
finally:
browser.close()
except OSError:
logger.exception("Playwright browser is not installed. Failed to capture screenshot for URL: %s", url)
except Exception:
logger.exception("Failed to capture screenshot for URL: %s", url)
return None
@ -375,26 +697,19 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d
logger.info("No webhook URL found for feed: %s", entry.feed.url)
continue
should_send_embed: bool = should_send_embed_check(effective_reader, entry)
delivery_mode: DeliveryMode = get_entry_delivery_mode(effective_reader, entry)
# Youtube feeds only need to send the link
if is_youtube_feed(entry.feed.url):
should_send_embed = False
if should_send_embed:
if delivery_mode == "embed":
webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader)
elif delivery_mode == "screenshot":
webhook = create_screenshot_webhook(webhook_url, entry, reader=effective_reader)
else:
# If the user has set the custom message to an empty string, we will use the default message, otherwise we
# will use the custom message.
if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901
webhook_message = replace_tags_in_text_message(entry, reader=effective_reader)
else:
webhook_message: str = str(default_custom_message)
webhook_message = truncate_webhook_message(webhook_message)
# Create the webhook.
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
webhook = create_text_webhook(
webhook_url,
entry,
reader=effective_reader,
use_default_message_on_empty=True,
)
# Check if the entry is blacklisted, and if it is, we will skip it.
if entry_should_be_skipped(effective_reader, entry):
@ -455,6 +770,7 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> No
return
response: Response = webhook.execute()
logger.debug("Discord webhook response for entry %s: status=%s", entry.id, response.status_code)
if response.status_code not in {200, 204}:
msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(webhook.json)}"
if entry:
@ -487,17 +803,7 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
Returns:
bool: True if we should send an embed, False otherwise.
"""
# YouTube feeds should never use embeds - only links
if is_youtube_feed(entry.feed.url):
return False
try:
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True))
except ReaderError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True
return should_send_embed
return get_entry_delivery_mode(reader, entry) == "embed"
def truncate_webhook_message(webhook_message: str) -> str:
@ -516,7 +822,7 @@ def truncate_webhook_message(webhook_message: str) -> str:
return webhook_message
def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # noqa: C901
def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # noqa: C901, PLR0912
"""Add a new feed, update it and mark every entry as read.
Args:
@ -572,8 +878,19 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
# This is the default message that will be sent to Discord.
reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType]
global_screenshot_layout: str = str(reader.get_tag((), "screenshot_layout", "desktop")).strip().lower()
if global_screenshot_layout not in {"desktop", "mobile"}:
global_screenshot_layout = "desktop"
reader.set_tag(clean_feed_url, "screenshot_layout", global_screenshot_layout) # pyright: ignore[reportArgumentType]
global_delivery_mode: str = str(reader.get_tag((), "delivery_mode", "embed")).strip().lower()
if global_delivery_mode not in {"embed", "text"}:
global_delivery_mode = "embed"
reader.set_tag(clean_feed_url, "delivery_mode", global_delivery_mode) # pyright: ignore[reportArgumentType]
reader.set_tag(clean_feed_url, "should_send_embed", global_delivery_mode == "embed") # pyright: ignore[reportArgumentType]
# Set the default embed tag when creating the feed
reader.set_tag(clean_feed_url, "embed", json.dumps(default_custom_embed))
reader.set_tag(clean_feed_url, "embed", json.dumps(default_custom_embed)) # pyright: ignore[reportArgumentType]
# Update the full-text search index so our new feed is searchable.
reader.update_search()

View file

@ -47,6 +47,8 @@ type TAG_VALUE = (
_FEED_TAGS: tuple[str, ...] = (
"webhook",
"custom_message",
"delivery_mode",
"screenshot_layout",
"should_send_embed",
"embed",
"blacklist_title",
@ -184,9 +186,18 @@ def export_state(reader: Reader, backup_path: Path) -> None:
if isinstance(global_update_config, dict):
global_update_interval = global_update_config
global_screenshot_layout: str | None = None
screenshot_layout = reader.get_tag((), "screenshot_layout", None)
if isinstance(screenshot_layout, str):
clean_layout = screenshot_layout.strip().lower()
if clean_layout in {"desktop", "mobile"}:
global_screenshot_layout = clean_layout
state: dict = {"feeds": feeds_state, "webhooks": webhooks}
if global_update_interval is not None:
state["global_update_interval"] = global_update_interval
if global_screenshot_layout is not None:
state["global_screenshot_layout"] = global_screenshot_layout
state_file: Path = backup_path / "state.json"
state_file.write_text(json.dumps(state, indent=2, default=str), encoding="utf-8")

View file

@ -50,6 +50,8 @@ from discord_rss_bot.custom_message import replace_tags_in_text_message
from discord_rss_bot.custom_message import save_embed
from discord_rss_bot.feeds import create_feed
from discord_rss_bot.feeds import extract_domain
from discord_rss_bot.feeds import get_feed_delivery_mode
from discord_rss_bot.feeds import get_screenshot_layout
from discord_rss_bot.feeds import send_entry_to_discord
from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.git_backup import commit_state_change
@ -67,7 +69,7 @@ if TYPE_CHECKING:
LOGGING_CONFIG: dict[str, Any] = {
"version": 1,
"disable_existing_loggers": True,
"disable_existing_loggers": False,
"formatters": {
"standard": {
"format": "%(asctime)s [%(processName)s: %(process)d] [%(threadName)s: %(thread)d] [%(levelname)s] %(name)s: %(message)s", # noqa: E501
@ -179,10 +181,10 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template
# Add the filters to the Jinja2 environment so they can be used in html templates.
templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else ""
templates.env.filters["discord_markdown"] = markdownify
templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(str(url)) if url else ""
templates.env.filters["discord_markdown"] = markdownify # pyright: ignore[reportArgumentType]
templates.env.filters["relative_time"] = relative_time
templates.env.globals["get_backup_path"] = get_backup_path
templates.env.globals["get_backup_path"] = get_backup_path # pyright: ignore[reportArgumentType]
@app.post("/add_webhook")
@ -703,6 +705,7 @@ async def post_use_embed(
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "delivery_mode", "embed") # pyright: ignore[reportArgumentType]
reader.set_tag(clean_feed_url, "should_send_embed", True) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Enable embed mode for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -723,11 +726,78 @@ async def post_use_text(
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "delivery_mode", "text") # pyright: ignore[reportArgumentType]
reader.set_tag(clean_feed_url, "should_send_embed", False) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Disable embed mode for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/use_screenshot")
async def post_use_screenshot(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Use full-page screenshot mode instead of embed or text.
Args:
feed_url: The feed to change.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "delivery_mode", "screenshot") # pyright: ignore[reportArgumentType]
reader.set_tag(clean_feed_url, "screenshot_layout", "desktop") # pyright: ignore[reportArgumentType]
reader.set_tag(clean_feed_url, "should_send_embed", False) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Enable screenshot mode for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/use_screenshot_mobile")
async def post_use_screenshot_mobile(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Use screenshot mode with mobile layout.
Args:
feed_url: The feed to change.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "delivery_mode", "screenshot") # pyright: ignore[reportArgumentType]
reader.set_tag(clean_feed_url, "screenshot_layout", "mobile") # pyright: ignore[reportArgumentType]
reader.set_tag(clean_feed_url, "should_send_embed", False) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Enable screenshot mobile layout for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/use_screenshot_desktop")
async def post_use_screenshot_desktop(
feed_url: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Use screenshot mode with desktop layout.
Args:
feed_url: The feed to change.
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "delivery_mode", "screenshot") # pyright: ignore[reportArgumentType]
reader.set_tag(clean_feed_url, "screenshot_layout", "desktop") # pyright: ignore[reportArgumentType]
reader.set_tag(clean_feed_url, "should_send_embed", False) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Enable screenshot desktop layout for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/set_update_interval")
async def post_set_update_interval(
feed_url: Annotated[str, Form()],
@ -886,6 +956,52 @@ async def post_set_global_update_interval(
return RedirectResponse(url="/settings", status_code=303)
@app.post("/set_global_screenshot_layout")
async def post_set_global_screenshot_layout(
screenshot_layout: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Set the global default screenshot layout for newly added feeds.
Args:
screenshot_layout: The screenshot layout (`desktop` or `mobile`).
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the settings page.
"""
clean_layout: str = screenshot_layout.strip().lower()
if clean_layout not in {"desktop", "mobile"}:
clean_layout = "desktop"
reader.set_tag((), "screenshot_layout", clean_layout) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Set global screenshot layout to {clean_layout}")
return RedirectResponse(url="/settings", status_code=303)
@app.post("/set_global_delivery_mode")
async def post_set_global_delivery_mode(
delivery_mode: Annotated[str, Form()],
reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Set the global default delivery mode for newly added feeds.
Args:
delivery_mode: The delivery mode (`embed` or `text`).
reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the settings page.
"""
clean_delivery_mode: str = delivery_mode.strip().lower()
if clean_delivery_mode not in {"embed", "text"}:
clean_delivery_mode = "embed"
reader.set_tag((), "delivery_mode", clean_delivery_mode) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Set global delivery mode to {clean_delivery_mode}")
return RedirectResponse(url="/settings", status_code=303)
@app.get("/add", response_class=HTMLResponse)
def get_add(
request: Request,
@ -900,9 +1016,14 @@ def get_add(
Returns:
HTMLResponse: The add feed page.
"""
global_delivery_mode: str = str(reader.get_tag((), "delivery_mode", "embed")).strip().lower()
if global_delivery_mode not in {"embed", "text"}:
global_delivery_mode = "embed"
context = {
"request": request,
"webhooks": reader.get_tag((), "webhooks", []),
"global_delivery_mode": global_delivery_mode,
}
return templates.TemplateResponse(request=request, name="add.html", context=context)
@ -974,6 +1095,8 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
"feed_counts": reader.get_feed_counts(feed=clean_feed_url),
"html": html,
"should_send_embed": False,
"delivery_mode": "text",
"screenshot_layout": "desktop",
"last_entry": None,
"messages": msg,
"is_show_more_entries_button_visible": is_show_more_entries_button_visible,
@ -1002,7 +1125,9 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
# Create the html for the entries.
html: str = create_html_for_feed(reader=reader, entries=entries, current_feed_url=clean_feed_url)
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True))
delivery_mode: str = get_feed_delivery_mode(reader, feed)
should_send_embed: bool = delivery_mode == "embed"
screenshot_layout: str = get_screenshot_layout(reader, feed)
# Get the update interval for this feed
feed_interval: int | None = None
@ -1027,6 +1152,8 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
"feed_counts": reader.get_feed_counts(feed=clean_feed_url),
"html": html,
"should_send_embed": should_send_embed,
"delivery_mode": delivery_mode,
"screenshot_layout": screenshot_layout,
"last_entry": last_entry,
"is_show_more_entries_button_visible": is_show_more_entries_button_visible,
"total_entries": total_entries,
@ -1212,6 +1339,14 @@ async def get_settings(
if isinstance(interval_value, int):
global_interval = interval_value
global_screenshot_layout: str = str(reader.get_tag((), "screenshot_layout", "desktop")).strip().lower()
if global_screenshot_layout not in {"desktop", "mobile"}:
global_screenshot_layout = "desktop"
global_delivery_mode: str = str(reader.get_tag((), "delivery_mode", "embed")).strip().lower()
if global_delivery_mode not in {"embed", "text"}:
global_delivery_mode = "embed"
# Get all feeds with their intervals
feeds: Iterable[Feed] = reader.get_feeds()
feed_intervals = []
@ -1233,6 +1368,8 @@ async def get_settings(
context = {
"request": request,
"global_interval": global_interval,
"global_delivery_mode": global_delivery_mode,
"global_screenshot_layout": global_screenshot_layout,
"feed_intervals": feed_intervals,
}
return templates.TemplateResponse(request=request, name="settings.html", context=context)

View file

@ -51,4 +51,12 @@ def get_reader(custom_location: Path | None = None) -> Reader:
# Set default
reader.set_tag((), ".reader.update", {"interval": 15})
# Set the default screenshot layout to desktop if not already configured.
if reader.get_tag((), "screenshot_layout", None) is None:
reader.set_tag((), "screenshot_layout", "desktop") # pyright: ignore[reportArgumentType]
# Set the default delivery mode for new feeds to embed if not already configured.
if reader.get_tag((), "delivery_mode", None) is None:
reader.set_tag((), "delivery_mode", "embed") # pyright: ignore[reportArgumentType]
return reader

View file

@ -17,3 +17,50 @@ body {
.interval-input {
max-width: 120px;
}
.screenshot-requirement {
color: #9c9c9c;
font-size: 0.9rem;
}
.screenshot-requirement code {
color: #cfcfcf;
background: #1b1b1b;
border: 1px solid #2a2a2a;
border-radius: 4px;
padding: 0 0.35rem;
}
.status-chip {
display: inline-flex !important;
align-items: center;
width: auto !important;
max-width: 100%;
white-space: normal;
}
.feed-page,
.feed-page .card,
.feed-page .card-body,
.feed-page .input-group,
.feed-page .input-group>* {
min-width: 0;
}
.feed-page__content {
min-width: 0;
max-width: 100%;
}
.feed-page__wrap {
overflow-wrap: anywhere;
word-break: break-word;
}
.feed-page__pre {
max-width: 100%;
overflow-x: auto;
white-space: pre-wrap;
overflow-wrap: anywhere;
word-break: break-word;
}

View file

@ -5,18 +5,27 @@
{% block content %}
<div class="p-2 border border-dark">
<form action="/add" method="post">
<div class="mb-3 text-muted">
New feeds currently default to
<strong>{{ global_delivery_mode }}</strong>
delivery mode.
</div>
<!-- Feed URL -->
<div class="row pb-2">
<label for="feed_url" class="col-sm-2 col-form-label">Feed URL</label>
<div class="col-sm-10">
<input name="feed_url" type="text" class="form-control bg-dark border-dark text-muted" id="feed_url" />
<input name="feed_url"
type="text"
class="form-control bg-dark border-dark text-muted"
id="feed_url" />
</div>
</div>
<!-- Webhook dropdown -->
<div class="row pb-2">
<label for="webhook_dropdown" class="col-sm-2 col-form-label">Webhook</label>
<div class="col-sm-10">
<select class="col-auto form-select bg-dark border-dark text-muted" id="webhook_dropdown"
<select class="col-auto form-select bg-dark border-dark text-muted"
id="webhook_dropdown"
name="webhook_dropdown">
<option selected>Choose webhook...</option>
{% for hook in webhooks %}<option value="{{ hook.name }}">{{- hook.name -}}</option>{% endfor %}

View file

@ -3,36 +3,69 @@
| {{ feed.title }}
{% endblock title %}
{% block content %}
<div class="card mb-3 border border-dark p-3 text-light">
<!-- Feed Title -->
<h2>
<a class="text-muted" href="{{ feed.url }}">{{ feed.title }}</a> ({{ total_entries }} entries)
<div class="row g-3 feed-page">
<div class="col-12">
<article class="card border border-dark shadow-sm text-light">
<div class="card-body p-3 p-md-4 text-light">
<div class="d-flex flex-wrap justify-content-between align-items-start gap-3">
<div class="feed-page__content">
<h2 class="h3 mb-1">
<a class="text-muted text-decoration-none feed-page__wrap"
href="{{ feed.url }}">{{ feed.title }}</a>
</h2>
{% if not feed.updates_enabled %}<span class="badge bg-danger">Disabled</span>{% endif %}
<p class="text-muted mb-0">{{ total_entries }} entries</p>
</div>
<div class="d-flex flex-wrap gap-2">
{% if not feed.updates_enabled %}<span class="badge bg-danger status-chip">Disabled</span>{% endif %}
<span class="badge status-chip {% if delivery_mode == "embed" %} bg-primary {% else %} bg-secondary {% endif %}">
Current mode:
{% if delivery_mode == "embed" %}
Embed
{% elif delivery_mode == "screenshot" %}
Screenshot
{% else %}
Text
{% endif %}
</span>
{% if delivery_mode == "screenshot" %}
<span class="badge status-chip bg-secondary">
Screenshot layout:
{% if screenshot_layout == "mobile" %}
Mobile
{% else %}
Desktop
{% endif %}
</span>
{% endif %}
</div>
</div>
{% if feed.last_exception %}
<div class="mt-3">
<h5 class="text-danger">{{ feed.last_exception.type_name }}:</h5>
<code class="d-block">{{ feed.last_exception.value_str }}</code>
<button class="btn btn-secondary btn-sm mt-2"
<div class="alert alert-danger mt-4 mb-0" role="alert">
<h5 class="alert-heading mb-2">{{ feed.last_exception.type_name }}</h5>
<code class="d-block mb-2 feed-page__wrap">{{ feed.last_exception.value_str }}</code>
<button class="btn btn-outline-light btn-sm"
type="button"
data-bs-toggle="collapse"
data-bs-target="#exceptionDetails"
aria-expanded="false"
aria-controls="exceptionDetails">Show Traceback</button>
<div class="collapse" id="exceptionDetails">
<pre><code>{{ feed.last_exception.traceback_str }}</code></pre>
<div class="collapse mt-2" id="exceptionDetails">
<pre class="mb-0 feed-page__pre"><code>{{ feed.last_exception.traceback_str }}</code></pre>
</div>
</div>
{% endif %}
<!-- Feed Actions -->
<div class="mt-3 d-flex flex-wrap gap-2">
<section class="mt-4 pt-3 border-top border-secondary-subtle">
<h3 class="h6 text-uppercase text-muted mb-3">Actions</h3>
<div class="d-flex flex-wrap gap-2">
<a href="/update?feed_url={{ feed.url|encode_url }}"
class="btn btn-primary btn-sm">Update</a>
<form action="/remove" method="post" class="d-inline">
<button class="btn btn-danger btn-sm"
name="feed_url"
value="{{ feed.url }}"
onclick="return confirm('Are you sure you want to delete this feed?')">Remove</button>
onclick="return confirm('Are you sure you want to delete this feed?')">
Remove
</button>
</form>
{% if not feed.updates_enabled %}
<form action="/unpause" method="post" class="d-inline">
@ -46,90 +79,137 @@
</form>
{% endif %}
{% if not "youtube.com/feeds/videos.xml" in feed.url %}
{% if should_send_embed %}
{% if delivery_mode == "embed" %}
<form action="/use_text" method="post" class="d-inline">
<button class="btn btn-dark btn-sm" name="feed_url" value="{{ feed.url }}">Send text message instead of embed</button>
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">Send text message instead of embed</button>
</form>
<form action="/use_screenshot" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">
Send full-page screenshot instead of embed
</button>
</form>
{% elif delivery_mode == "screenshot" %}
<form action="/use_embed" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">Send embed instead of screenshot</button>
</form>
<form action="/use_text" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">Send text message instead of screenshot</button>
</form>
{% if screenshot_layout == "mobile" %}
<form action="/use_screenshot_desktop" method="post" class="d-inline">
<button class="btn btn-outline-secondary btn-sm"
name="feed_url"
value="{{ feed.url }}">Use desktop screenshot layout</button>
</form>
{% else %}
<form action="/use_embed" method="post" class="d-inline">
<button class="btn btn-dark btn-sm" name="feed_url" value="{{ feed.url }}">Send embed instead of text message</button>
<form action="/use_screenshot_mobile" method="post" class="d-inline">
<button class="btn btn-outline-secondary btn-sm"
name="feed_url"
value="{{ feed.url }}">Use mobile screenshot layout</button>
</form>
{% endif %}
{% else %}
<form action="/use_embed" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">Send embed instead of text message</button>
</form>
<form action="/use_screenshot" method="post" class="d-inline">
<button class="btn btn-outline-light btn-sm"
name="feed_url"
value="{{ feed.url }}">
Send full-page screenshot instead of text message
</button>
</form>
{% endif %}
<div class="w-100 mt-1 screenshot-requirement">
Screenshot mode requires Chromium to be installed for Playwright. Run <code>uv run playwright install chromium</code> once on this machine.
</div>
{% endif %}
</div>
<!-- Additional Links -->
<div class="mt-3">
<a class="text-muted d-block"
</section>
<section class="mt-4 pt-3 border-top border-secondary-subtle">
<h3 class="h6 text-uppercase text-muted mb-3">Customization</h3>
<div class="d-flex flex-column align-items-start gap-2">
<a class="text-muted text-decoration-none"
href="/whitelist?feed_url={{ feed.url|encode_url }}">Whitelist</a>
<a class="text-muted d-block"
<a class="text-muted text-decoration-none"
href="/blacklist?feed_url={{ feed.url|encode_url }}">Blacklist</a>
<a class="text-muted d-block"
<a class="text-muted text-decoration-none"
href="/custom?feed_url={{ feed.url|encode_url }}">
Customize message
{% if not should_send_embed %}(Currently active){% endif %}
{% if delivery_mode == "text" %}(Currently active){% endif %}
</a>
{% if not "youtube.com/feeds/videos.xml" in feed.url %}
<a class="text-muted d-block"
<a class="text-muted text-decoration-none"
href="/embed?feed_url={{ feed.url|encode_url }}">
Customize embed
{% if should_send_embed %}(Currently active){% endif %}
{% if delivery_mode == "embed" %}(Currently active){% endif %}
</a>
{% endif %}
</div>
<!-- Feed URL Configuration -->
<div class="mt-4 border-top border-secondary pt-3">
<h5 class="mb-3">Feed URL</h5>
<form action="/change_feed_url" method="post" class="mb-2">
</section>
<section class="mt-4 pt-3 border-top border-secondary-subtle">
<h3 class="h6 text-uppercase text-muted mb-3">Feed URL</h3>
<form action="/change_feed_url" method="post" class="mb-0">
<input type="hidden" name="old_feed_url" value="{{ feed.url }}" />
<div class="input-group input-group-sm mb-2">
<div class="input-group input-group-sm feed-page__content">
<input type="url"
class="form-control form-control-sm"
class="form-control feed-page__wrap"
name="new_feed_url"
value="{{ feed.url }}"
required />
<button class="btn btn-warning" type="submit">Update URL</button>
</div>
</form>
</section>
<section class="mt-4 pt-3 border-top border-secondary-subtle">
<h3 class="h6 text-uppercase text-muted mb-3">Feed Information</h3>
<div class="row g-2 text-muted small">
<div class="col-12 col-md-6">
<div class="p-2 border border-secondary rounded">Added: {{ feed.added | relative_time }}</div>
</div>
<!-- Feed Metadata -->
<div class="mt-4 border-top border-secondary pt-3">
<h5 class="mb-3">Feed Information</h5>
<div class="row text-muted">
<div class="col-md-6 mb-2">
<small><strong>Added:</strong> {{ feed.added | relative_time }}</small>
<div class="col-12 col-md-6">
<div class="p-2 border border-secondary rounded">Last Updated: {{ feed.last_updated | relative_time }}</div>
</div>
<div class="col-md-6 mb-2">
<small><strong>Last Updated:</strong> {{ feed.last_updated | relative_time }}</small>
<div class="col-12 col-md-6">
<div class="p-2 border border-secondary rounded">Last Retrieved: {{ feed.last_retrieved | relative_time }}</div>
</div>
<div class="col-md-6 mb-2">
<small><strong>Last Retrieved:</strong> {{ feed.last_retrieved | relative_time }}</small>
<div class="col-12 col-md-6">
<div class="p-2 border border-secondary rounded">Next Update: {{ feed.update_after | relative_time }}</div>
</div>
<div class="col-md-6 mb-2">
<small><strong>Next Update:</strong> {{ feed.update_after | relative_time }}</small>
</div>
<div class="col-md-6 mb-2">
<small><strong>Updates:</strong> <span class="badge {{ 'bg-success' if feed.updates_enabled else 'bg-danger' }}">{{ 'Enabled' if feed.updates_enabled else 'Disabled' }}</span></small>
<div class="col-12 col-md-6">
<div class="p-2 border border-secondary rounded">
Updates:
<span class="badge {{ 'bg-success' if feed.updates_enabled else 'bg-danger' }}">
{{ 'Enabled' if feed.updates_enabled else 'Disabled' }}
</span>
</div>
</div>
</div>
<!-- Update Interval Configuration -->
<div class="mt-4 border-top border-secondary pt-3">
<h5 class="mb-3">
Update Interval <span class="badge
{% if feed_interval %}
bg-info
{% else %}
bg-secondary
{% endif %}">
</section>
<section class="mt-4 pt-3 border-top border-secondary-subtle">
<div class="d-flex flex-wrap align-items-center justify-content-between gap-2 mb-3">
<h3 class="h6 text-uppercase text-muted mb-0">Update Interval</h3>
<span class="badge {% if feed_interval %} bg-info {% else %} bg-secondary {% endif %}">
{% if feed_interval %}
Custom
{% else %}
Using global default
{% endif %}
</span>
</h5>
<div class="d-flex align-items-center gap-2 flex-wrap">
<span class="text-muted">Current: <strong>
</div>
<p class="text-muted mb-3">
Current:
<strong>
{% if feed_interval %}
{{ feed_interval }}
{% if feed_interval >= 60 %}({{ (feed_interval / 60) | round(1) }} hours){% endif %}
@ -137,7 +217,10 @@
{{ global_interval }}
{% if global_interval >= 60 %}({{ (global_interval / 60) | round(1) }} hours){% endif %}
{% endif %}
minutes</strong></span>
minutes
</strong>
</p>
<div class="d-flex flex-wrap align-items-center gap-2">
<form action="/set_update_interval"
method="post"
class="d-inline-flex gap-2 align-items-center">
@ -159,14 +242,25 @@
</form>
{% endif %}
</div>
</section>
</div>
</article>
</div>
<div class="col-12">
<section class="card border border-dark shadow-sm text-light">
<div class="card-header bg-transparent text-muted border-secondary">Rendered HTML content</div>
<div class="card-body p-0">
<pre class="m-0 p-3 feed-page__pre">{{ html|safe }}</pre>
</div>
</section>
</div>
{# Rendered HTML content #}
<pre>{{ html|safe }}</pre>
{% if is_show_more_entries_button_visible %}
<a class="btn btn-dark mt-3"
<div class="col-12">
<a class="btn btn-dark"
href="/feed?feed_url={{ feed.url|encode_url }}&starting_after={{ last_entry.id|encode_url }}">
Show more entries
</a>
</div>
{% endif %}
</div>
{% endblock content %}

View file

@ -34,6 +34,47 @@
</div>
</div>
</form>
<form action="/set_global_delivery_mode" method="post" class="mt-4">
<div class="settings-form-row mb-2">
<label for="delivery_mode" class="form-label mb-1">Default delivery mode for new feeds</label>
<div class="input-group input-group-lg">
<select id="delivery_mode"
class="form-select settings-input"
name="delivery_mode">
<option value="embed"
{% if global_delivery_mode == "embed" %}selected{% endif %}>Embed</option>
<option value="text"
{% if global_delivery_mode == "text" %}selected{% endif %}>Text</option>
</select>
<button class="btn btn-primary px-4" type="submit">Save</button>
</div>
<div class="form-text text-muted mt-2">
New feeds inherit this value. Existing feeds keep their current delivery mode.
</div>
</div>
</form>
<form action="/set_global_screenshot_layout" method="post" class="mt-4">
<div class="settings-form-row mb-2">
<label for="screenshot_layout" class="form-label mb-1">Default screenshot layout for new feeds</label>
<div class="input-group input-group-lg">
<select id="screenshot_layout"
class="form-select settings-input"
name="screenshot_layout">
<option value="desktop"
{% if global_screenshot_layout == "desktop" %}selected{% endif %}>Desktop</option>
<option value="mobile"
{% if global_screenshot_layout == "mobile" %}selected{% endif %}>Mobile</option>
</select>
<button class="btn btn-primary px-4" type="submit">Save</button>
</div>
<div class="form-text text-muted mt-2">
New feeds inherit this value. Existing feeds keep their current screenshot layout.
</div>
<div class="form-text screenshot-requirement mt-1">
Screenshot mode requires Chromium to be installed for Playwright. Run <code>uv run playwright install chromium</code> once on this machine.
</div>
</div>
</form>
</section>
<section class="mt-5">
<div class="text-light">

View file

@ -13,6 +13,7 @@ dependencies = [
"lxml",
"markdownify",
"platformdirs",
"playwright",
"python-dotenv",
"python-multipart",
"reader",

View file

@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
import os
import tempfile
from pathlib import Path
@ -16,10 +17,16 @@ from reader import StorageError
from reader import make_reader
from discord_rss_bot import feeds
from discord_rss_bot.feeds import capture_full_page_screenshot
from discord_rss_bot.feeds import create_feed
from discord_rss_bot.feeds import create_screenshot_webhook
from discord_rss_bot.feeds import execute_webhook
from discord_rss_bot.feeds import extract_domain
from discord_rss_bot.feeds import get_entry_delivery_mode
from discord_rss_bot.feeds import get_screenshot_layout
from discord_rss_bot.feeds import get_webhook_url
from discord_rss_bot.feeds import is_youtube_feed
from discord_rss_bot.feeds import screenshot_filename_for_entry
from discord_rss_bot.feeds import send_discord_quest_notification
from discord_rss_bot.feeds import send_entry_to_discord
from discord_rss_bot.feeds import send_to_discord
@ -153,6 +160,450 @@ def test_should_send_embed_check_normal_feeds(mock_logger: MagicMock) -> None:
assert result is False, "Normal feeds should not use embeds when disabled"
def test_get_entry_delivery_mode_prefers_delivery_mode_tag() -> None:
reader = MagicMock()
entry = MagicMock()
entry.feed.url = "https://example.com/feed.xml"
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"delivery_mode": "screenshot",
"should_send_embed": True,
}.get(key, default)
result = get_entry_delivery_mode(reader, entry)
assert result == "screenshot"
def test_get_entry_delivery_mode_falls_back_to_legacy_embed_flag() -> None:
reader = MagicMock()
entry = MagicMock()
entry.feed.url = "https://example.com/feed.xml"
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"delivery_mode": "",
"should_send_embed": False,
}.get(key, default)
result = get_entry_delivery_mode(reader, entry)
assert result == "text"
@patch("discord_rss_bot.feeds.execute_webhook")
@patch("discord_rss_bot.feeds.create_text_webhook")
@patch("discord_rss_bot.feeds.create_hoyolab_webhook")
@patch("discord_rss_bot.feeds.fetch_hoyolab_post")
def test_send_entry_to_discord_hoyolab_text_mode_uses_text_webhook(
mock_fetch_hoyolab_post: MagicMock,
mock_create_hoyolab_webhook: MagicMock,
mock_create_text_webhook: MagicMock,
mock_execute_webhook: MagicMock,
) -> None:
entry = MagicMock()
entry.id = "entry-1"
entry.feed.url = "https://feeds.c3kay.de/hoyolab.xml"
entry.feed_url = "https://feeds.c3kay.de/hoyolab.xml"
entry.link = "https://www.hoyolab.com/article/38588239"
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"webhook": "https://discord.test/webhook",
"delivery_mode": "text",
}.get(key, default)
text_webhook = MagicMock()
mock_create_text_webhook.return_value = text_webhook
result = send_entry_to_discord(entry, reader)
assert result is None
mock_fetch_hoyolab_post.assert_not_called()
mock_create_hoyolab_webhook.assert_not_called()
mock_create_text_webhook.assert_called_once_with(
"https://discord.test/webhook",
entry,
reader=reader,
use_default_message_on_empty=False,
)
mock_execute_webhook.assert_called_once_with(text_webhook, entry, reader=reader)
@patch("discord_rss_bot.feeds.execute_webhook")
@patch("discord_rss_bot.feeds.create_screenshot_webhook")
@patch("discord_rss_bot.feeds.create_hoyolab_webhook")
@patch("discord_rss_bot.feeds.fetch_hoyolab_post")
def test_send_entry_to_discord_hoyolab_screenshot_mode_uses_screenshot_webhook(
mock_fetch_hoyolab_post: MagicMock,
mock_create_hoyolab_webhook: MagicMock,
mock_create_screenshot_webhook: MagicMock,
mock_execute_webhook: MagicMock,
) -> None:
entry = MagicMock()
entry.id = "entry-2"
entry.feed.url = "https://feeds.c3kay.de/hoyolab.xml"
entry.feed_url = "https://feeds.c3kay.de/hoyolab.xml"
entry.link = "https://www.hoyolab.com/article/38588239"
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"webhook": "https://discord.test/webhook",
"delivery_mode": "screenshot",
}.get(key, default)
screenshot_webhook = MagicMock()
mock_create_screenshot_webhook.return_value = screenshot_webhook
result = send_entry_to_discord(entry, reader)
assert result is None
mock_fetch_hoyolab_post.assert_not_called()
mock_create_hoyolab_webhook.assert_not_called()
mock_create_screenshot_webhook.assert_called_once_with(
"https://discord.test/webhook",
entry,
reader=reader,
)
mock_execute_webhook.assert_called_once_with(screenshot_webhook, entry, reader=reader)
@patch("discord_rss_bot.feeds.execute_webhook")
@patch("discord_rss_bot.feeds.create_embed_webhook")
@patch("discord_rss_bot.feeds.create_hoyolab_webhook")
@patch("discord_rss_bot.feeds.fetch_hoyolab_post")
def test_send_entry_to_discord_hoyolab_embed_mode_uses_hoyolab_webhook(
mock_fetch_hoyolab_post: MagicMock,
mock_create_hoyolab_webhook: MagicMock,
mock_create_embed_webhook: MagicMock,
mock_execute_webhook: MagicMock,
) -> None:
entry = MagicMock()
entry.id = "entry-3"
entry.feed.url = "https://feeds.c3kay.de/hoyolab.xml"
entry.feed_url = "https://feeds.c3kay.de/hoyolab.xml"
entry.link = "https://www.hoyolab.com/article/38588239"
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"webhook": "https://discord.test/webhook",
"delivery_mode": "embed",
}.get(key, default)
mock_fetch_hoyolab_post.return_value = {"post": {"subject": "News"}}
hoyolab_webhook = MagicMock()
mock_create_hoyolab_webhook.return_value = hoyolab_webhook
result = send_entry_to_discord(entry, reader)
assert result is None
mock_fetch_hoyolab_post.assert_called_once_with("38588239")
mock_create_hoyolab_webhook.assert_called_once_with(
"https://discord.test/webhook",
entry,
{"post": {"subject": "News"}},
)
mock_create_embed_webhook.assert_not_called()
mock_execute_webhook.assert_called_once_with(hoyolab_webhook, entry, reader=reader)
def test_get_screenshot_layout_prefers_mobile_tag() -> None:
reader = MagicMock()
feed = MagicMock()
feed.url = "https://example.com/feed.xml"
reader.get_tag.return_value = "mobile"
result = get_screenshot_layout(reader, feed)
assert result == "mobile"
def test_get_screenshot_layout_defaults_to_desktop() -> None:
reader = MagicMock()
feed = MagicMock()
feed.url = "https://example.com/feed.xml"
reader.get_tag.return_value = "unknown"
result = get_screenshot_layout(reader, feed)
assert result == "desktop"
def test_create_feed_inherits_global_screenshot_layout() -> None:
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
"screenshot_layout": "mobile",
}.get(key, default)
create_feed(reader, "https://example.com/feed.xml", "Main")
reader.set_tag.assert_any_call("https://example.com/feed.xml", "screenshot_layout", "mobile")
def test_create_feed_inherits_global_text_delivery_mode() -> None:
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
"screenshot_layout": "desktop",
"delivery_mode": "text",
}.get(key, default)
create_feed(reader, "https://example.com/feed.xml", "Main")
reader.set_tag.assert_any_call("https://example.com/feed.xml", "delivery_mode", "text")
reader.set_tag.assert_any_call("https://example.com/feed.xml", "should_send_embed", False)
def test_create_feed_falls_back_to_embed_when_global_delivery_mode_is_invalid() -> None:
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"webhooks": [{"name": "Main", "url": "https://discord.com/api/webhooks/123/abc"}],
"screenshot_layout": "desktop",
"delivery_mode": "invalid",
}.get(key, default)
create_feed(reader, "https://example.com/feed.xml", "Main")
reader.set_tag.assert_any_call("https://example.com/feed.xml", "delivery_mode", "embed")
reader.set_tag.assert_any_call("https://example.com/feed.xml", "should_send_embed", True)
@patch("discord_rss_bot.feeds.capture_full_page_screenshot")
@patch("discord_rss_bot.feeds.DiscordWebhook")
def test_create_screenshot_webhook_adds_image_file(
mock_discord_webhook: MagicMock,
mock_capture: MagicMock,
) -> None:
mock_capture.return_value = b"png-bytes"
webhook = MagicMock()
mock_discord_webhook.return_value = webhook
entry = MagicMock()
entry.id = "entry-abc"
entry.link = "https://example.com/article"
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"screenshot_layout": "mobile",
}.get(key, default)
result = create_screenshot_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
assert result == webhook
mock_discord_webhook.assert_called_once_with(
url="https://discord.com/api/webhooks/123/abc",
content="<https://example.com/article>",
rate_limit_retry=True,
)
mock_capture.assert_called_once_with(
"https://example.com/article",
screenshot_layout="mobile",
screenshot_type="png",
)
webhook.add_file.assert_called_once()
@patch("discord_rss_bot.feeds.capture_full_page_screenshot")
@patch("discord_rss_bot.feeds.DiscordWebhook")
def test_create_screenshot_webhook_retries_jpeg_when_png_too_large(
mock_discord_webhook: MagicMock,
mock_capture: MagicMock,
) -> None:
oversized_png = b"x" * (8 * 1024 * 1024 + 1024)
compressed_jpeg = b"y" * (7 * 1024 * 1024)
mock_capture.side_effect = [oversized_png, compressed_jpeg]
webhook = MagicMock()
mock_discord_webhook.return_value = webhook
entry = MagicMock()
entry.id = "entry-large"
entry.link = "https://example.com/large-article"
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"screenshot_layout": "desktop",
}.get(key, default)
result = create_screenshot_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
assert result == webhook
assert mock_capture.call_count == 2
assert mock_capture.call_args_list[0].kwargs == {
"screenshot_layout": "desktop",
"screenshot_type": "png",
}
assert mock_capture.call_args_list[1].kwargs == {
"screenshot_layout": "desktop",
"screenshot_type": "jpeg",
"jpeg_quality": 85,
}
webhook.add_file.assert_called_once()
@patch("discord_rss_bot.feeds.create_text_webhook")
@patch("discord_rss_bot.feeds.capture_full_page_screenshot")
def test_create_screenshot_webhook_falls_back_when_all_formats_too_large(
mock_capture: MagicMock,
mock_create_text_webhook: MagicMock,
) -> None:
oversized_bytes = b"z" * (9 * 1024 * 1024)
# 1 PNG attempt + 4 JPEG quality attempts
mock_capture.side_effect = [oversized_bytes, oversized_bytes, oversized_bytes, oversized_bytes, oversized_bytes]
fallback_webhook = MagicMock()
mock_create_text_webhook.return_value = fallback_webhook
entry = MagicMock()
entry.id = "entry-too-large"
entry.link = "https://example.com/very-large"
reader = MagicMock()
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"screenshot_layout": "desktop",
}.get(key, default)
result = create_screenshot_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
assert result == fallback_webhook
assert mock_capture.call_count == 5
mock_create_text_webhook.assert_called_once_with(
"https://discord.com/api/webhooks/123/abc",
entry,
reader=reader,
use_default_message_on_empty=True,
)
@patch("discord_rss_bot.feeds.capture_full_page_screenshot")
@patch("discord_rss_bot.feeds.create_text_webhook")
def test_create_screenshot_webhook_falls_back_when_entry_has_no_link(
mock_create_text_webhook: MagicMock,
mock_capture: MagicMock,
) -> None:
entry = MagicMock()
entry.id = "entry-no-link"
entry.link = None
reader = MagicMock()
fallback_webhook = MagicMock()
mock_create_text_webhook.return_value = fallback_webhook
result = create_screenshot_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
assert result == fallback_webhook
mock_capture.assert_not_called()
mock_create_text_webhook.assert_called_once_with(
"https://discord.com/api/webhooks/123/abc",
entry,
reader=reader,
use_default_message_on_empty=True,
)
def test_screenshot_filename_for_entry_custom_extension() -> None:
entry = MagicMock()
entry.id = "hello/world?id=123"
filename = screenshot_filename_for_entry(entry, extension="JPG")
assert filename.endswith(".jpg")
assert "/" not in filename
assert "?" not in filename
@patch("discord_rss_bot.feeds._capture_full_page_screenshot_sync", return_value=b"jpeg-bytes")
def test_capture_full_page_screenshot_forwards_jpeg_options(mock_capture_sync: MagicMock) -> None:
result = capture_full_page_screenshot(
"https://example.com/article",
screenshot_layout="mobile",
screenshot_type="jpeg",
jpeg_quality=55,
)
assert result == b"jpeg-bytes"
mock_capture_sync.assert_called_once_with(
"https://example.com/article",
screenshot_layout="mobile",
screenshot_type="jpeg",
jpeg_quality=55,
)
@patch("discord_rss_bot.feeds.create_text_webhook")
@patch("discord_rss_bot.feeds.capture_full_page_screenshot")
def test_create_screenshot_webhook_falls_back_to_text_on_failure(
mock_capture: MagicMock,
mock_create_text_webhook: MagicMock,
) -> None:
mock_capture.return_value = None
fallback_webhook = MagicMock()
mock_create_text_webhook.return_value = fallback_webhook
entry = MagicMock()
entry.id = "entry-def"
entry.link = "https://example.com/article"
reader = MagicMock()
result = create_screenshot_webhook("https://discord.com/api/webhooks/123/abc", entry, reader)
assert result == fallback_webhook
mock_create_text_webhook.assert_called_once_with(
"https://discord.com/api/webhooks/123/abc",
entry,
reader=reader,
use_default_message_on_empty=True,
)
def test_capture_full_page_screenshot_uses_thread_when_loop_running() -> None:
"""Capture should offload sync Playwright work when called from an active event loop."""
with patch("discord_rss_bot.feeds._capture_full_page_screenshot_sync", return_value=b"png") as mock_capture_sync:
async def run_capture() -> bytes | None:
return feeds.capture_full_page_screenshot(
"https://example.com/article",
screenshot_layout="desktop",
screenshot_type="png",
)
result = asyncio.run(run_capture())
assert result == b"png"
mock_capture_sync.assert_called_once_with(
"https://example.com/article",
screenshot_layout="desktop",
screenshot_type="png",
jpeg_quality=85,
)
@patch("discord_rss_bot.feeds.get_entry_delivery_mode")
@patch("discord_rss_bot.feeds.create_screenshot_webhook")
@patch("discord_rss_bot.feeds.execute_webhook")
def test_send_entry_to_discord_uses_screenshot_mode(
mock_execute_webhook: MagicMock,
mock_create_screenshot_webhook: MagicMock,
mock_get_entry_delivery_mode: MagicMock,
) -> None:
reader = MagicMock()
entry = MagicMock()
entry.feed.url = "https://example.com/feed.xml"
entry.feed_url = "https://example.com/feed.xml"
reader.get_tag.side_effect = lambda resource, key, default=None: { # noqa: ARG005
"webhook": "https://discord.com/api/webhooks/123/abc",
}.get(key, default)
mock_get_entry_delivery_mode.return_value = "screenshot"
screenshot_webhook = MagicMock()
mock_create_screenshot_webhook.return_value = screenshot_webhook
send_entry_to_discord(entry, reader)
mock_create_screenshot_webhook.assert_called_once_with(
"https://discord.com/api/webhooks/123/abc",
entry,
reader=reader,
)
mock_execute_webhook.assert_called_once_with(screenshot_webhook, entry, reader=reader)
@patch("discord_rss_bot.feeds.get_reader")
@patch("discord_rss_bot.feeds.get_custom_message")
@patch("discord_rss_bot.feeds.replace_tags_in_text_message")

View file

@ -436,6 +436,77 @@ def test_post_use_text_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path
assert test_feed_url in commit_message
def test_post_use_screenshot_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /use_screenshot should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(url="/use_screenshot", data={"feed_url": test_feed_url})
assert response.status_code == 200, f"Failed to enable screenshot mode: {response.text}"
mock_commit.assert_called_once()
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Enable screenshot mode" in commit_message
assert test_feed_url in commit_message
def test_post_use_screenshot_mobile_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /use_screenshot_mobile should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(url="/use_screenshot_mobile", data={"feed_url": test_feed_url})
assert response.status_code == 200, f"Failed to enable screenshot mobile layout: {response.text}"
mock_commit.assert_called_once()
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Enable screenshot mobile layout" in commit_message
assert test_feed_url in commit_message
def test_post_use_screenshot_desktop_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /use_screenshot_desktop should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(url="/use_screenshot_desktop", data={"feed_url": test_feed_url})
assert response.status_code == 200, f"Failed to enable screenshot desktop layout: {response.text}"
mock_commit.assert_called_once()
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Enable screenshot desktop layout" in commit_message
assert test_feed_url in commit_message
def test_post_set_global_screenshot_layout_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /set_global_screenshot_layout should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(url="/set_global_screenshot_layout", data={"screenshot_layout": "mobile"})
assert response.status_code == 200, f"Failed to set global screenshot layout: {response.text}"
mock_commit.assert_called_once()
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Set global screenshot layout to mobile" in commit_message
def test_post_custom_message_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /custom should trigger a git backup."""
backup_path: Path = tmp_path / "backup"

View file

@ -1,5 +1,6 @@
from __future__ import annotations
import contextlib
import re
import urllib.parse
from dataclasses import dataclass
@ -169,6 +170,66 @@ def test_get() -> None:
assert response.status_code == 200, f"/whitelist failed: {response.text}"
def test_settings_page_shows_screenshot_layout_setting() -> None:
response: Response = client.get(url="/settings")
assert response.status_code == 200, f"/settings failed: {response.text}"
assert "Default delivery mode for new feeds" in response.text
assert "Default screenshot layout for new feeds" in response.text
assert "uv run playwright install chromium" in response.text
def test_set_global_delivery_mode() -> None:
response: Response = client.post(url="/set_global_delivery_mode", data={"delivery_mode": "text"})
assert response.status_code == 200, f"Failed to set global delivery mode: {response.text}"
response = client.get(url="/settings")
assert response.status_code == 200, f"/settings failed after setting delivery mode: {response.text}"
assert re.search(r"<option\s+value=\"text\"[^>]*\bselected\b", response.text)
def test_add_page_shows_global_default_delivery_mode_hint() -> None:
response: Response = client.post(url="/set_global_delivery_mode", data={"delivery_mode": "text"})
assert response.status_code == 200, f"Failed to set global delivery mode: {response.text}"
response = client.get(url="/add")
assert response.status_code == 200, f"/add failed: {response.text}"
assert "New feeds currently default to" in response.text
assert "text" in response.text
def test_c3kay_feed_delivery_mode_toggle_routes_update_stored_tags() -> None:
reader = get_reader_dependency()
c3kay_feed_url = "https://feeds.c3kay.de/hoyolab-ui-toggle-test.xml"
with contextlib.suppress(Exception):
reader.add_feed(c3kay_feed_url)
response: Response = client.post(url="/use_text", data={"feed_url": c3kay_feed_url})
assert response.status_code == 200, f"Failed to set text mode: {response.text}"
assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "text"
assert reader.get_tag(c3kay_feed_url, "should_send_embed") is False
response = client.post(url="/use_screenshot_mobile", data={"feed_url": c3kay_feed_url})
assert response.status_code == 200, f"Failed to set screenshot mobile mode: {response.text}"
assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "screenshot"
assert reader.get_tag(c3kay_feed_url, "screenshot_layout") == "mobile"
assert reader.get_tag(c3kay_feed_url, "should_send_embed") is False
response = client.post(url="/use_embed", data={"feed_url": c3kay_feed_url})
assert response.status_code == 200, f"Failed to set embed mode: {response.text}"
assert reader.get_tag(c3kay_feed_url, "delivery_mode") == "embed"
assert reader.get_tag(c3kay_feed_url, "should_send_embed") is True
def test_set_global_screenshot_layout() -> None:
response: Response = client.post(url="/set_global_screenshot_layout", data={"screenshot_layout": "mobile"})
assert response.status_code == 200, f"Failed to set global screenshot layout: {response.text}"
response = client.get(url="/settings")
assert response.status_code == 200, f"/settings failed after setting layout: {response.text}"
assert re.search(r"<option\s+value=\"mobile\"[^>]*\bselected\b", response.text)
def test_pause_feed() -> None:
"""Test the /pause_feed page."""
# Ensure webhook exists for this test regardless of test order.

View file

@ -61,3 +61,83 @@ def test_get_webhook_for_entry() -> None:
# Close the reader, so we can delete the directory.
reader.close()
def test_get_reader_sets_default_global_screenshot_layout() -> None:
"""get_reader should initialize global screenshot layout to desktop when missing."""
get_reader.cache_clear()
with tempfile.TemporaryDirectory() as temp_dir:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "screenshot_default_db.sqlite")
reader: Reader = get_reader(custom_location=custom_loc)
screenshot_layout = reader.get_tag((), "screenshot_layout", None)
assert screenshot_layout == "desktop", (
f"Expected default global screenshot layout to be 'desktop', got: {screenshot_layout}"
)
reader.close()
get_reader.cache_clear()
def test_get_reader_preserves_existing_global_screenshot_layout() -> None:
"""get_reader should not overwrite an existing global screenshot layout value."""
get_reader.cache_clear()
with tempfile.TemporaryDirectory() as temp_dir:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "screenshot_existing_db.sqlite")
first_reader: Reader = get_reader(custom_location=custom_loc)
first_reader.set_tag((), "screenshot_layout", "mobile") # pyright: ignore[reportArgumentType]
first_reader.close()
get_reader.cache_clear()
second_reader: Reader = get_reader(custom_location=custom_loc)
screenshot_layout = second_reader.get_tag((), "screenshot_layout", None)
assert screenshot_layout == "mobile", (
f"Expected existing global screenshot layout to stay 'mobile', got: {screenshot_layout}"
)
second_reader.close()
get_reader.cache_clear()
def test_get_reader_sets_default_global_delivery_mode() -> None:
"""get_reader should initialize global delivery mode to embed when missing."""
get_reader.cache_clear()
with tempfile.TemporaryDirectory() as temp_dir:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "delivery_mode_default_db.sqlite")
reader: Reader = get_reader(custom_location=custom_loc)
delivery_mode = reader.get_tag((), "delivery_mode", None)
assert delivery_mode == "embed", f"Expected default global delivery mode to be 'embed', got: {delivery_mode}"
reader.close()
get_reader.cache_clear()
def test_get_reader_preserves_existing_global_delivery_mode() -> None:
"""get_reader should not overwrite an existing global delivery mode value."""
get_reader.cache_clear()
with tempfile.TemporaryDirectory() as temp_dir:
Path.mkdir(Path(temp_dir), exist_ok=True)
custom_loc: pathlib.Path = pathlib.Path(temp_dir, "delivery_mode_existing_db.sqlite")
first_reader: Reader = get_reader(custom_location=custom_loc)
first_reader.set_tag((), "delivery_mode", "text") # pyright: ignore[reportArgumentType]
first_reader.close()
get_reader.cache_clear()
second_reader: Reader = get_reader(custom_location=custom_loc)
delivery_mode = second_reader.get_tag((), "delivery_mode", None)
assert delivery_mode == "text", f"Expected existing global delivery mode to stay 'text', got: {delivery_mode}"
second_reader.close()
get_reader.cache_clear()