Joakim Hellsén 2026-03-07 01:01:09 +01:00
commit e8bd528def
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
16 changed files with 1062 additions and 89 deletions

19
.env.example Normal file
View file

@ -0,0 +1,19 @@
# You can optionally store backups of your bot's configuration in a git repository.
# This allows you to track changes by subscribing to the repository or using a RSS feed.
# Local path for the backup git repository (e.g., /data/backup or /home/user/backups/discord-rss-bot)
# When set, the bot will initialize a git repo here and commit state.json after every configuration change
# GIT_BACKUP_PATH=
# Remote URL for pushing backup commits (e.g., git@github.com:username/private-config.git)
# Optional - only set if you want automatic pushes to a remote repository
# Leave empty to keep git history local only
# GIT_BACKUP_REMOTE=
# Sentry Configuration (Optional)
# Sentry DSN for error tracking and monitoring
# Leave empty to disable Sentry integration
# SENTRY_DSN=
# Testing Configuration
# Discord webhook URL used for testing (optional, only needed when running tests)
# TEST_WEBHOOK_URL=

View file

@ -2,6 +2,10 @@
Subscribe to RSS feeds and get updates to a Discord webhook.
Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com)
Discord: TheLovinator#9276
## Features
- Subscribe to RSS feeds and get updates to a Discord webhook.
@ -58,8 +62,49 @@ or [install directly on your computer](#install-directly-on-your-computer).
- Use [Windows Task Scheduler](https://en.wikipedia.org/wiki/Windows_Task_Scheduler).
- Or add a shortcut to `%userprofile%\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\Startup`.
## Contact
## Git Backup (State Version Control)
Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com)
The bot can commit every configuration change (adding/removing feeds, webhook
changes, blacklist/whitelist updates) to a separate private Git repository so
you get a full, auditable history of state changes — similar to `etckeeper`.
Discord: TheLovinator#9276
### Configuration
Set the following environment variables (e.g. in `docker-compose.yml` or a
`.env` file):
| Variable | Required | Description |
| ------------------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| `GIT_BACKUP_PATH` | Yes | Local path where the backup git repository is stored. The bot will initialise it automatically if it does not yet exist. |
| `GIT_BACKUP_REMOTE` | No | Remote URL to push to after each commit (e.g. `git@github.com:you/private-config.git`). Leave unset to keep the history local only. |
### What is backed up
After every relevant change a `state.json` file is written and committed.
The file contains:
- All feed URLs together with their webhook URL, custom message, embed
settings, and any blacklist/whitelist filters.
- The global list of Discord webhooks.
### Docker example
```yaml
services:
discord-rss-bot:
image: ghcr.io/thelovinator1/discord-rss-bot:latest
volumes:
- ./data:/data
environment:
- GIT_BACKUP_PATH=/data/backup
- GIT_BACKUP_REMOTE=git@github.com:you/private-config.git
```
For SSH-based remotes mount your SSH key into the container and make sure the
host key is trusted, e.g.:
```yaml
volumes:
- ./data:/data
- ~/.ssh:/root/.ssh:ro
```

View file

@ -4,12 +4,15 @@ import urllib.parse
from functools import lru_cache
from typing import TYPE_CHECKING
from discord_rss_bot.filter.blacklist import entry_should_be_skipped, feed_has_blacklist_tags
from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
from discord_rss_bot.filter.blacklist import entry_should_be_skipped
from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags
from discord_rss_bot.filter.whitelist import has_white_tags
from discord_rss_bot.filter.whitelist import should_be_sent
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
from reader import Entry, Reader
from reader import Entry
from reader import Reader
# Our reader
reader: Reader = get_reader()

View file

@ -5,9 +5,13 @@ import json
import logging
from dataclasses import dataclass
from bs4 import BeautifulSoup, Tag
from bs4 import BeautifulSoup
from bs4 import Tag
from markdownify import markdownify
from reader import Entry, Feed, Reader, TagNotFoundError
from reader import Entry
from reader import Feed
from reader import Reader
from reader import TagNotFoundError
from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.settings import get_reader

View file

@ -5,42 +5,41 @@ import logging
import os
import pprint
import re
from typing import TYPE_CHECKING, Any
from urllib.parse import ParseResult, urlparse
from typing import TYPE_CHECKING
from typing import Any
from urllib.parse import ParseResult
from urllib.parse import urlparse
import tldextract
from discord_webhook import DiscordEmbed, DiscordWebhook
from discord_webhook import DiscordEmbed
from discord_webhook import DiscordWebhook
from fastapi import HTTPException
from markdownify import markdownify
from reader import (
Entry,
EntryNotFoundError,
Feed,
FeedExistsError,
FeedNotFoundError,
Reader,
ReaderError,
StorageError,
TagNotFoundError,
)
from reader import Entry
from reader import EntryNotFoundError
from reader import Feed
from reader import FeedExistsError
from reader import FeedNotFoundError
from reader import Reader
from reader import ReaderError
from reader import StorageError
from reader import TagNotFoundError
from discord_rss_bot.custom_message import (
CustomEmbed,
get_custom_message,
replace_tags_in_embed,
replace_tags_in_text_message,
)
from discord_rss_bot.custom_message import CustomEmbed
from discord_rss_bot.custom_message import get_custom_message
from discord_rss_bot.custom_message import replace_tags_in_embed
from discord_rss_bot.custom_message import replace_tags_in_text_message
from discord_rss_bot.filter.blacklist import entry_should_be_skipped
from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
from discord_rss_bot.hoyolab_api import (
create_hoyolab_webhook,
extract_post_id_from_hoyolab_url,
fetch_hoyolab_post,
is_c3kay_feed,
)
from discord_rss_bot.filter.whitelist import has_white_tags
from discord_rss_bot.filter.whitelist import should_be_sent
from discord_rss_bot.hoyolab_api import create_hoyolab_webhook
from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url
from discord_rss_bot.hoyolab_api import fetch_hoyolab_post
from discord_rss_bot.hoyolab_api import is_c3kay_feed
from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.settings import default_custom_message, get_reader
from discord_rss_bot.settings import default_custom_message
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
from collections.abc import Iterable

View file

@ -0,0 +1,238 @@
"""Git backup module for committing bot state changes to a private repository.
Configure the backup by setting these environment variables:
- ``GIT_BACKUP_PATH``: Local filesystem path for the backup git repository.
When set, the bot will initialise a git repo there (if one doesn't exist)
and commit an export of its state after every relevant change.
- ``GIT_BACKUP_REMOTE``: Optional remote URL (e.g. ``git@github.com:you/private-repo.git``).
When set, every commit is followed by a ``git push`` to this remote.
The exported state is written as ``state.json`` inside the backup repo. It
contains the list of feeds together with their webhook URL, filter settings
(blacklist / whitelist, regex variants), custom messages and embed settings.
Global webhooks are also included.
Example docker-compose snippet::
environment:
- GIT_BACKUP_PATH=/data/backup
- GIT_BACKUP_REMOTE=git@github.com:you/private-config.git
"""
from __future__ import annotations
import json
import logging
import os
import shutil
import subprocess # noqa: S404
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
if TYPE_CHECKING:
from reader import Reader
logger: logging.Logger = logging.getLogger(__name__)
GIT_EXECUTABLE: str = shutil.which("git") or "git"
type TAG_VALUE = (
dict[str, str | int | float | bool | dict[str, Any] | list[Any] | None]
| list[str | int | float | bool | dict[str, Any] | list[Any] | None]
| None
)
"""Type alias for the value of a feed tag, which can be a nested structure of dicts and lists, or None."""
# Tags that are exported per-feed (empty values are omitted).
_FEED_TAGS: tuple[str, ...] = (
"webhook",
"custom_message",
"should_send_embed",
"embed",
"blacklist_title",
"blacklist_summary",
"blacklist_content",
"blacklist_author",
"regex_blacklist_title",
"regex_blacklist_summary",
"regex_blacklist_content",
"regex_blacklist_author",
"whitelist_title",
"whitelist_summary",
"whitelist_content",
"whitelist_author",
"regex_whitelist_title",
"regex_whitelist_summary",
"regex_whitelist_content",
"regex_whitelist_author",
)
def get_backup_path() -> Path | None:
"""Return the configured backup path, or *None* if not configured.
Returns:
Path to the backup repository, or None if ``GIT_BACKUP_PATH`` is unset.
"""
raw: str = os.environ.get("GIT_BACKUP_PATH", "").strip()
return Path(raw) if raw else None
def get_backup_remote() -> str:
"""Return the configured remote URL, or an empty string if not set.
Returns:
The remote URL string from ``GIT_BACKUP_REMOTE``, or ``""`` if unset.
"""
return os.environ.get("GIT_BACKUP_REMOTE", "").strip()
def setup_backup_repo(backup_path: Path) -> bool:
"""Ensure the backup directory exists and contains a git repository.
If the directory does not yet contain a ``.git`` folder a new repository is
initialised. A basic git identity is configured locally so that commits
succeed even in environments where a global ``~/.gitconfig`` is absent.
Args:
backup_path: Local path for the backup repository.
Returns:
``True`` if the repository is ready, ``False`` on any error.
"""
try:
backup_path.mkdir(parents=True, exist_ok=True)
git_dir: Path = backup_path / ".git"
if not git_dir.exists():
subprocess.run([GIT_EXECUTABLE, "init", str(backup_path)], check=True, capture_output=True) # noqa: S603
logger.info("Initialised git backup repository at %s", backup_path)
# Ensure a local identity exists so that `git commit` always works.
for key, value in (("user.email", "discord-rss-bot@localhost"), ("user.name", "discord-rss-bot")):
result: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603
[GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key],
check=False,
capture_output=True,
)
if result.returncode != 0:
subprocess.run( # noqa: S603
[GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key, value],
check=True,
capture_output=True,
)
# Configure the remote if GIT_BACKUP_REMOTE is set.
remote_url: str = get_backup_remote()
if remote_url:
# Check if remote "origin" already exists.
check_remote: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603
[GIT_EXECUTABLE, "-C", str(backup_path), "remote", "get-url", "origin"],
check=False,
capture_output=True,
)
if check_remote.returncode != 0:
# Remote doesn't exist, add it.
subprocess.run( # noqa: S603
[GIT_EXECUTABLE, "-C", str(backup_path), "remote", "add", "origin", remote_url],
check=True,
capture_output=True,
)
logger.info("Added remote 'origin' with URL: %s", remote_url)
else:
# Remote exists, update it if the URL has changed.
current_url: str = check_remote.stdout.decode().strip()
if current_url != remote_url:
subprocess.run( # noqa: S603
[GIT_EXECUTABLE, "-C", str(backup_path), "remote", "set-url", "origin", remote_url],
check=True,
capture_output=True,
)
logger.info("Updated remote 'origin' URL from %s to %s", current_url, remote_url)
except Exception:
logger.exception("Failed to set up git backup repository at %s", backup_path)
return False
return True
def export_state(reader: Reader, backup_path: Path) -> None:
"""Serialise the current bot state to ``state.json`` inside *backup_path*.
Args:
reader: The :class:`reader.Reader` instance to read state from.
backup_path: Destination directory for the exported ``state.json``.
"""
feeds_state: list[dict] = []
for feed in reader.get_feeds():
feed_data: dict = {"url": feed.url}
for tag in _FEED_TAGS:
try:
value: TAG_VALUE = reader.get_tag(feed, tag, None)
if value is not None and value != "": # noqa: PLC1901
feed_data[tag] = value
except Exception:
logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url)
feeds_state.append(feed_data)
try:
webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list(
reader.get_tag((), "webhooks", [])
)
except Exception: # noqa: BLE001
webhooks = []
state: dict = {"feeds": feeds_state, "webhooks": webhooks}
state_file: Path = backup_path / "state.json"
state_file.write_text(json.dumps(state, indent=2, default=str), encoding="utf-8")
def commit_state_change(reader: Reader, message: str) -> None:
"""Export current state and commit it to the backup repository.
This is a no-op when ``GIT_BACKUP_PATH`` is not configured. Errors are
logged but never raised so that a backup failure never interrupts normal
bot operation.
Args:
reader: The :class:`reader.Reader` instance to read state from.
message: Commit message describing the change (e.g. ``"Add feed example.com/rss.xml"``).
"""
backup_path: Path | None = get_backup_path()
if backup_path is None:
return
if not setup_backup_repo(backup_path):
return
try:
export_state(reader, backup_path)
subprocess.run([GIT_EXECUTABLE, "-C", str(backup_path), "add", "-A"], check=True, capture_output=True) # noqa: S603
# Only create a commit if there are staged changes.
diff_result: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603
[GIT_EXECUTABLE, "-C", str(backup_path), "diff", "--cached", "--exit-code"],
check=False,
capture_output=True,
)
if diff_result.returncode == 0:
logger.debug("No state changes to commit for: %s", message)
return
subprocess.run( # noqa: S603
[GIT_EXECUTABLE, "-C", str(backup_path), "commit", "-m", message],
check=True,
capture_output=True,
)
logger.info("Committed state change to backup repo: %s", message)
# Push to remote if configured.
if get_backup_remote():
subprocess.run( # noqa: S603
[GIT_EXECUTABLE, "-C", str(backup_path), "push", "origin", "HEAD"],
check=True,
capture_output=True,
)
logger.info("Pushed state change to remote 'origin': %s", message)
except Exception:
logger.exception("Failed to commit state change '%s' to backup repo", message)

View file

@ -1,6 +1,7 @@
from __future__ import annotations
from urllib.parse import ParseResult, urlparse
from urllib.parse import ParseResult
from urllib.parse import urlparse
def is_url_valid(url: str) -> bool:

View file

@ -7,48 +7,62 @@ import typing
import urllib.parse
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import UTC, datetime
from datetime import UTC
from datetime import datetime
from functools import lru_cache
from typing import TYPE_CHECKING, Annotated, cast
from typing import TYPE_CHECKING
from typing import Annotated
from typing import Any
from typing import cast
import httpx
import sentry_sdk
import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi import FastAPI
from fastapi import Form
from fastapi import HTTPException
from fastapi import Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from httpx import Response
from markdownify import markdownify
from reader import Entry, EntryNotFoundError, Feed, FeedNotFoundError, Reader, TagNotFoundError
from reader import Entry
from reader import EntryNotFoundError
from reader import Feed
from reader import FeedNotFoundError
from reader import Reader
from reader import TagNotFoundError
from starlette.responses import RedirectResponse
from discord_rss_bot import settings
from discord_rss_bot.custom_filters import (
entry_is_blacklisted,
entry_is_whitelisted,
)
from discord_rss_bot.custom_message import (
CustomEmbed,
get_custom_message,
get_embed,
get_first_image,
replace_tags_in_text_message,
save_embed,
)
from discord_rss_bot.feeds import create_feed, extract_domain, send_entry_to_discord, send_to_discord
from discord_rss_bot.custom_filters import entry_is_blacklisted
from discord_rss_bot.custom_filters import entry_is_whitelisted
from discord_rss_bot.custom_message import CustomEmbed
from discord_rss_bot.custom_message import get_custom_message
from discord_rss_bot.custom_message import get_embed
from discord_rss_bot.custom_message import get_first_image
from discord_rss_bot.custom_message import replace_tags_in_text_message
from discord_rss_bot.custom_message import save_embed
from discord_rss_bot.feeds import create_feed
from discord_rss_bot.feeds import extract_domain
from discord_rss_bot.feeds import send_entry_to_discord
from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.git_backup import commit_state_change
from discord_rss_bot.git_backup import get_backup_path
from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.search import create_search_context
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterable
from collections.abc import AsyncGenerator
from collections.abc import Iterable
from reader.types import JSONType
LOGGING_CONFIG = {
LOGGING_CONFIG: dict[str, Any] = {
"version": 1,
"disable_existing_loggers": True,
"formatters": {
@ -130,11 +144,11 @@ async def post_add_webhook(
webhook_name: The name of the webhook.
webhook_url: The url of the webhook.
Raises:
HTTPException: If the webhook already exists.
Returns:
RedirectResponse: Redirect to the index page.
Raises:
HTTPException: If the webhook already exists.
"""
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks = list(reader.get_tag((), "webhooks", []))
@ -151,6 +165,8 @@ async def post_add_webhook(
reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Add webhook {webhook_name.strip()}")
return RedirectResponse(url="/", status_code=303)
# TODO(TheLovinator): Show this error on the page.
@ -165,11 +181,12 @@ async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectRe
Args:
webhook_url: The url of the webhook.
Returns:
RedirectResponse: Redirect to the index page.
Raises:
HTTPException: If the webhook could not be deleted
Returns:
RedirectResponse: Redirect to the index page.
"""
# TODO(TheLovinator): Check if the webhook is in use by any feeds before deleting it.
# TODO(TheLovinator): Replace HTTPException with a custom exception for both of these.
@ -196,6 +213,8 @@ async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectRe
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Delete webhook {webhook_url.strip()}")
return RedirectResponse(url="/", status_code=303)
@ -215,6 +234,7 @@ async def post_create_feed(
"""
clean_feed_url: str = feed_url.strip()
create_feed(reader, feed_url, webhook_dropdown)
commit_state_change(reader, f"Add feed {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -286,6 +306,8 @@ async def post_set_whitelist(
reader.set_tag(clean_feed_url, "regex_whitelist_content", regex_whitelist_content) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_whitelist_author", regex_whitelist_author) # pyright: ignore[reportArgumentType][call-overload]
commit_state_change(reader, f"Update whitelist for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -367,6 +389,7 @@ async def post_set_blacklist(
reader.set_tag(clean_feed_url, "regex_blacklist_summary", regex_blacklist_summary) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_blacklist_content", regex_blacklist_content) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_blacklist_author", regex_blacklist_author) # pyright: ignore[reportArgumentType][call-overload]
commit_state_change(reader, f"Update blacklist for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -433,6 +456,7 @@ async def post_set_custom(
reader.set_tag(feed_url, "custom_message", default_custom_message)
clean_feed_url: str = feed_url.strip()
commit_state_change(reader, f"Update custom message for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -552,6 +576,7 @@ async def post_embed(
# Save the data.
save_embed(reader, feed, custom_embed)
commit_state_change(reader, f"Update embed settings for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -567,6 +592,7 @@ async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "should_send_embed", True) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Enable embed mode for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -582,6 +608,7 @@ async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse:
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "should_send_embed", False) # pyright: ignore[reportArgumentType]
commit_state_change(reader, f"Disable embed mode for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -611,11 +638,11 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
request: The request object.
starting_after: The entry to start after. Used for pagination.
Raises:
HTTPException: If the feed is not found.
Returns:
HTMLResponse: The feed page.
Raises:
HTTPException: If the feed is not found.
"""
entries_per_page: int = 20
@ -845,23 +872,25 @@ async def get_webhooks(request: Request):
@app.get("/", response_class=HTMLResponse)
def get_index(request: Request):
def get_index(request: Request, message: str = ""):
"""This is the root of the website.
Args:
request: The request object.
message: Optional message to display to the user.
Returns:
HTMLResponse: The index page.
"""
return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request))
return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request, message))
def make_context_index(request: Request):
def make_context_index(request: Request, message: str = ""):
"""Create the needed context for the index page.
Args:
request: The request object.
message: Optional message to display to the user.
Returns:
dict: The context for the index page.
@ -894,6 +923,7 @@ def make_context_index(request: Request):
"webhooks": hooks,
"broken_feeds": broken_feeds,
"feeds_without_attached_webhook": feeds_without_attached_webhook,
"messages": message or None,
}
@ -904,17 +934,20 @@ async def remove_feed(feed_url: Annotated[str, Form()]):
Args:
feed_url: The feed to add.
Raises:
HTTPException: Feed not found
Returns:
RedirectResponse: Redirect to the index page.
Raises:
HTTPException: Feed not found
"""
try:
reader.delete_feed(urllib.parse.unquote(feed_url))
except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail="Feed not found") from e
commit_state_change(reader, f"Remove feed {urllib.parse.unquote(feed_url)}")
return RedirectResponse(url="/", status_code=303)
@ -926,11 +959,12 @@ async def update_feed(request: Request, feed_url: str):
request: The request object.
feed_url: The feed URL to update.
Raises:
HTTPException: If the feed is not found.
Returns:
RedirectResponse: Redirect to the feed page.
Raises:
HTTPException: If the feed is not found.
"""
try:
reader.update_feed(urllib.parse.unquote(feed_url))
@ -941,6 +975,33 @@ async def update_feed(request: Request, feed_url: str):
return RedirectResponse(url="/feed?feed_url=" + urllib.parse.quote(feed_url), status_code=303)
@app.post("/backup")
async def manual_backup(request: Request) -> RedirectResponse:
"""Manually trigger a git backup of the current state.
Args:
request: The request object.
Returns:
RedirectResponse: Redirect to the index page with a success or error message.
"""
backup_path = get_backup_path()
if backup_path is None:
message = "Git backup is not configured. Set GIT_BACKUP_PATH environment variable to enable backups."
logger.warning("Manual git backup attempted but GIT_BACKUP_PATH is not configured")
return RedirectResponse(url=f"/?message={urllib.parse.quote(message)}", status_code=303)
try:
commit_state_change(reader, "Manual backup triggered from web UI")
message = "Successfully created git backup!"
logger.info("Manual git backup completed successfully")
except Exception as e:
message = f"Failed to create git backup: {e}"
logger.exception("Manual git backup failed")
return RedirectResponse(url=f"/?message={urllib.parse.quote(message)}", status_code=303)
@app.get("/search", response_class=HTMLResponse)
async def search(request: Request, query: str):
"""Get entries matching a full-text search query.
@ -988,11 +1049,12 @@ def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Fo
old_hook: The webhook to modify.
new_hook: The new webhook.
Returns:
RedirectResponse: Redirect to the webhook page.
Raises:
HTTPException: Webhook could not be modified.
Returns:
RedirectResponse: Redirect to the webhook page.
"""
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks = list(reader.get_tag((), "webhooks", []))
@ -1042,11 +1104,11 @@ def extract_youtube_video_id(url: str) -> str | None:
# Handle standard YouTube URLs (youtube.com/watch?v=VIDEO_ID)
if "youtube.com/watch" in url and "v=" in url:
return url.split("v=")[1].split("&")[0]
return url.split("v=")[1].split("&", maxsplit=1)[0]
# Handle shortened YouTube URLs (youtu.be/VIDEO_ID)
if "youtu.be/" in url:
return url.split("youtu.be/")[1].split("?")[0]
return url.split("youtu.be/")[1].split("?", maxsplit=1)[0]
return None

View file

@ -1,8 +1,11 @@
from __future__ import annotations
from reader import Feed, Reader, TagNotFoundError
from reader import Feed
from reader import Reader
from reader import TagNotFoundError
from discord_rss_bot.settings import default_custom_embed, default_custom_message
from discord_rss_bot.settings import default_custom_embed
from discord_rss_bot.settings import default_custom_message
def add_custom_message(reader: Reader, feed: Feed) -> None:

View file

@ -8,7 +8,10 @@ from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
from collections.abc import Iterable
from reader import EntrySearchResult, Feed, HighlightedString, Reader
from reader import EntrySearchResult
from reader import Feed
from reader import HighlightedString
from reader import Reader
def create_search_context(query: str, custom_reader: Reader | None = None) -> dict:

View file

@ -5,7 +5,8 @@ from functools import lru_cache
from pathlib import Path
from platformdirs import user_data_dir
from reader import Reader, make_reader
from reader import Reader
from reader import make_reader
if typing.TYPE_CHECKING:
from reader.types import JSONType

View file

@ -1,6 +1,9 @@
<nav class="navbar navbar-expand-md navbar-dark p-2 mb-3 border-bottom border-warning">
<div class="container-fluid">
<button class="navbar-toggler ms-auto" type="button" data-bs-toggle="collapse" data-bs-target="#collapseNavbar">
<button class="navbar-toggler ms-auto"
type="button"
data-bs-toggle="collapse"
data-bs-target="#collapseNavbar">
<span class="navbar-toggler-icon"></span>
</button>
<div class="navbar-collapse collapse" id="collapseNavbar">
@ -16,10 +19,22 @@
<li class="nav-item">
<a class="nav-link" href="/webhooks">Webhooks</a>
</li>
<li class="nav-item nav-link d-none d-md-block">|</li>
<li class="nav-item">
<form action="/backup" method="post" class="d-inline">
<button type="submit"
class="nav-link btn btn-link text-decoration-none"
onclick="return confirm('Create a manual git backup of the current state?');">
Backup
</button>
</form>
</li>
</ul>
{# Search #}
<form action="/search" method="get" class="ms-auto w-50 input-group">
<input name="query" class="form-control bg-dark border-dark text-muted" type="search"
<input name="query"
class="form-control bg-dark border-dark text-muted"
type="search"
placeholder="Search" />
</form>
{# Donate button #}

View file

@ -28,14 +28,19 @@ dev = ["djlint", "pytest"]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.ruff]
preview = true
unsafe-fixes = true
fix = true
line-length = 120
lint.select = ["ALL"]
lint.unfixable = ["F841"] # Don't automatically remove unused variables
lint.pydocstyle.convention = "google"
lint.isort.required-imports = ["from __future__ import annotations"]
lint.pycodestyle.ignore-overlong-task-comments = true
lint.isort.force-single-line = true
lint.ignore = [
"ANN201", # Checks that public functions and methods have return type annotations.

570
tests/test_git_backup.py Normal file
View file

@ -0,0 +1,570 @@
from __future__ import annotations
import contextlib
import json
import shutil
import subprocess # noqa: S404
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from discord_rss_bot.git_backup import commit_state_change
from discord_rss_bot.git_backup import export_state
from discord_rss_bot.git_backup import get_backup_path
from discord_rss_bot.git_backup import get_backup_remote
from discord_rss_bot.git_backup import setup_backup_repo
from discord_rss_bot.main import app
if TYPE_CHECKING:
from pathlib import Path
SKIP_IF_NO_GIT: pytest.MarkDecorator = pytest.mark.skipif(
shutil.which("git") is None, reason="git executable not found"
)
def test_get_backup_path_unset(monkeypatch: pytest.MonkeyPatch) -> None:
"""get_backup_path returns None when GIT_BACKUP_PATH is not set."""
monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
assert get_backup_path() is None
def test_get_backup_path_set(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""get_backup_path returns a Path when GIT_BACKUP_PATH is set."""
monkeypatch.setenv("GIT_BACKUP_PATH", str(tmp_path))
result: Path | None = get_backup_path()
assert result == tmp_path
def test_get_backup_path_strips_whitespace(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""get_backup_path strips surrounding whitespace from the env var value."""
monkeypatch.setenv("GIT_BACKUP_PATH", f" {tmp_path} ")
result: Path | None = get_backup_path()
assert result == tmp_path
def test_get_backup_remote_unset(monkeypatch: pytest.MonkeyPatch) -> None:
"""get_backup_remote returns empty string when GIT_BACKUP_REMOTE is not set."""
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
assert not get_backup_remote()
def test_get_backup_remote_set(monkeypatch: pytest.MonkeyPatch) -> None:
"""get_backup_remote returns the configured remote URL."""
monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/repo.git")
assert get_backup_remote() == "git@github.com:user/repo.git"
@SKIP_IF_NO_GIT
def test_setup_backup_repo_creates_git_repo(tmp_path: Path) -> None:
"""setup_backup_repo initialises a git repo in a fresh directory."""
backup_path: Path = tmp_path / "backup"
result: bool = setup_backup_repo(backup_path)
assert result is True
assert (backup_path / ".git").exists()
@SKIP_IF_NO_GIT
def test_setup_backup_repo_idempotent(tmp_path: Path) -> None:
"""setup_backup_repo does not fail when called on an existing repo."""
backup_path: Path = tmp_path / "backup"
assert setup_backup_repo(backup_path) is True
assert setup_backup_repo(backup_path) is True
def test_setup_backup_repo_adds_origin_remote(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""setup_backup_repo adds remote 'origin' when GIT_BACKUP_REMOTE is set."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/private.git")
with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
# git config --local queries fail initially so setup writes defaults.
mock_run.side_effect = [
MagicMock(returncode=0), # git init
MagicMock(returncode=1), # config user.email read
MagicMock(returncode=0), # config user.email write
MagicMock(returncode=1), # config user.name read
MagicMock(returncode=0), # config user.name write
MagicMock(returncode=1), # remote get-url origin (missing)
MagicMock(returncode=0), # remote add origin <url>
]
assert setup_backup_repo(backup_path) is True
called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
assert ["remote", "add", "origin", "git@github.com:user/private.git"] in [
cmd[-4:] for cmd in called_commands if len(cmd) >= 4
]
def test_setup_backup_repo_updates_origin_remote(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""setup_backup_repo updates existing origin when URL differs."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/new-private.git")
with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
# Existing repo path: no git init call.
(backup_path / ".git").mkdir(parents=True)
mock_run.side_effect = [
MagicMock(returncode=0), # config user.email read
MagicMock(returncode=0), # config user.name read
MagicMock(returncode=0, stdout=b"git@github.com:user/old-private.git\n"), # remote get-url origin
MagicMock(returncode=0), # remote set-url origin <new>
]
assert setup_backup_repo(backup_path) is True
called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
assert ["remote", "set-url", "origin", "git@github.com:user/new-private.git"] in [
cmd[-4:] for cmd in called_commands if len(cmd) >= 4
]
def test_export_state_creates_state_json(tmp_path: Path) -> None:
"""export_state writes a valid state.json to the backup directory."""
mock_reader = MagicMock()
# Feeds
feed1 = MagicMock()
feed1.url = "https://example.com/feed.rss"
mock_reader.get_feeds.return_value = [feed1]
# Tag values: webhook present, everything else absent (returns None)
def get_tag_side_effect(
feed_or_key: tuple | str,
tag: str | None = None,
default: str | None = None,
) -> list[Any] | str | None:
if feed_or_key == () and tag is None:
# Called for global webhooks list
return []
if tag == "webhook":
return "https://discord.com/api/webhooks/123/abc"
return default
mock_reader.get_tag.side_effect = get_tag_side_effect
backup_path: Path = tmp_path / "backup"
backup_path.mkdir()
export_state(mock_reader, backup_path)
state_file: Path = backup_path / "state.json"
assert state_file.exists(), "state.json should be created by export_state"
data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8"))
assert "feeds" in data
assert "webhooks" in data
assert data["feeds"][0]["url"] == "https://example.com/feed.rss"
assert data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc"
def test_export_state_omits_empty_tags(tmp_path: Path) -> None:
"""export_state does not include tags with empty-string or None values."""
mock_reader = MagicMock()
feed1 = MagicMock()
feed1.url = "https://example.com/feed.rss"
mock_reader.get_feeds.return_value = [feed1]
def get_tag_side_effect(
feed_or_key: tuple | str,
tag: str | None = None,
default: str | None = None,
) -> list[Any] | str | None:
if feed_or_key == ():
return []
# Return empty string for all tags
return default # default is None
mock_reader.get_tag.side_effect = get_tag_side_effect
backup_path: Path = tmp_path / "backup"
backup_path.mkdir()
export_state(mock_reader, backup_path)
data: dict[str, Any] = json.loads((backup_path / "state.json").read_text())
# Only "url" key should be present (no empty-value tags)
assert list(data["feeds"][0].keys()) == ["url"]
def test_commit_state_change_noop_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
"""commit_state_change does nothing when GIT_BACKUP_PATH is not set."""
monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
mock_reader = MagicMock()
# Should not raise and should not call reader methods for export
commit_state_change(mock_reader, "Add feed example.com/rss")
mock_reader.get_feeds.assert_not_called()
@SKIP_IF_NO_GIT
def test_commit_state_change_commits(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""commit_state_change creates a commit in the backup repo."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
mock_reader = MagicMock()
mock_reader.get_feeds.return_value = []
mock_reader.get_tag.return_value = []
commit_state_change(mock_reader, "Add feed https://example.com/rss")
# Verify a commit was created in the backup repo
git_executable: str | None = shutil.which("git")
assert git_executable is not None, "git executable not found"
result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "log", "--oneline"],
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0
assert "Add feed https://example.com/rss" in result.stdout
@SKIP_IF_NO_GIT
def test_commit_state_change_no_double_commit(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""commit_state_change does not create a commit when state has not changed."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
mock_reader = MagicMock()
mock_reader.get_feeds.return_value = []
mock_reader.get_tag.return_value = []
commit_state_change(mock_reader, "First commit")
commit_state_change(mock_reader, "Should not appear")
git_executable: str | None = shutil.which("git")
assert git_executable is not None, "git executable not found"
result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "log", "--oneline"],
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0
assert "First commit" in result.stdout
assert "Should not appear" not in result.stdout
def test_commit_state_change_push_when_remote_set(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""commit_state_change calls git push when GIT_BACKUP_REMOTE is configured."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/private.git")
mock_reader = MagicMock()
mock_reader.get_feeds.return_value = []
mock_reader.get_tag.return_value = []
with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
# Make all subprocess calls succeed
mock_run.return_value = MagicMock(returncode=1) # returncode=1 means staged changes exist
commit_state_change(mock_reader, "Add feed https://example.com/rss")
called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
push_calls: list[list[str]] = [cmd for cmd in called_commands if "push" in cmd]
assert push_calls, "git push should have been called when GIT_BACKUP_REMOTE is set"
assert any(cmd[-3:] == ["push", "origin", "HEAD"] for cmd in called_commands), (
"git push should target configured remote name 'origin'"
)
def test_commit_state_change_no_push_when_remote_unset(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""commit_state_change does not call git push when GIT_BACKUP_REMOTE is not set."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
mock_reader = MagicMock()
mock_reader.get_feeds.return_value = []
mock_reader.get_tag.return_value = []
with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1)
commit_state_change(mock_reader, "Add feed https://example.com/rss")
called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
push_calls: list[list[str]] = [cmd for cmd in called_commands if "push" in cmd]
assert not push_calls, "git push should NOT be called when GIT_BACKUP_REMOTE is not set"
@SKIP_IF_NO_GIT
def test_commit_state_change_e2e_push_to_bare_repo(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""End-to-end test: commit_state_change pushes to a real bare git repository."""
git_executable: str | None = shutil.which("git")
assert git_executable is not None, "git executable not found"
# Create a bare remote repository
bare_repo_path: Path = tmp_path / "remote.git"
subprocess.run([git_executable, "init", "--bare", str(bare_repo_path)], check=True, capture_output=True) # noqa: S603
# Configure backup with remote pointing to bare repo
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.setenv("GIT_BACKUP_REMOTE", str(bare_repo_path))
# Create mock reader with some state
mock_reader = MagicMock()
feed1 = MagicMock()
feed1.url = "https://example.com/feed.rss"
mock_reader.get_feeds.return_value = [feed1]
def get_tag_side_effect(
feed_or_key: tuple | str,
tag: str | None = None,
default: str | None = None,
) -> list[Any] | str | None:
if feed_or_key == ():
return []
if tag == "webhook":
return "https://discord.com/api/webhooks/123/abc"
return default
mock_reader.get_tag.side_effect = get_tag_side_effect
# Perform backup with commit and push
commit_state_change(mock_reader, "Initial backup")
# Verify commit exists in local backup repo
result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "log", "--oneline"],
capture_output=True,
text=True,
check=True,
)
assert "Initial backup" in result.stdout
# Verify origin remote is configured correctly
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "remote", "get-url", "origin"],
capture_output=True,
text=True,
check=True,
)
assert result.stdout.strip() == str(bare_repo_path)
# Verify commit was pushed to the bare remote
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"],
capture_output=True,
text=True,
check=True,
)
assert "Initial backup" in result.stdout
# Verify state.json content in the remote
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(bare_repo_path), "show", "master:state.json"],
capture_output=True,
text=True,
check=True,
)
state_data: dict[str, Any] = json.loads(result.stdout)
assert state_data["feeds"][0]["url"] == "https://example.com/feed.rss"
assert state_data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc"
# Perform a second backup to verify subsequent pushes work
feed2 = MagicMock()
feed2.url = "https://another.com/feed.xml"
mock_reader.get_feeds.return_value = [feed1, feed2]
commit_state_change(mock_reader, "Add second feed")
# Verify both commits are in the remote
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"],
capture_output=True,
text=True,
check=True,
)
assert "Initial backup" in result.stdout
assert "Add second feed" in result.stdout
# Integration tests for embed-related endpoint backups
client: TestClient = TestClient(app)
test_webhook_name: str = "Test Backup Webhook"
test_webhook_url: str = "https://discord.com/api/webhooks/999999999/testbackupwebhook"
test_feed_url: str = "https://lovinator.space/rss_test.xml"
def setup_test_feed() -> None:
"""Set up a test webhook and feed for endpoint tests."""
# Clean up existing test data
with contextlib.suppress(Exception):
client.post(url="/remove", data={"feed_url": test_feed_url})
with contextlib.suppress(Exception):
client.post(url="/delete_webhook", data={"webhook_url": test_webhook_url})
# Create webhook and feed
client.post(
url="/add_webhook",
data={"webhook_name": test_webhook_name, "webhook_url": test_webhook_url},
)
client.post(url="/add", data={"feed_url": test_feed_url, "webhook_dropdown": test_webhook_name})
def test_post_embed_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /embed should trigger a git backup with appropriate message."""
# Set up git backup
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(
url="/embed",
data={
"feed_url": test_feed_url,
"title": "Custom Title",
"description": "Custom Description",
"color": "#FF5733",
},
)
assert response.status_code == 200, f"Failed to post embed: {response.text}"
mock_commit.assert_called_once()
# Verify the commit message contains the feed URL
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Update embed settings" in commit_message
assert test_feed_url in commit_message
def test_post_use_embed_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /use_embed should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(url="/use_embed", data={"feed_url": test_feed_url})
assert response.status_code == 200, f"Failed to enable embed: {response.text}"
mock_commit.assert_called_once()
# Verify the commit message
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Enable embed mode" in commit_message
assert test_feed_url in commit_message
def test_post_use_text_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /use_text should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(url="/use_text", data={"feed_url": test_feed_url})
assert response.status_code == 200, f"Failed to disable embed: {response.text}"
mock_commit.assert_called_once()
# Verify the commit message
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Disable embed mode" in commit_message
assert test_feed_url in commit_message
def test_post_custom_message_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /custom should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(
url="/custom",
data={
"feed_url": test_feed_url,
"custom_message": "Check out this entry: {entry.title}",
},
)
assert response.status_code == 200, f"Failed to set custom message: {response.text}"
mock_commit.assert_called_once()
# Verify the commit message
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Update custom message" in commit_message
assert test_feed_url in commit_message
@SKIP_IF_NO_GIT
def test_embed_backup_end_to_end(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""End-to-end test: customizing embed creates a real commit in the backup repo."""
git_executable: str | None = shutil.which("git")
assert git_executable is not None, "git executable not found"
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
# Post embed customization
response = client.post(
url="/embed",
data={
"feed_url": test_feed_url,
"title": "{entry.title}",
"description": "{entry.summary}",
"color": "#0099FF",
"image_url": "{entry.image}",
},
)
assert response.status_code == 200, f"Failed to customize embed: {response.text}"
# Verify a commit was created
result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "log", "--oneline"],
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0, f"Failed to read git log: {result.stderr}"
assert "Update embed settings" in result.stdout, f"Commit not found in log: {result.stdout}"
# Verify state.json contains embed data
state_file: Path = backup_path / "state.json"
assert state_file.exists(), "state.json should exist in backup repo"
state_data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8"))
# Find our test feed in the state
test_feed_data = next((feed for feed in state_data["feeds"] if feed["url"] == test_feed_url), None)
assert test_feed_data is not None, f"Test feed not found in state.json: {state_data}"
# The embed settings are stored as a nested dict under custom_embed tag
# This verifies the embed customization was persisted
assert "webhook" in test_feed_data, "Feed should have webhook set"

View file

@ -1,6 +1,7 @@
from __future__ import annotations
from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text
from discord_rss_bot.filter.utils import is_regex_match
from discord_rss_bot.filter.utils import is_word_in_text
def test_is_word_in_text() -> None:

View file

@ -4,9 +4,13 @@ import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
from reader import Entry, Feed, Reader, make_reader
from reader import Entry
from reader import Feed
from reader import Reader
from reader import make_reader
from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
from discord_rss_bot.filter.whitelist import has_white_tags
from discord_rss_bot.filter.whitelist import should_be_sent
if TYPE_CHECKING:
from collections.abc import Iterable