Compare commits

..

10 commits

Author SHA1 Message Date
24d4d7a293
Add support for changing the update interval for feeds
2026-03-07 05:50:20 +01:00
567273678e
Only show backup button in navbar if backups are enabled
2026-03-07 05:50:20 +01:00
renovate[bot]
82bcd27cdc
chore(deps): update docker/setup-qemu-action action to v4 (#424)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2026-03-07 01:25:05 +00:00
renovate[bot]
3b034c15f5
chore(deps): update docker/metadata-action action to v6 (#426)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2026-03-07 01:04:31 +00:00
renovate[bot]
cae5619915
chore(deps): update docker/login-action action to v4 (#423)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2026-03-07 01:01:13 +00:00
renovate[bot]
8e5d3170d7
chore(deps): update docker/setup-buildx-action action to v4 (#425)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2026-03-07 00:51:31 +00:00
renovate[bot]
e21449c09e
chore(deps): update docker/build-push-action action to v7 (#427)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2026-03-07 00:03:33 +00:00
e8bd528def
Add git backup functionality
Fixes: https://github.com/TheLovinator1/discord-rss-bot/issues/421
Merges: https://github.com/TheLovinator1/discord-rss-bot/pull/422
2026-03-07 01:01:09 +01:00
9378dac0fa
Unescape HTML entities in summary and content before markdown conversion
2025-12-08 17:47:45 +01:00
renovate[bot]
86cbad98b0
chore(deps): update actions/checkout action to v6 (#419)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-11-20 17:38:56 +00:00
28 changed files with 1929 additions and 213 deletions

.env.example (new file, 19 lines)

@@ -0,0 +1,19 @@
# You can optionally store backups of your bot's configuration in a git repository.
# This allows you to track changes by subscribing to the repository or using a RSS feed.
# Local path for the backup git repository (e.g., /data/backup or /home/user/backups/discord-rss-bot)
# When set, the bot will initialize a git repo here and commit state.json after every configuration change
# GIT_BACKUP_PATH=
# Remote URL for pushing backup commits (e.g., git@github.com:username/private-config.git)
# Optional - only set if you want automatic pushes to a remote repository
# Leave empty to keep git history local only
# GIT_BACKUP_REMOTE=
# Sentry Configuration (Optional)
# Sentry DSN for error tracking and monitoring
# Leave empty to disable Sentry integration
# SENTRY_DSN=
# Testing Configuration
# Discord webhook URL used for testing (optional, only needed when running tests)
# TEST_WEBHOOK_URL=


@@ -17,7 +17,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # GitHub Container Registry
-      - uses: docker/login-action@v3
+      - uses: docker/login-action@v4
         if: github.event_name != 'pull_request'
         with:
           registry: ghcr.io
@@ -25,18 +25,18 @@ jobs:
           password: ${{ secrets.GITHUB_TOKEN }}
 
       # Download the latest commit from the master branch
-      - uses: actions/checkout@v5
+      - uses: actions/checkout@v6
 
       # Set up QEMU
       - id: qemu
-        uses: docker/setup-qemu-action@v3
+        uses: docker/setup-qemu-action@v4
         with:
           image: tonistiigi/binfmt:master
           platforms: linux/amd64,linux/arm64
           cache-image: false
 
       # Set up Buildx so we can build multi-arch images
-      - uses: docker/setup-buildx-action@v3
+      - uses: docker/setup-buildx-action@v4
 
       # Install the latest version of ruff
       - uses: astral-sh/ruff-action@v3
@@ -68,7 +68,7 @@ jobs:
       # Extract metadata (tags, labels) from Git reference and GitHub events for Docker
       - id: meta
-        uses: docker/metadata-action@v5
+        uses: docker/metadata-action@v6
         env:
           DOCKER_METADATA_ANNOTATIONS_LEVELS: manifest,index
         with:
@@ -79,7 +79,7 @@ jobs:
           type=raw,value=master,enable=${{ github.ref == format('refs/heads/{0}', 'master') }}
 
       # Build and push the Docker image
-      - uses: docker/build-push-action@v6
+      - uses: docker/build-push-action@v7
         with:
           context: .
           platforms: linux/amd64,linux/arm64


@@ -2,6 +2,10 @@
 Subscribe to RSS feeds and get updates to a Discord webhook.
 
+Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com)
+
+Discord: TheLovinator#9276
+
 ## Features
@@ -10,6 +14,7 @@ Subscribe to RSS feeds and get updates to a Discord webhook.
 - Choose between Discord embed or plain text.
 - Regex filters for RSS feeds.
 - Blacklist/whitelist words in the title/description/author/etc.
+- Set different update frequencies for each feed or use a global default.
 - Gets extra information from APIs if available, currently for:
   - [https://feeds.c3kay.de/](https://feeds.c3kay.de/)
     - Genshin Impact News
@@ -58,8 +63,49 @@ or [install directly on your computer](#install-directly-on-your-computer).
 - Use [Windows Task Scheduler](https://en.wikipedia.org/wiki/Windows_Task_Scheduler).
 - Or add a shortcut to `%userprofile%\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\Startup`.
 
-## Contact
-
-Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com)
-
-Discord: TheLovinator#9276
+## Git Backup (State Version Control)
+
+The bot can commit every configuration change (adding/removing feeds, webhook
+changes, blacklist/whitelist updates) to a separate private Git repository so
+you get a full, auditable history of state changes — similar to `etckeeper`.
+
+### Configuration
+
+Set the following environment variables (e.g. in `docker-compose.yml` or a
+`.env` file):
+
+| Variable            | Required | Description |
+| ------------------- | -------- | ----------- |
+| `GIT_BACKUP_PATH`   | Yes      | Local path where the backup git repository is stored. The bot will initialise it automatically if it does not yet exist. |
+| `GIT_BACKUP_REMOTE` | No       | Remote URL to push to after each commit (e.g. `git@github.com:you/private-config.git`). Leave unset to keep the history local only. |
+
+### What is backed up
+
+After every relevant change a `state.json` file is written and committed.
+The file contains:
+
+- All feed URLs together with their webhook URL, custom message, embed
+  settings, and any blacklist/whitelist filters.
+- The global list of Discord webhooks.
+
+### Docker example
+
+```yaml
+services:
+  discord-rss-bot:
+    image: ghcr.io/thelovinator1/discord-rss-bot:latest
+    volumes:
+      - ./data:/data
+    environment:
+      - GIT_BACKUP_PATH=/data/backup
+      - GIT_BACKUP_REMOTE=git@github.com:you/private-config.git
+```
+
+For SSH-based remotes mount your SSH key into the container and make sure the
+host key is trusted, e.g.:
+
+```yaml
+volumes:
+  - ./data:/data
+  - ~/.ssh:/root/.ssh:ro
+```
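As a rough illustration of what gets committed, the exported `state.json` is a plain JSON document keyed by feed URL tags. The field names below follow the per-feed tags this changeset exports; the URLs and values are hypothetical:

```python
import json

# Hypothetical example of the exported state; a real file contains your own
# feeds, webhooks, and whichever filter tags are actually set.
state = {
    "feeds": [
        {
            "url": "https://example.com/rss.xml",
            "webhook": "https://discord.com/api/webhooks/123/abc",
            "blacklist_title": "spam",
        },
    ],
    "webhooks": [
        {"name": "my-webhook", "url": "https://discord.com/api/webhooks/123/abc"},
    ],
}

# The backup module serialises with indent=2, which keeps git diffs readable.
print(json.dumps(state, indent=2))
```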


@@ -4,12 +4,15 @@ import urllib.parse
 from functools import lru_cache
 from typing import TYPE_CHECKING
 
-from discord_rss_bot.filter.blacklist import entry_should_be_skipped, feed_has_blacklist_tags
-from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
+from discord_rss_bot.filter.blacklist import entry_should_be_skipped
+from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags
+from discord_rss_bot.filter.whitelist import has_white_tags
+from discord_rss_bot.filter.whitelist import should_be_sent
 from discord_rss_bot.settings import get_reader
 
 if TYPE_CHECKING:
-    from reader import Entry, Reader
+    from reader import Entry
+    from reader import Reader
 
 # Our reader
 reader: Reader = get_reader()


@@ -1,12 +1,17 @@
 from __future__ import annotations
 
+import html
 import json
 import logging
 from dataclasses import dataclass
 
-from bs4 import BeautifulSoup, Tag
+from bs4 import BeautifulSoup
+from bs4 import Tag
 from markdownify import markdownify
-from reader import Entry, Feed, Reader, TagNotFoundError
+from reader import Entry
+from reader import Feed
+from reader import Reader
+from reader import TagNotFoundError
 
 from discord_rss_bot.is_url_valid import is_url_valid
 from discord_rss_bot.settings import get_reader
@@ -68,6 +73,10 @@ def replace_tags_in_text_message(entry: Entry) -> str:
     first_image: str = get_first_image(summary, content)
 
+    # Unescape HTML entities (e.g., &lt;h1&gt; becomes <h1>) before converting to markdown
+    summary = html.unescape(summary)
+    content = html.unescape(content)
+
     summary = markdownify(
         html=summary,
         strip=["img", "table", "td", "tr", "tbody", "thead"],
@@ -199,6 +208,10 @@ def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed:
     first_image: str = get_first_image(summary, content)
 
+    # Unescape HTML entities (e.g., &lt;h1&gt; becomes <h1>) before converting to markdown
+    summary = html.unescape(summary)
+    content = html.unescape(content)
+
     summary = markdownify(
         html=summary,
         strip=["img", "table", "td", "tr", "tbody", "thead"],
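The unescape step matters because some feeds deliver double-escaped HTML: without it, `markdownify` would see literal `&lt;h1&gt;` text instead of a heading tag. A stdlib-only sketch of that step:

```python
import html

# An entry summary whose HTML arrived entity-escaped.
summary = "&lt;h1&gt;Release notes&lt;/h1&gt; &amp; fixes"

# html.unescape turns entities back into real markup before markdown conversion.
unescaped = html.unescape(summary)
print(unescaped)  # <h1>Release notes</h1> & fixes
```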


@@ -5,42 +5,41 @@ import logging
 import os
 import pprint
 import re
-from typing import TYPE_CHECKING, Any
-from urllib.parse import ParseResult, urlparse
+from typing import TYPE_CHECKING
+from typing import Any
+from urllib.parse import ParseResult
+from urllib.parse import urlparse
 
 import tldextract
-from discord_webhook import DiscordEmbed, DiscordWebhook
+from discord_webhook import DiscordEmbed
+from discord_webhook import DiscordWebhook
 from fastapi import HTTPException
 from markdownify import markdownify
-from reader import (
-    Entry,
-    EntryNotFoundError,
-    Feed,
-    FeedExistsError,
-    FeedNotFoundError,
-    Reader,
-    ReaderError,
-    StorageError,
-    TagNotFoundError,
-)
+from reader import Entry
+from reader import EntryNotFoundError
+from reader import Feed
+from reader import FeedExistsError
+from reader import FeedNotFoundError
+from reader import Reader
+from reader import ReaderError
+from reader import StorageError
+from reader import TagNotFoundError
 
-from discord_rss_bot.custom_message import (
-    CustomEmbed,
-    get_custom_message,
-    replace_tags_in_embed,
-    replace_tags_in_text_message,
-)
+from discord_rss_bot.custom_message import CustomEmbed
+from discord_rss_bot.custom_message import get_custom_message
+from discord_rss_bot.custom_message import replace_tags_in_embed
+from discord_rss_bot.custom_message import replace_tags_in_text_message
 from discord_rss_bot.filter.blacklist import entry_should_be_skipped
-from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
-from discord_rss_bot.hoyolab_api import (
-    create_hoyolab_webhook,
-    extract_post_id_from_hoyolab_url,
-    fetch_hoyolab_post,
-    is_c3kay_feed,
-)
+from discord_rss_bot.filter.whitelist import has_white_tags
+from discord_rss_bot.filter.whitelist import should_be_sent
+from discord_rss_bot.hoyolab_api import create_hoyolab_webhook
+from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url
+from discord_rss_bot.hoyolab_api import fetch_hoyolab_post
+from discord_rss_bot.hoyolab_api import is_c3kay_feed
 from discord_rss_bot.is_url_valid import is_url_valid
 from discord_rss_bot.missing_tags import add_missing_tags
-from discord_rss_bot.settings import default_custom_message, get_reader
+from discord_rss_bot.settings import default_custom_message
+from discord_rss_bot.settings import get_reader
 
 if TYPE_CHECKING:
     from collections.abc import Iterable
@@ -99,7 +98,7 @@ def extract_domain(url: str) -> str:  # noqa: PLR0911
     return "Other"
 
-def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None:  # noqa: PLR0912
+def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None:  # noqa: C901, PLR0912
     """Send a single entry to Discord.
 
     Args:
@@ -241,7 +240,7 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
     discord_embed.set_title(embed_title) if embed_title else None
 
-def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook:
+def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook:  # noqa: C901
     """Create a webhook with an embed.
 
     Args:
@@ -342,7 +341,7 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None:
     logger.exception("Error setting entry to read: %s", entry.id)
 
-def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None:  # noqa: PLR0912
+def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None:  # noqa: C901, PLR0912
     """Send entries to Discord.
 
     If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time.
@@ -521,7 +520,7 @@ def truncate_webhook_message(webhook_message: str) -> str:
     return webhook_message
 
-def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
+def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:  # noqa: C901
    """Add a new feed, update it and mark every entry as read.

    Args:


@@ -0,0 +1,252 @@
"""Git backup module for committing bot state changes to a private repository.
Configure the backup by setting these environment variables:
- ``GIT_BACKUP_PATH``: Local filesystem path for the backup git repository.
When set, the bot will initialise a git repo there (if one doesn't exist)
and commit an export of its state after every relevant change.
- ``GIT_BACKUP_REMOTE``: Optional remote URL (e.g. ``git@github.com:you/private-repo.git``).
When set, every commit is followed by a ``git push`` to this remote.
The exported state is written as ``state.json`` inside the backup repo. It
contains the list of feeds together with their webhook URL, filter settings
(blacklist / whitelist, regex variants), custom messages and embed settings.
Global webhooks are also included.
Example docker-compose snippet::
environment:
- GIT_BACKUP_PATH=/data/backup
- GIT_BACKUP_REMOTE=git@github.com:you/private-config.git
"""
from __future__ import annotations
import json
import logging
import os
import shutil
import subprocess # noqa: S404
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from reader import TagNotFoundError
if TYPE_CHECKING:
    from reader import Reader
logger: logging.Logger = logging.getLogger(__name__)
GIT_EXECUTABLE: str = shutil.which("git") or "git"
type TAG_VALUE = (
    dict[str, str | int | float | bool | dict[str, Any] | list[Any] | None]
    | list[str | int | float | bool | dict[str, Any] | list[Any] | None]
    | None
)
"""Type alias for the value of a feed tag, which can be a nested structure of dicts and lists, or None."""
# Tags that are exported per-feed (empty values are omitted).
_FEED_TAGS: tuple[str, ...] = (
    "webhook",
    "custom_message",
    "should_send_embed",
    "embed",
    "blacklist_title",
    "blacklist_summary",
    "blacklist_content",
    "blacklist_author",
    "regex_blacklist_title",
    "regex_blacklist_summary",
    "regex_blacklist_content",
    "regex_blacklist_author",
    "whitelist_title",
    "whitelist_summary",
    "whitelist_content",
    "whitelist_author",
    "regex_whitelist_title",
    "regex_whitelist_summary",
    "regex_whitelist_content",
    "regex_whitelist_author",
    ".reader.update",
)
def get_backup_path() -> Path | None:
    """Return the configured backup path, or *None* if not configured.

    Returns:
        Path to the backup repository, or None if ``GIT_BACKUP_PATH`` is unset.
    """
    raw: str = os.environ.get("GIT_BACKUP_PATH", "").strip()
    return Path(raw) if raw else None


def get_backup_remote() -> str:
    """Return the configured remote URL, or an empty string if not set.

    Returns:
        The remote URL string from ``GIT_BACKUP_REMOTE``, or ``""`` if unset.
    """
    return os.environ.get("GIT_BACKUP_REMOTE", "").strip()
def setup_backup_repo(backup_path: Path) -> bool:
    """Ensure the backup directory exists and contains a git repository.

    If the directory does not yet contain a ``.git`` folder a new repository is
    initialised. A basic git identity is configured locally so that commits
    succeed even in environments where a global ``~/.gitconfig`` is absent.

    Args:
        backup_path: Local path for the backup repository.

    Returns:
        ``True`` if the repository is ready, ``False`` on any error.
    """
    try:
        backup_path.mkdir(parents=True, exist_ok=True)
        git_dir: Path = backup_path / ".git"
        if not git_dir.exists():
            subprocess.run([GIT_EXECUTABLE, "init", str(backup_path)], check=True, capture_output=True)  # noqa: S603
            logger.info("Initialised git backup repository at %s", backup_path)

        # Ensure a local identity exists so that `git commit` always works.
        for key, value in (("user.email", "discord-rss-bot@localhost"), ("user.name", "discord-rss-bot")):
            result: subprocess.CompletedProcess[bytes] = subprocess.run(  # noqa: S603
                [GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key],
                check=False,
                capture_output=True,
            )
            if result.returncode != 0:
                subprocess.run(  # noqa: S603
                    [GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key, value],
                    check=True,
                    capture_output=True,
                )

        # Configure the remote if GIT_BACKUP_REMOTE is set.
        remote_url: str = get_backup_remote()
        if remote_url:
            # Check if remote "origin" already exists.
            check_remote: subprocess.CompletedProcess[bytes] = subprocess.run(  # noqa: S603
                [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "get-url", "origin"],
                check=False,
                capture_output=True,
            )
            if check_remote.returncode != 0:
                # Remote doesn't exist, add it.
                subprocess.run(  # noqa: S603
                    [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "add", "origin", remote_url],
                    check=True,
                    capture_output=True,
                )
                logger.info("Added remote 'origin' with URL: %s", remote_url)
            else:
                # Remote exists, update it if the URL has changed.
                current_url: str = check_remote.stdout.decode().strip()
                if current_url != remote_url:
                    subprocess.run(  # noqa: S603
                        [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "set-url", "origin", remote_url],
                        check=True,
                        capture_output=True,
                    )
                    logger.info("Updated remote 'origin' URL from %s to %s", current_url, remote_url)
    except Exception:
        logger.exception("Failed to set up git backup repository at %s", backup_path)
        return False

    return True
def export_state(reader: Reader, backup_path: Path) -> None:
    """Serialise the current bot state to ``state.json`` inside *backup_path*.

    Args:
        reader: The :class:`reader.Reader` instance to read state from.
        backup_path: Destination directory for the exported ``state.json``.
    """
    feeds_state: list[dict] = []
    for feed in reader.get_feeds():
        feed_data: dict = {"url": feed.url}
        for tag in _FEED_TAGS:
            try:
                value: TAG_VALUE = reader.get_tag(feed, tag, None)
                if value is not None and value != "":  # noqa: PLC1901
                    feed_data[tag] = value
            except Exception:
                logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url)
        feeds_state.append(feed_data)

    try:
        webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list(
            reader.get_tag((), "webhooks", [])
        )
    except TagNotFoundError:
        webhooks = []

    # Export global update interval if set
    global_update_interval: dict[str, Any] | None = None
    try:
        global_update_config = reader.get_tag((), ".reader.update", None)
        if isinstance(global_update_config, dict):
            global_update_interval = global_update_config
    except TagNotFoundError:
        pass

    state: dict = {"feeds": feeds_state, "webhooks": webhooks}
    if global_update_interval is not None:
        state["global_update_interval"] = global_update_interval

    state_file: Path = backup_path / "state.json"
    state_file.write_text(json.dumps(state, indent=2, default=str), encoding="utf-8")
def commit_state_change(reader: Reader, message: str) -> None:
    """Export current state and commit it to the backup repository.

    This is a no-op when ``GIT_BACKUP_PATH`` is not configured. Errors are
    logged but never raised so that a backup failure never interrupts normal
    bot operation.

    Args:
        reader: The :class:`reader.Reader` instance to read state from.
        message: Commit message describing the change (e.g. ``"Add feed example.com/rss.xml"``).
    """
    backup_path: Path | None = get_backup_path()
    if backup_path is None:
        return

    if not setup_backup_repo(backup_path):
        return

    try:
        export_state(reader, backup_path)

        subprocess.run([GIT_EXECUTABLE, "-C", str(backup_path), "add", "-A"], check=True, capture_output=True)  # noqa: S603

        # Only create a commit if there are staged changes.
        diff_result: subprocess.CompletedProcess[bytes] = subprocess.run(  # noqa: S603
            [GIT_EXECUTABLE, "-C", str(backup_path), "diff", "--cached", "--exit-code"],
            check=False,
            capture_output=True,
        )
        if diff_result.returncode == 0:
            logger.debug("No state changes to commit for: %s", message)
            return

        subprocess.run(  # noqa: S603
            [GIT_EXECUTABLE, "-C", str(backup_path), "commit", "-m", message],
            check=True,
            capture_output=True,
        )
        logger.info("Committed state change to backup repo: %s", message)

        # Push to remote if configured.
        if get_backup_remote():
            subprocess.run(  # noqa: S603
                [GIT_EXECUTABLE, "-C", str(backup_path), "push", "origin", "HEAD"],
                check=True,
                capture_output=True,
            )
            logger.info("Pushed state change to remote 'origin': %s", message)
    except Exception:
        logger.exception("Failed to commit state change '%s' to backup repo", message)
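The commit-only-when-changed flow above can be exercised against a throwaway repository. This sketch assumes the `git` CLI is on `PATH`; the directory, identity, and messages are made up for the demo:

```python
import subprocess
import tempfile
from pathlib import Path

# Throwaway repo standing in for the GIT_BACKUP_PATH directory.
repo = Path(tempfile.mkdtemp())
subprocess.run(["git", "init", str(repo)], check=True, capture_output=True)
for key, value in (("user.email", "bot@localhost"), ("user.name", "bot")):
    subprocess.run(["git", "-C", str(repo), "config", "--local", key, value], check=True, capture_output=True)

commits = 0
# Write the same state twice: the second pass stages nothing, so no commit is made.
for state in ('{"feeds": []}', '{"feeds": []}'):
    (repo / "state.json").write_text(state, encoding="utf-8")
    subprocess.run(["git", "-C", str(repo), "add", "-A"], check=True, capture_output=True)
    # `git diff --cached --exit-code` returns 0 when nothing is staged.
    staged = subprocess.run(
        ["git", "-C", str(repo), "diff", "--cached", "--exit-code"],
        check=False,
        capture_output=True,
    )
    if staged.returncode != 0:
        subprocess.run(["git", "-C", str(repo), "commit", "-m", "Update state"], check=True, capture_output=True)
        commits += 1
```

Only the first write produces a commit; the no-op second write is skipped, which keeps the backup history free of empty commits.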


@@ -4,10 +4,12 @@ import contextlib
 import json
 import logging
 import re
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
+from typing import Any
 
 import requests
-from discord_webhook import DiscordEmbed, DiscordWebhook
+from discord_webhook import DiscordEmbed
+from discord_webhook import DiscordWebhook
 
 if TYPE_CHECKING:
     from reader import Entry


@@ -1,6 +1,7 @@
 from __future__ import annotations
 
-from urllib.parse import ParseResult, urlparse
+from urllib.parse import ParseResult
+from urllib.parse import urlparse
 
 def is_url_valid(url: str) -> bool:


@@ -7,48 +7,62 @@ import typing
 import urllib.parse
 from contextlib import asynccontextmanager
 from dataclasses import dataclass
-from datetime import UTC, datetime
+from datetime import UTC
+from datetime import datetime
 from functools import lru_cache
-from typing import TYPE_CHECKING, Annotated, cast
+from typing import TYPE_CHECKING
+from typing import Annotated
+from typing import Any
+from typing import cast
 
 import httpx
 import sentry_sdk
 import uvicorn
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
-from fastapi import FastAPI, Form, HTTPException, Request
+from fastapi import FastAPI
+from fastapi import Form
+from fastapi import HTTPException
+from fastapi import Request
 from fastapi.responses import HTMLResponse
 from fastapi.staticfiles import StaticFiles
 from fastapi.templating import Jinja2Templates
 from httpx import Response
 from markdownify import markdownify
-from reader import Entry, EntryNotFoundError, Feed, FeedNotFoundError, Reader, TagNotFoundError
+from reader import Entry
+from reader import EntryNotFoundError
+from reader import Feed
+from reader import FeedNotFoundError
+from reader import Reader
+from reader import TagNotFoundError
 from starlette.responses import RedirectResponse
 
 from discord_rss_bot import settings
-from discord_rss_bot.custom_filters import (
-    entry_is_blacklisted,
-    entry_is_whitelisted,
-)
+from discord_rss_bot.custom_filters import entry_is_blacklisted
+from discord_rss_bot.custom_filters import entry_is_whitelisted
-from discord_rss_bot.custom_message import (
-    CustomEmbed,
-    get_custom_message,
-    get_embed,
-    get_first_image,
-    replace_tags_in_text_message,
-    save_embed,
-)
+from discord_rss_bot.custom_message import CustomEmbed
+from discord_rss_bot.custom_message import get_custom_message
+from discord_rss_bot.custom_message import get_embed
+from discord_rss_bot.custom_message import get_first_image
+from discord_rss_bot.custom_message import replace_tags_in_text_message
+from discord_rss_bot.custom_message import save_embed
-from discord_rss_bot.feeds import create_feed, extract_domain, send_entry_to_discord, send_to_discord
+from discord_rss_bot.feeds import create_feed
+from discord_rss_bot.feeds import extract_domain
+from discord_rss_bot.feeds import send_entry_to_discord
+from discord_rss_bot.feeds import send_to_discord
+from discord_rss_bot.git_backup import commit_state_change
+from discord_rss_bot.git_backup import get_backup_path
 from discord_rss_bot.missing_tags import add_missing_tags
 from discord_rss_bot.search import create_search_context
 from discord_rss_bot.settings import get_reader
 
 if TYPE_CHECKING:
-    from collections.abc import AsyncGenerator, Iterable
+    from collections.abc import AsyncGenerator
+    from collections.abc import Iterable
 
     from reader.types import JSONType
 
-LOGGING_CONFIG = {
+LOGGING_CONFIG: dict[str, Any] = {
     "version": 1,
     "disable_existing_loggers": True,
     "formatters": {
@@ -86,6 +100,46 @@ logging.config.dictConfig(LOGGING_CONFIG)
 logger: logging.Logger = logging.getLogger(__name__)
 reader: Reader = get_reader()
 
+# Time constants for relative time formatting
+SECONDS_PER_MINUTE = 60
+SECONDS_PER_HOUR = 3600
+SECONDS_PER_DAY = 86400
+
+
+def relative_time(dt: datetime | None) -> str:
+    """Convert a datetime to a relative time string (e.g., '2 hours ago', 'in 5 minutes').
+
+    Args:
+        dt: The datetime to convert (should be timezone-aware).
+
+    Returns:
+        A human-readable relative time string.
+    """
+    if dt is None:
+        return "Never"
+
+    now = datetime.now(tz=UTC)
+    diff = dt - now
+    seconds = int(abs(diff.total_seconds()))
+    is_future = diff.total_seconds() > 0
+
+    # Determine the appropriate unit and value
+    if seconds < SECONDS_PER_MINUTE:
+        value = seconds
+        unit = "s"
+    elif seconds < SECONDS_PER_HOUR:
+        value = seconds // SECONDS_PER_MINUTE
+        unit = "m"
+    elif seconds < SECONDS_PER_DAY:
+        value = seconds // SECONDS_PER_HOUR
+        unit = "h"
+    else:
+        value = seconds // SECONDS_PER_DAY
+        unit = "d"
+
+    # Format based on future or past
+    return f"in {value}{unit}" if is_future else f"{value}{unit} ago"
 @asynccontextmanager
 async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
@@ -117,6 +171,8 @@ templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url
 templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted
 templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted
 templates.env.filters["discord_markdown"] = markdownify
+templates.env.filters["relative_time"] = relative_time
+templates.env.globals["get_backup_path"] = get_backup_path
 
 
 @app.post("/add_webhook")
@@ -130,11 +186,11 @@ async def post_add_webhook(
         webhook_name: The name of the webhook.
         webhook_url: The url of the webhook.
 
-    Raises:
-        HTTPException: If the webhook already exists.
-
     Returns:
         RedirectResponse: Redirect to the index page.
+
+    Raises:
+        HTTPException: If the webhook already exists.
     """
     # Get current webhooks from the database if they exist otherwise use an empty list.
     webhooks = list(reader.get_tag((), "webhooks", []))
@@ -151,6 +207,8 @@ async def post_add_webhook(
         reader.set_tag((), "webhooks", webhooks)  # pyright: ignore[reportArgumentType]
 
+        commit_state_change(reader, f"Add webhook {webhook_name.strip()}")
+
         return RedirectResponse(url="/", status_code=303)
 
     # TODO(TheLovinator): Show this error on the page.
@@ -165,11 +223,12 @@ async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectRe
     Args:
         webhook_url: The url of the webhook.
 
-    Returns:
-        RedirectResponse: Redirect to the index page.
-
     Raises:
         HTTPException: If the webhook could not be deleted
+
+    Returns:
+        RedirectResponse: Redirect to the index page.
     """
     # TODO(TheLovinator): Check if the webhook is in use by any feeds before deleting it.
     # TODO(TheLovinator): Replace HTTPException with a custom exception for both of these.
@@ -196,6 +255,8 @@ async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectRe
     # Add our new list of webhooks to the database.
     reader.set_tag((), "webhooks", webhooks)  # pyright: ignore[reportArgumentType]
 
+    commit_state_change(reader, f"Delete webhook {webhook_url.strip()}")
+
     return RedirectResponse(url="/", status_code=303)
@@ -215,6 +276,7 @@ async def post_create_feed(
     """
     clean_feed_url: str = feed_url.strip()
     create_feed(reader, feed_url, webhook_dropdown)
+    commit_state_change(reader, f"Add feed {clean_feed_url}")
     return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@@ -286,6 +348,8 @@ async def post_set_whitelist(
     reader.set_tag(clean_feed_url, "regex_whitelist_content", regex_whitelist_content)  # pyright: ignore[reportArgumentType][call-overload]
     reader.set_tag(clean_feed_url, "regex_whitelist_author", regex_whitelist_author)  # pyright: ignore[reportArgumentType][call-overload]

+    commit_state_change(reader, f"Update whitelist for {clean_feed_url}")
+
     return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@@ -367,6 +431,7 @@ async def post_set_blacklist(
     reader.set_tag(clean_feed_url, "regex_blacklist_summary", regex_blacklist_summary)  # pyright: ignore[reportArgumentType][call-overload]
     reader.set_tag(clean_feed_url, "regex_blacklist_content", regex_blacklist_content)  # pyright: ignore[reportArgumentType][call-overload]
     reader.set_tag(clean_feed_url, "regex_blacklist_author", regex_blacklist_author)  # pyright: ignore[reportArgumentType][call-overload]
+    commit_state_change(reader, f"Update blacklist for {clean_feed_url}")
     return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@@ -433,6 +498,7 @@ async def post_set_custom(
         reader.set_tag(feed_url, "custom_message", default_custom_message)

     clean_feed_url: str = feed_url.strip()
+    commit_state_change(reader, f"Update custom message for {clean_feed_url}")
     return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@@ -552,6 +618,7 @@ async def post_embed(
     # Save the data.
     save_embed(reader, feed, custom_embed)
+    commit_state_change(reader, f"Update embed settings for {clean_feed_url}")
     return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@@ -567,6 +634,7 @@ async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
     """
     clean_feed_url: str = feed_url.strip()
    reader.set_tag(clean_feed_url, "should_send_embed", True)  # pyright: ignore[reportArgumentType]
+    commit_state_change(reader, f"Enable embed mode for {clean_feed_url}")
     return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@@ -582,9 +650,106 @@ async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse:
     """
     clean_feed_url: str = feed_url.strip()
     reader.set_tag(clean_feed_url, "should_send_embed", False)  # pyright: ignore[reportArgumentType]
+    commit_state_change(reader, f"Disable embed mode for {clean_feed_url}")
     return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)


+@app.post("/set_update_interval")
+async def post_set_update_interval(
+    feed_url: Annotated[str, Form()],
+    interval_minutes: Annotated[int | None, Form()] = None,
+    redirect_to: Annotated[str, Form()] = "",
+) -> RedirectResponse:
+    """Set the update interval for a feed.
+
+    Args:
+        feed_url: The feed to change.
+        interval_minutes: The update interval in minutes (None to reset to global default).
+        redirect_to: Optional redirect URL (defaults to feed page).
+
+    Returns:
+        RedirectResponse: Redirect to the specified page or feed page.
+    """
+    clean_feed_url: str = feed_url.strip()
+
+    # If no interval specified, reset to global default
+    if interval_minutes is None:
+        try:
+            reader.delete_tag(clean_feed_url, ".reader.update")
+            commit_state_change(reader, f"Reset update interval to default for {clean_feed_url}")
+        except TagNotFoundError:
+            pass
+    else:
+        # Validate interval (minimum 1 minute, no maximum)
+        interval_minutes = max(interval_minutes, 1)
+        reader.set_tag(clean_feed_url, ".reader.update", {"interval": interval_minutes})  # pyright: ignore[reportArgumentType]
+        commit_state_change(reader, f"Set update interval to {interval_minutes} minutes for {clean_feed_url}")
+
+    # Update the feed immediately to recalculate update_after with the new interval
+    try:
+        reader.update_feed(clean_feed_url)
+        logger.info("Updated feed after interval change: %s", clean_feed_url)
+    except Exception:
+        logger.exception("Failed to update feed after interval change: %s", clean_feed_url)
+
+    if redirect_to:
+        return RedirectResponse(url=redirect_to, status_code=303)
+    return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
+
+
+@app.post("/reset_update_interval")
+async def post_reset_update_interval(
+    feed_url: Annotated[str, Form()],
+    redirect_to: Annotated[str, Form()] = "",
+) -> RedirectResponse:
+    """Reset the update interval for a feed to use the global default.
+
+    Args:
+        feed_url: The feed to change.
+        redirect_to: Optional redirect URL (defaults to feed page).
+
+    Returns:
+        RedirectResponse: Redirect to the specified page or feed page.
+    """
+    clean_feed_url: str = feed_url.strip()
+    try:
+        reader.delete_tag(clean_feed_url, ".reader.update")
+        commit_state_change(reader, f"Reset update interval to default for {clean_feed_url}")
+    except TagNotFoundError:
+        # Tag doesn't exist, which is fine
+        pass
+
+    # Update the feed immediately to recalculate update_after with the new interval
+    try:
+        reader.update_feed(clean_feed_url)
+        logger.info("Updated feed after interval reset: %s", clean_feed_url)
+    except Exception:
+        logger.exception("Failed to update feed after interval reset: %s", clean_feed_url)
+
+    if redirect_to:
+        return RedirectResponse(url=redirect_to, status_code=303)
+    return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
+
+
+@app.post("/set_global_update_interval")
+async def post_set_global_update_interval(interval_minutes: Annotated[int, Form()]) -> RedirectResponse:
+    """Set the global default update interval.
+
+    Args:
+        interval_minutes: The update interval in minutes.
+
+    Returns:
+        RedirectResponse: Redirect to the settings page.
+    """
+    # Validate interval (minimum 1 minute, no maximum)
+    interval_minutes = max(interval_minutes, 1)
+    reader.set_tag((), ".reader.update", {"interval": interval_minutes})  # pyright: ignore[reportArgumentType]
+    commit_state_change(reader, f"Set global update interval to {interval_minutes} minutes")
+    return RedirectResponse(url="/settings", status_code=303)
+
+
 @app.get("/add", response_class=HTMLResponse)
 def get_add(request: Request):
     """Page for adding a new feed.
@@ -603,7 +768,7 @@ def get_add(request: Request):

 @app.get("/feed", response_class=HTMLResponse)
-async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
+async def get_feed(feed_url: str, request: Request, starting_after: str = ""):  # noqa: C901, PLR0912, PLR0914, PLR0915
     """Get a feed by URL.

     Args:
@@ -611,11 +776,11 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
         request: The request object.
         starting_after: The entry to start after. Used for pagination.

-    Raises:
-        HTTPException: If the feed is not found.
-
     Returns:
         HTMLResponse: The feed page.
+
+    Raises:
+        HTTPException: If the feed is not found.
     """
     entries_per_page: int = 20
@@ -628,7 +793,7 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
     # Only show button if more than 10 entries.
     total_entries: int = reader.get_entry_counts(feed=feed).total or 0
-    show_more_entires_button: bool = total_entries > entries_per_page
+    is_show_more_entries_button_visible: bool = total_entries > entries_per_page
     # Get entries from the feed.
     if starting_after:
@@ -641,6 +806,27 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
         msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}"

         html: str = create_html_for_feed(current_entries)
+
+        # Get feed and global intervals for error case too
+        feed_interval: int | None = None
+        try:
+            feed_update_config = reader.get_tag(feed, ".reader.update")
+            if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
+                interval_value = feed_update_config["interval"]
+                if isinstance(interval_value, int):
+                    feed_interval = interval_value
+        except TagNotFoundError:
+            pass
+
+        global_interval: int = 60
+        try:
+            global_update_config = reader.get_tag((), ".reader.update")
+            if isinstance(global_update_config, dict) and "interval" in global_update_config:
+                interval_value = global_update_config["interval"]
+                if isinstance(interval_value, int):
+                    global_interval = interval_value
+        except TagNotFoundError:
+            pass
+
         context = {
             "request": request,
             "feed": feed,
@@ -650,8 +836,10 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
             "should_send_embed": False,
             "last_entry": None,
             "messages": msg,
-            "show_more_entires_button": show_more_entires_button,
+            "is_show_more_entries_button_visible": is_show_more_entries_button_visible,
             "total_entries": total_entries,
+            "feed_interval": feed_interval,
+            "global_interval": global_interval,
         }
         return templates.TemplateResponse(request=request, name="feed.html", context=context)
@@ -680,6 +868,29 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
     add_missing_tags(reader)
     should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))

+    # Get the update interval for this feed
+    feed_interval: int | None = None
+    try:
+        feed_update_config = reader.get_tag(feed, ".reader.update")
+        if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
+            interval_value = feed_update_config["interval"]
+            if isinstance(interval_value, int):
+                feed_interval = interval_value
+    except TagNotFoundError:
+        # No custom interval set for this feed, will use global default
+        pass
+
+    # Get the global default update interval
+    global_interval: int = 60  # Default to 60 minutes if not set
+    try:
+        global_update_config = reader.get_tag((), ".reader.update")
+        if isinstance(global_update_config, dict) and "interval" in global_update_config:
+            interval_value = global_update_config["interval"]
+            if isinstance(interval_value, int):
+                global_interval = interval_value
+    except TagNotFoundError:
+        pass
+
     context = {
         "request": request,
         "feed": feed,
@@ -688,8 +899,10 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
         "html": html,
         "should_send_embed": should_send_embed,
         "last_entry": last_entry,
-        "show_more_entires_button": show_more_entires_button,
+        "is_show_more_entries_button_visible": is_show_more_entries_button_visible,
         "total_entries": total_entries,
+        "feed_interval": feed_interval,
+        "global_interval": global_interval,
     }
     return templates.TemplateResponse(request=request, name="feed.html", context=context)
@@ -819,6 +1032,56 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo:
     return our_hook


+@app.get("/settings", response_class=HTMLResponse)
+async def get_settings(request: Request):
+    """Settings page.
+
+    Args:
+        request: The request object.
+
+    Returns:
+        HTMLResponse: The settings page.
+    """
+    # Get the global default update interval
+    global_interval: int = 60  # Default to 60 minutes if not set
+    try:
+        global_update_config = reader.get_tag((), ".reader.update")
+        if isinstance(global_update_config, dict) and "interval" in global_update_config:
+            interval_value = global_update_config["interval"]
+            if isinstance(interval_value, int):
+                global_interval = interval_value
+    except TagNotFoundError:
+        pass
+
+    # Get all feeds with their intervals
+    feeds: Iterable[Feed] = reader.get_feeds()
+    feed_intervals = []
+    for feed in feeds:
+        feed_interval: int | None = None
+        try:
+            feed_update_config = reader.get_tag(feed, ".reader.update")
+            if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
+                interval_value = feed_update_config["interval"]
+                if isinstance(interval_value, int):
+                    feed_interval = interval_value
+        except TagNotFoundError:
+            pass
+        feed_intervals.append({
+            "feed": feed,
+            "interval": feed_interval,
+            "effective_interval": feed_interval or global_interval,
+            "domain": extract_domain(feed.url),
+        })
+
+    context = {
+        "request": request,
+        "global_interval": global_interval,
+        "feed_intervals": feed_intervals,
+    }
+    return templates.TemplateResponse(request=request, name="settings.html", context=context)
+
+
@app.get("/webhooks", response_class=HTMLResponse) @app.get("/webhooks", response_class=HTMLResponse)
async def get_webhooks(request: Request): async def get_webhooks(request: Request):
"""Page for adding a new webhook. """Page for adding a new webhook.
@ -845,23 +1108,25 @@ async def get_webhooks(request: Request):
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
def get_index(request: Request): def get_index(request: Request, message: str = ""):
"""This is the root of the website. """This is the root of the website.
Args: Args:
request: The request object. request: The request object.
message: Optional message to display to the user.
Returns: Returns:
HTMLResponse: The index page. HTMLResponse: The index page.
""" """
return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request)) return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request, message))
def make_context_index(request: Request): def make_context_index(request: Request, message: str = ""):
"""Create the needed context for the index page. """Create the needed context for the index page.
Args: Args:
request: The request object. request: The request object.
message: Optional message to display to the user.
Returns: Returns:
dict: The context for the index page. dict: The context for the index page.
@ -894,6 +1159,7 @@ def make_context_index(request: Request):
"webhooks": hooks, "webhooks": hooks,
"broken_feeds": broken_feeds, "broken_feeds": broken_feeds,
"feeds_without_attached_webhook": feeds_without_attached_webhook, "feeds_without_attached_webhook": feeds_without_attached_webhook,
"messages": message or None,
} }
@@ -904,17 +1170,20 @@ async def remove_feed(feed_url: Annotated[str, Form()]):
     Args:
         feed_url: The feed to add.

-    Raises:
-        HTTPException: Feed not found
-
     Returns:
         RedirectResponse: Redirect to the index page.
+
+    Raises:
+        HTTPException: Feed not found
     """
     try:
         reader.delete_feed(urllib.parse.unquote(feed_url))
     except FeedNotFoundError as e:
         raise HTTPException(status_code=404, detail="Feed not found") from e

+    commit_state_change(reader, f"Remove feed {urllib.parse.unquote(feed_url)}")
+
     return RedirectResponse(url="/", status_code=303)
@@ -926,11 +1195,12 @@ async def update_feed(request: Request, feed_url: str):
         request: The request object.
         feed_url: The feed URL to update.

-    Raises:
-        HTTPException: If the feed is not found.
-
     Returns:
         RedirectResponse: Redirect to the feed page.
+
+    Raises:
+        HTTPException: If the feed is not found.
     """
     try:
         reader.update_feed(urllib.parse.unquote(feed_url))
@@ -941,6 +1211,33 @@ async def update_feed(request: Request, feed_url: str):
     return RedirectResponse(url="/feed?feed_url=" + urllib.parse.quote(feed_url), status_code=303)


+@app.post("/backup")
+async def manual_backup(request: Request) -> RedirectResponse:
+    """Manually trigger a git backup of the current state.
+
+    Args:
+        request: The request object.
+
+    Returns:
+        RedirectResponse: Redirect to the index page with a success or error message.
+    """
+    backup_path = get_backup_path()
+    if backup_path is None:
+        message = "Git backup is not configured. Set GIT_BACKUP_PATH environment variable to enable backups."
+        logger.warning("Manual git backup attempted but GIT_BACKUP_PATH is not configured")
+        return RedirectResponse(url=f"/?message={urllib.parse.quote(message)}", status_code=303)
+
+    try:
+        commit_state_change(reader, "Manual backup triggered from web UI")
+        message = "Successfully created git backup!"
+        logger.info("Manual git backup completed successfully")
+    except Exception as e:
+        message = f"Failed to create git backup: {e}"
+        logger.exception("Manual git backup failed")
+
+    return RedirectResponse(url=f"/?message={urllib.parse.quote(message)}", status_code=303)
+
+
 @app.get("/search", response_class=HTMLResponse)
 async def search(request: Request, query: str):
     """Get entries matching a full-text search query.
@@ -988,11 +1285,12 @@ def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Fo
         old_hook: The webhook to modify.
         new_hook: The new webhook.

-    Returns:
-        RedirectResponse: Redirect to the webhook page.
-
     Raises:
         HTTPException: Webhook could not be modified.
+
+    Returns:
+        RedirectResponse: Redirect to the webhook page.
     """
     # Get current webhooks from the database if they exist otherwise use an empty list.
     webhooks = list(reader.get_tag((), "webhooks", []))
@@ -1042,11 +1340,11 @@ def extract_youtube_video_id(url: str) -> str | None:
     # Handle standard YouTube URLs (youtube.com/watch?v=VIDEO_ID)
     if "youtube.com/watch" in url and "v=" in url:
-        return url.split("v=")[1].split("&")[0]
+        return url.split("v=")[1].split("&", maxsplit=1)[0]

     # Handle shortened YouTube URLs (youtu.be/VIDEO_ID)
     if "youtu.be/" in url:
-        return url.split("youtu.be/")[1].split("?")[0]
+        return url.split("youtu.be/")[1].split("?", maxsplit=1)[0]

     return None

View file

@@ -1,8 +1,11 @@
 from __future__ import annotations

-from reader import Feed, Reader, TagNotFoundError
+from reader import Feed
+from reader import Reader
+from reader import TagNotFoundError

-from discord_rss_bot.settings import default_custom_embed, default_custom_message
+from discord_rss_bot.settings import default_custom_embed
+from discord_rss_bot.settings import default_custom_message


 def add_custom_message(reader: Reader, feed: Feed) -> None:

View file

@@ -8,7 +8,10 @@ from discord_rss_bot.settings import get_reader
 if TYPE_CHECKING:
     from collections.abc import Iterable

-    from reader import EntrySearchResult, Feed, HighlightedString, Reader
+    from reader import EntrySearchResult
+    from reader import Feed
+    from reader import HighlightedString
+    from reader import Reader


 def create_search_context(query: str, custom_reader: Reader | None = None) -> dict:

View file

@@ -5,7 +5,9 @@ from functools import lru_cache
 from pathlib import Path

 from platformdirs import user_data_dir
-from reader import Reader, make_reader
+from reader import Reader
+from reader import TagNotFoundError
+from reader import make_reader

 if typing.TYPE_CHECKING:
     from reader.types import JSONType
@@ -38,7 +40,12 @@ def get_reader(custom_location: Path | None = None) -> Reader:
     reader: Reader = make_reader(url=str(db_location))

     # https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig
-    # Set the update interval to 15 minutes
-    reader.set_tag((), ".reader.update", {"interval": 15})
+    # Set the default update interval to 15 minutes if not already configured
+    # Users can change this via the Settings page or per-feed in the feed page
+    try:
+        reader.get_tag((), ".reader.update")
+    except TagNotFoundError:
+        # Set default
+        reader.set_tag((), ".reader.update", {"interval": 15})

     return reader
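This hunk makes the 15-minute default non-destructive: the tag is only written when no `.reader.update` value exists yet, so an interval chosen in the UI survives restarts. A minimal sketch of that guard against a stand-in reader (the `FakeReader` class and the local `TagNotFoundError` are illustrations for this note, not the real `reader` library):

```python
class TagNotFoundError(KeyError):
    """Stand-in for reader.TagNotFoundError in this sketch."""


class FakeReader:
    """Tiny in-memory stand-in exposing the get_tag/set_tag shape used above."""

    def __init__(self) -> None:
        self._tags: dict[str, object] = {}

    def get_tag(self, resource, key):
        try:
            return self._tags[key]
        except KeyError as e:
            raise TagNotFoundError(key) from e

    def set_tag(self, resource, key, value) -> None:
        self._tags[key] = value


def ensure_default_interval(reader: FakeReader) -> None:
    # Same shape as the hunk above: only write the default when no value
    # exists, so a user-chosen interval survives restarts.
    try:
        reader.get_tag((), ".reader.update")
    except TagNotFoundError:
        reader.set_tag((), ".reader.update", {"interval": 15})
```

Without the guard, the old unconditional `set_tag` would have silently reset any custom global interval on every startup.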

View file

@@ -13,3 +13,7 @@ body {
 .form-text {
     color: #acabab;
 }
+
+.interval-input {
+    max-width: 120px;
+}

View file

@@ -8,83 +8,138 @@
         <h2>
             <a class="text-muted" href="{{ feed.url }}">{{ feed.title }}</a> ({{ total_entries }} entries)
         </h2>
-        {% if not feed.updates_enabled %}
-            <span class="badge bg-danger">Disabled</span>
-        {% endif %}
+        {% if not feed.updates_enabled %}<span class="badge bg-danger">Disabled</span>{% endif %}
         {% if feed.last_exception %}
             <div class="mt-3">
                 <h5 class="text-danger">{{ feed.last_exception.type_name }}:</h5>
                 <code class="d-block">{{ feed.last_exception.value_str }}</code>
-                <button class="btn btn-secondary btn-sm mt-2" type="button" data-bs-toggle="collapse"
-                    data-bs-target="#exceptionDetails" aria-expanded="false" aria-controls="exceptionDetails">
-                    Show Traceback
-                </button>
+                <button class="btn btn-secondary btn-sm mt-2"
+                        type="button"
+                        data-bs-toggle="collapse"
+                        data-bs-target="#exceptionDetails"
+                        aria-expanded="false"
+                        aria-controls="exceptionDetails">Show Traceback</button>
                 <div class="collapse" id="exceptionDetails">
                     <pre><code>{{ feed.last_exception.traceback_str }}</code></pre>
                 </div>
             </div>
         {% endif %}
         <!-- Feed Actions -->
         <div class="mt-3 d-flex flex-wrap gap-2">
-            <a href="/update?feed_url={{ feed.url|encode_url }}" class="btn btn-primary btn-sm">Update</a>
+            <a href="/update?feed_url={{ feed.url|encode_url }}"
+               class="btn btn-primary btn-sm">Update</a>
             <form action="/remove" method="post" class="d-inline">
-                <button class="btn btn-danger btn-sm" name="feed_url" value="{{ feed.url }}"
+                <button class="btn btn-danger btn-sm"
+                        name="feed_url"
+                        value="{{ feed.url }}"
                         onclick="return confirm('Are you sure you want to delete this feed?')">Remove</button>
             </form>
             {% if not feed.updates_enabled %}
                 <form action="/unpause" method="post" class="d-inline">
-                    <button class="btn btn-secondary btn-sm" name="feed_url" value="{{ feed.url }}">Unpause</button>
+                    <button class="btn btn-secondary btn-sm"
+                            name="feed_url"
+                            value="{{ feed.url }}">Unpause</button>
                 </form>
             {% else %}
                 <form action="/pause" method="post" class="d-inline">
                     <button class="btn btn-danger btn-sm" name="feed_url" value="{{ feed.url }}">Pause</button>
                 </form>
             {% endif %}
             {% if not "youtube.com/feeds/videos.xml" in feed.url %}
                 {% if should_send_embed %}
                     <form action="/use_text" method="post" class="d-inline">
-                        <button class="btn btn-dark btn-sm" name="feed_url" value="{{ feed.url }}">
-                            Send text message instead of embed
-                        </button>
+                        <button class="btn btn-dark btn-sm" name="feed_url" value="{{ feed.url }}">Send text message instead of embed</button>
                     </form>
                 {% else %}
                     <form action="/use_embed" method="post" class="d-inline">
-                        <button class="btn btn-dark btn-sm" name="feed_url" value="{{ feed.url }}">
-                            Send embed instead of text message
-                        </button>
+                        <button class="btn btn-dark btn-sm" name="feed_url" value="{{ feed.url }}">Send embed instead of text message</button>
                     </form>
                 {% endif %}
             {% endif %}
         </div>
         <!-- Additional Links -->
         <div class="mt-3">
-            <a class="text-muted d-block" href="/whitelist?feed_url={{ feed.url|encode_url }}">Whitelist</a>
-            <a class="text-muted d-block" href="/blacklist?feed_url={{ feed.url|encode_url }}">Blacklist</a>
-            <a class="text-muted d-block" href="/custom?feed_url={{ feed.url|encode_url }}">
-                Customize message {% if not should_send_embed %}(Currently active){% endif %}
+            <a class="text-muted d-block"
+               href="/whitelist?feed_url={{ feed.url|encode_url }}">Whitelist</a>
+            <a class="text-muted d-block"
+               href="/blacklist?feed_url={{ feed.url|encode_url }}">Blacklist</a>
+            <a class="text-muted d-block"
+               href="/custom?feed_url={{ feed.url|encode_url }}">
+                Customize message
+                {% if not should_send_embed %}(Currently active){% endif %}
             </a>
             {% if not "youtube.com/feeds/videos.xml" in feed.url %}
-                <a class="text-muted d-block" href="/embed?feed_url={{ feed.url|encode_url }}">
-                    Customize embed {% if should_send_embed %}(Currently active){% endif %}
+                <a class="text-muted d-block"
+                   href="/embed?feed_url={{ feed.url|encode_url }}">
+                    Customize embed
+                    {% if should_send_embed %}(Currently active){% endif %}
                 </a>
             {% endif %}
         </div>
+        <!-- Feed Metadata -->
+        <div class="mt-4 border-top border-secondary pt-3">
+            <h5 class="mb-3">Feed Information</h5>
+            <div class="row text-muted">
+                <div class="col-md-6 mb-2">
+                    <small><strong>Added:</strong> {{ feed.added | relative_time }}</small>
+                </div>
+                <div class="col-md-6 mb-2">
+                    <small><strong>Last Updated:</strong> {{ feed.last_updated | relative_time }}</small>
+                </div>
+                <div class="col-md-6 mb-2">
+                    <small><strong>Last Retrieved:</strong> {{ feed.last_retrieved | relative_time }}</small>
+                </div>
+                <div class="col-md-6 mb-2">
+                    <small><strong>Next Update:</strong> {{ feed.update_after | relative_time }}</small>
+                </div>
+                <div class="col-md-6 mb-2">
+                    <small><strong>Updates:</strong> <span class="badge {{ 'bg-success' if feed.updates_enabled else 'bg-danger' }}">{{ 'Enabled' if feed.updates_enabled else 'Disabled' }}</span></small>
+                </div>
+            </div>
+        </div>
+        <!-- Update Interval Configuration -->
+        <div class="mt-4 border-top border-secondary pt-3">
+            <h5 class="mb-3">Update Interval</h5>
+            {% if feed_interval %}
+                <p class="text-muted mb-2">
+                    Current: <strong>{{ feed_interval }} minutes</strong>
+                    {% if feed_interval >= 60 %}({{ (feed_interval / 60) | round(1) }} hours){% endif %}
+                    <span class="badge bg-info">Custom</span>
+                </p>
+            {% else %}
+                <p class="text-muted mb-2">
+                    Current: <strong>{{ global_interval }} minutes</strong>
+                    {% if global_interval >= 60 %}({{ (global_interval / 60) | round(1) }} hours){% endif %}
+                    <span class="badge bg-secondary">Using global default</span>
+                </p>
+            {% endif %}
+            <form action="/set_update_interval" method="post" class="mb-2">
+                <input type="hidden" name="feed_url" value="{{ feed.url }}" />
+                <div class="input-group input-group-sm mb-2">
+                    <input type="number"
+                           class="form-control form-control-sm interval-input"
+                           name="interval_minutes"
+                           placeholder="Minutes"
+                           min="1"
+                           value="{{ feed_interval if feed_interval else global_interval }}"
+                           required />
+                    <button class="btn btn-primary" type="submit">Set Interval</button>
+                </div>
+            </form>
+            {% if feed_interval %}
+                <form action="/reset_update_interval" method="post" class="d-inline">
+                    <input type="hidden" name="feed_url" value="{{ feed.url }}" />
+                    <button class="btn btn-secondary btn-sm" type="submit">Reset to Global Default</button>
+                </form>
+            {% endif %}
+        </div>
     </div>
     {# Rendered HTML content #}
     <pre>{{ html|safe }}</pre>
-    {% if show_more_entires_button %}
+    {% if is_show_more_entries_button_visible %}
         <a class="btn btn-dark mt-3"
            href="/feed?feed_url={{ feed.url|encode_url }}&starting_after={{ last_entry.id|encode_url }}">
             Show more entries
         </a>
     {% endif %}
 {% endblock content %}

View file

@ -1,6 +1,9 @@
<nav class="navbar navbar-expand-md navbar-dark p-2 mb-3 border-bottom border-warning">
    <div class="container-fluid">
        <button class="navbar-toggler ms-auto"
                type="button"
                data-bs-toggle="collapse"
                data-bs-target="#collapseNavbar">
            <span class="navbar-toggler-icon"></span>
        </button>
        <div class="navbar-collapse collapse" id="collapseNavbar">
@@ -16,10 +19,28 @@
                <li class="nav-item">
                    <a class="nav-link" href="/webhooks">Webhooks</a>
                </li>
<li class="nav-item nav-link d-none d-md-block">|</li>
<li class="nav-item">
<a class="nav-link" href="/settings">Settings</a>
</li>
{% if get_backup_path() %}
<li class="nav-item nav-link d-none d-md-block">|</li>
<li class="nav-item">
<form action="/backup" method="post" class="d-inline">
<button type="submit"
class="nav-link btn btn-link text-decoration-none"
onclick="return confirm('Create a manual git backup of the current state?');">
Backup
</button>
</form>
</li>
{% endif %}
            </ul>
            {# Search #}
            <form action="/search" method="get" class="ms-auto w-50 input-group">
                <input name="query"
                       class="form-control bg-dark border-dark text-muted"
                       type="search"
                       placeholder="Search" />
            </form>
            {# Donate button #}
View file
@@ -0,0 +1,122 @@
{% extends "base.html" %}
{% block title %}
| Settings
{% endblock title %}
{% block content %}
<section>
<div class="text-light">
<div class="d-flex flex-wrap justify-content-between align-items-center gap-2">
<h2 class="mb-0">Global Settings</h2>
</div>
<p class="text-muted mt-2 mb-4">
Set a default interval for all feeds. Individual feeds can still override this value.
</p>
<div class="mb-4">
<div>
The current default is {{ global_interval }} minutes.
Although ETags and Last-Modified headers are checked before fetching, a very low interval can still cause problems with some feeds and put unnecessary load on the server hosting them. Remember to be kind.
</div>
</div>
</div>
<form action="/set_global_update_interval" method="post" class="mb-2">
<div class="settings-form-row mb-2">
<label for="interval_minutes" class="form-label mb-1">Default interval (minutes)</label>
<div class="input-group input-group-lg">
<input id="interval_minutes"
type="number"
class="form-control settings-input"
name="interval_minutes"
placeholder="Minutes"
min="1"
value="{{ global_interval }}"
required />
<button class="btn btn-primary px-4" type="submit">Save</button>
</div>
</div>
</form>
</section>
<section class="mt-5">
<div class="text-light">
<div class="d-flex flex-wrap justify-content-between align-items-center gap-2">
<h2 class="mb-0">Feed Update Intervals</h2>
</div>
<p class="text-muted mt-2 mb-4">
Customize the update interval for individual feeds. Leave empty or reset to use the global default.
</p>
</div>
{% if feed_intervals %}
<div class="table-responsive">
<table class="table table-dark table-hover">
<thead>
<tr>
<th>Feed</th>
<th>Domain</th>
<th>Status</th>
<th>Interval</th>
<th>Last Updated</th>
<th>Next Update</th>
<th>Set Interval (min)</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for item in feed_intervals %}
<tr>
<td>
<a href="/feed?feed_url={{ item.feed.url|encode_url }}"
class="text-light text-decoration-none">{{ item.feed.title }}</a>
</td>
<td>
<span class="text-muted small">{{ item.domain }}</span>
</td>
<td>
<span class="badge {{ 'bg-success' if item.feed.updates_enabled else 'bg-danger' }}">
{{ 'Enabled' if item.feed.updates_enabled else 'Disabled' }}
</span>
</td>
<td>
<span>{{ item.effective_interval }} min</span>
{% if item.interval %}
<span class="badge bg-info ms-1">Custom</span>
{% else %}
<span class="badge bg-secondary ms-1">Global</span>
{% endif %}
</td>
<td>
<small class="text-muted">{{ item.feed.last_updated | relative_time }}</small>
</td>
<td>
<small class="text-muted">{{ item.feed.update_after | relative_time }}</small>
</td>
<td>
<form action="/set_update_interval" method="post" class="d-flex gap-2">
<input type="hidden" name="feed_url" value="{{ item.feed.url }}" />
<input type="hidden" name="redirect_to" value="/settings" />
<input type="number"
class="form-control form-control-sm interval-input"
name="interval_minutes"
placeholder="Minutes"
min="1"
value="{{ item.interval if item.interval else global_interval }}" />
<button class="btn btn-primary btn-sm" type="submit">Set</button>
</form>
</td>
<td>
{% if item.interval %}
<form action="/reset_update_interval" method="post" class="d-inline">
<input type="hidden" name="feed_url" value="{{ item.feed.url }}" />
<input type="hidden" name="redirect_to" value="/settings" />
<button class="btn btn-outline-secondary btn-sm" type="submit">Reset</button>
</form>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<p class="text-muted">No feeds added yet.</p>
{% endif %}
</section>
{% endblock content %}
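As an aside, the minutes-to-hours display used by both templates above (`(interval / 60) | round(1)`) can be sketched as a plain Python helper. `format_interval` is a hypothetical name for illustration, not part of this diff:

```python
def format_interval(minutes: int) -> str:
    """Render an interval the way the templates do: minutes, plus hours when >= 60."""
    label = f"{minutes} minutes"
    if minutes >= 60:
        # Mirrors the Jinja filter chain (interval / 60) | round(1).
        label += f" ({round(minutes / 60, 1)} hours)"
    return label
```

For example, a 90-minute interval renders as "90 minutes (1.5 hours)", while anything under an hour shows minutes only.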
View file
@@ -28,14 +28,19 @@ dev = ["djlint", "pytest"]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.ruff]
preview = true
unsafe-fixes = true
fix = true
line-length = 120
lint.select = ["ALL"]
lint.unfixable = ["F841"] # Don't automatically remove unused variables
lint.pydocstyle.convention = "google"
lint.isort.required-imports = ["from __future__ import annotations"]
lint.pycodestyle.ignore-overlong-task-comments = true
lint.isort.force-single-line = true
lint.ignore = [
    "ANN201", # Checks that public functions and methods have return type annotations.
@@ -81,15 +86,7 @@ lint.ignore = [
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["S101", "D103", "PLR2004"]
[tool.ruff.lint.mccabe]
max-complexity = 15 # Don't judge lol
[tool.pytest.ini_options]
python_files = ["test_*.py"]
log_cli = true
log_cli_level = "DEBUG"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
log_cli_date_format = "%Y-%m-%d %H:%M:%S"
filterwarnings = [
    "ignore::bs4.GuessedAtParserWarning",
    "ignore:functools\\.partial will be a method descriptor in future Python versions; wrap it in staticmethod\\(\\) if you want to preserve the old behavior:FutureWarning",
View file
@@ -4,9 +4,13 @@ import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
from reader import Entry
from reader import Feed
from reader import Reader
from reader import make_reader
from discord_rss_bot.filter.blacklist import entry_should_be_skipped
from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags
if TYPE_CHECKING:
    from collections.abc import Iterable
View file
@@ -5,7 +5,9 @@ import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
from discord_rss_bot.custom_filters import encode_url
from discord_rss_bot.custom_filters import entry_is_blacklisted
from discord_rss_bot.custom_filters import entry_is_whitelisted
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
View file
@@ -4,19 +4,20 @@ import os
import tempfile
from pathlib import Path
from typing import LiteralString
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from reader import Feed
from reader import Reader
from reader import make_reader
from discord_rss_bot.feeds import extract_domain
from discord_rss_bot.feeds import is_youtube_feed
from discord_rss_bot.feeds import send_entry_to_discord
from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.feeds import should_send_embed_check
from discord_rss_bot.feeds import truncate_webhook_message
from discord_rss_bot.missing_tags import add_missing_tags
tests/test_git_backup.py Normal file
View file
@@ -0,0 +1,570 @@
from __future__ import annotations
import contextlib
import json
import shutil
import subprocess # noqa: S404
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from discord_rss_bot.git_backup import commit_state_change
from discord_rss_bot.git_backup import export_state
from discord_rss_bot.git_backup import get_backup_path
from discord_rss_bot.git_backup import get_backup_remote
from discord_rss_bot.git_backup import setup_backup_repo
from discord_rss_bot.main import app
if TYPE_CHECKING:
from pathlib import Path
SKIP_IF_NO_GIT: pytest.MarkDecorator = pytest.mark.skipif(
shutil.which("git") is None, reason="git executable not found"
)
def test_get_backup_path_unset(monkeypatch: pytest.MonkeyPatch) -> None:
"""get_backup_path returns None when GIT_BACKUP_PATH is not set."""
monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
assert get_backup_path() is None
def test_get_backup_path_set(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""get_backup_path returns a Path when GIT_BACKUP_PATH is set."""
monkeypatch.setenv("GIT_BACKUP_PATH", str(tmp_path))
result: Path | None = get_backup_path()
assert result == tmp_path
def test_get_backup_path_strips_whitespace(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""get_backup_path strips surrounding whitespace from the env var value."""
monkeypatch.setenv("GIT_BACKUP_PATH", f" {tmp_path} ")
result: Path | None = get_backup_path()
assert result == tmp_path
def test_get_backup_remote_unset(monkeypatch: pytest.MonkeyPatch) -> None:
"""get_backup_remote returns empty string when GIT_BACKUP_REMOTE is not set."""
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
assert not get_backup_remote()
def test_get_backup_remote_set(monkeypatch: pytest.MonkeyPatch) -> None:
"""get_backup_remote returns the configured remote URL."""
monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/repo.git")
assert get_backup_remote() == "git@github.com:user/repo.git"
@SKIP_IF_NO_GIT
def test_setup_backup_repo_creates_git_repo(tmp_path: Path) -> None:
"""setup_backup_repo initialises a git repo in a fresh directory."""
backup_path: Path = tmp_path / "backup"
result: bool = setup_backup_repo(backup_path)
assert result is True
assert (backup_path / ".git").exists()
@SKIP_IF_NO_GIT
def test_setup_backup_repo_idempotent(tmp_path: Path) -> None:
"""setup_backup_repo does not fail when called on an existing repo."""
backup_path: Path = tmp_path / "backup"
assert setup_backup_repo(backup_path) is True
assert setup_backup_repo(backup_path) is True
def test_setup_backup_repo_adds_origin_remote(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""setup_backup_repo adds remote 'origin' when GIT_BACKUP_REMOTE is set."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/private.git")
with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
# git config --local queries fail initially so setup writes defaults.
mock_run.side_effect = [
MagicMock(returncode=0), # git init
MagicMock(returncode=1), # config user.email read
MagicMock(returncode=0), # config user.email write
MagicMock(returncode=1), # config user.name read
MagicMock(returncode=0), # config user.name write
MagicMock(returncode=1), # remote get-url origin (missing)
MagicMock(returncode=0), # remote add origin <url>
]
assert setup_backup_repo(backup_path) is True
called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
assert ["remote", "add", "origin", "git@github.com:user/private.git"] in [
cmd[-4:] for cmd in called_commands if len(cmd) >= 4
]
def test_setup_backup_repo_updates_origin_remote(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""setup_backup_repo updates existing origin when URL differs."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/new-private.git")
with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
# Existing repo path: no git init call.
(backup_path / ".git").mkdir(parents=True)
mock_run.side_effect = [
MagicMock(returncode=0), # config user.email read
MagicMock(returncode=0), # config user.name read
MagicMock(returncode=0, stdout=b"git@github.com:user/old-private.git\n"), # remote get-url origin
MagicMock(returncode=0), # remote set-url origin <new>
]
assert setup_backup_repo(backup_path) is True
called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
assert ["remote", "set-url", "origin", "git@github.com:user/new-private.git"] in [
cmd[-4:] for cmd in called_commands if len(cmd) >= 4
]
def test_export_state_creates_state_json(tmp_path: Path) -> None:
"""export_state writes a valid state.json to the backup directory."""
mock_reader = MagicMock()
# Feeds
feed1 = MagicMock()
feed1.url = "https://example.com/feed.rss"
mock_reader.get_feeds.return_value = [feed1]
# Tag values: webhook present, everything else absent (returns None)
def get_tag_side_effect(
feed_or_key: tuple | str,
tag: str | None = None,
default: str | None = None,
) -> list[Any] | str | None:
if feed_or_key == () and tag is None:
# Called for global webhooks list
return []
if tag == "webhook":
return "https://discord.com/api/webhooks/123/abc"
return default
mock_reader.get_tag.side_effect = get_tag_side_effect
backup_path: Path = tmp_path / "backup"
backup_path.mkdir()
export_state(mock_reader, backup_path)
state_file: Path = backup_path / "state.json"
assert state_file.exists(), "state.json should be created by export_state"
data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8"))
assert "feeds" in data
assert "webhooks" in data
assert data["feeds"][0]["url"] == "https://example.com/feed.rss"
assert data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc"
def test_export_state_omits_empty_tags(tmp_path: Path) -> None:
"""export_state does not include tags with empty-string or None values."""
mock_reader = MagicMock()
feed1 = MagicMock()
feed1.url = "https://example.com/feed.rss"
mock_reader.get_feeds.return_value = [feed1]
def get_tag_side_effect(
feed_or_key: tuple | str,
tag: str | None = None,
default: str | None = None,
) -> list[Any] | str | None:
if feed_or_key == ():
return []
# Return empty string for all tags
return default # default is None
mock_reader.get_tag.side_effect = get_tag_side_effect
backup_path: Path = tmp_path / "backup"
backup_path.mkdir()
export_state(mock_reader, backup_path)
data: dict[str, Any] = json.loads((backup_path / "state.json").read_text())
# Only "url" key should be present (no empty-value tags)
assert list(data["feeds"][0].keys()) == ["url"]
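The omission rule this test checks can be sketched as a standalone helper. `build_feed_entry` is a hypothetical name for illustration; in the real module this logic lives inside `export_state`:

```python
def build_feed_entry(url: str, tags: dict[str, object]) -> dict[str, object]:
    """Build the per-feed dict written to state.json: start from the feed URL
    and copy only tags with truthy values, so None and "" are omitted."""
    entry: dict[str, object] = {"url": url}
    for name, value in tags.items():
        if value:  # drops None and empty strings, matching test_export_state_omits_empty_tags
            entry[name] = value
    return entry
```

With every tag empty, the resulting dict contains only the `url` key, which is exactly what the test asserts.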
def test_commit_state_change_noop_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
"""commit_state_change does nothing when GIT_BACKUP_PATH is not set."""
monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
mock_reader = MagicMock()
# Should not raise and should not call reader methods for export
commit_state_change(mock_reader, "Add feed example.com/rss")
mock_reader.get_feeds.assert_not_called()
@SKIP_IF_NO_GIT
def test_commit_state_change_commits(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""commit_state_change creates a commit in the backup repo."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
mock_reader = MagicMock()
mock_reader.get_feeds.return_value = []
mock_reader.get_tag.return_value = []
commit_state_change(mock_reader, "Add feed https://example.com/rss")
# Verify a commit was created in the backup repo
git_executable: str | None = shutil.which("git")
assert git_executable is not None, "git executable not found"
result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "log", "--oneline"],
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0
assert "Add feed https://example.com/rss" in result.stdout
@SKIP_IF_NO_GIT
def test_commit_state_change_no_double_commit(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""commit_state_change does not create a commit when state has not changed."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
mock_reader = MagicMock()
mock_reader.get_feeds.return_value = []
mock_reader.get_tag.return_value = []
commit_state_change(mock_reader, "First commit")
commit_state_change(mock_reader, "Should not appear")
git_executable: str | None = shutil.which("git")
assert git_executable is not None, "git executable not found"
result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "log", "--oneline"],
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0
assert "First commit" in result.stdout
assert "Should not appear" not in result.stdout
def test_commit_state_change_push_when_remote_set(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""commit_state_change calls git push when GIT_BACKUP_REMOTE is configured."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.setenv("GIT_BACKUP_REMOTE", "git@github.com:user/private.git")
mock_reader = MagicMock()
mock_reader.get_feeds.return_value = []
mock_reader.get_tag.return_value = []
with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
# Make all subprocess calls succeed
mock_run.return_value = MagicMock(returncode=1) # returncode=1 means staged changes exist
commit_state_change(mock_reader, "Add feed https://example.com/rss")
called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
push_calls: list[list[str]] = [cmd for cmd in called_commands if "push" in cmd]
assert push_calls, "git push should have been called when GIT_BACKUP_REMOTE is set"
assert any(cmd[-3:] == ["push", "origin", "HEAD"] for cmd in called_commands), (
"git push should target configured remote name 'origin'"
)
def test_commit_state_change_no_push_when_remote_unset(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""commit_state_change does not call git push when GIT_BACKUP_REMOTE is not set."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
mock_reader = MagicMock()
mock_reader.get_feeds.return_value = []
mock_reader.get_tag.return_value = []
with patch("discord_rss_bot.git_backup.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1)
commit_state_change(mock_reader, "Add feed https://example.com/rss")
called_commands: list[list[str]] = [call.args[0] for call in mock_run.call_args_list]
push_calls: list[list[str]] = [cmd for cmd in called_commands if "push" in cmd]
assert not push_calls, "git push should NOT be called when GIT_BACKUP_REMOTE is not set"
@SKIP_IF_NO_GIT
def test_commit_state_change_e2e_push_to_bare_repo(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""End-to-end test: commit_state_change pushes to a real bare git repository."""
git_executable: str | None = shutil.which("git")
assert git_executable is not None, "git executable not found"
# Create a bare remote repository
bare_repo_path: Path = tmp_path / "remote.git"
subprocess.run([git_executable, "init", "--bare", str(bare_repo_path)], check=True, capture_output=True) # noqa: S603
# Configure backup with remote pointing to bare repo
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.setenv("GIT_BACKUP_REMOTE", str(bare_repo_path))
# Create mock reader with some state
mock_reader = MagicMock()
feed1 = MagicMock()
feed1.url = "https://example.com/feed.rss"
mock_reader.get_feeds.return_value = [feed1]
def get_tag_side_effect(
feed_or_key: tuple | str,
tag: str | None = None,
default: str | None = None,
) -> list[Any] | str | None:
if feed_or_key == ():
return []
if tag == "webhook":
return "https://discord.com/api/webhooks/123/abc"
return default
mock_reader.get_tag.side_effect = get_tag_side_effect
# Perform backup with commit and push
commit_state_change(mock_reader, "Initial backup")
# Verify commit exists in local backup repo
result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "log", "--oneline"],
capture_output=True,
text=True,
check=True,
)
assert "Initial backup" in result.stdout
# Verify origin remote is configured correctly
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "remote", "get-url", "origin"],
capture_output=True,
text=True,
check=True,
)
assert result.stdout.strip() == str(bare_repo_path)
# Verify commit was pushed to the bare remote
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"],
capture_output=True,
text=True,
check=True,
)
assert "Initial backup" in result.stdout
# Verify state.json content in the remote
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(bare_repo_path), "show", "master:state.json"],
capture_output=True,
text=True,
check=True,
)
state_data: dict[str, Any] = json.loads(result.stdout)
assert state_data["feeds"][0]["url"] == "https://example.com/feed.rss"
assert state_data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc"
# Perform a second backup to verify subsequent pushes work
feed2 = MagicMock()
feed2.url = "https://another.com/feed.xml"
mock_reader.get_feeds.return_value = [feed1, feed2]
commit_state_change(mock_reader, "Add second feed")
# Verify both commits are in the remote
result = subprocess.run( # noqa: S603
[git_executable, "-C", str(bare_repo_path), "log", "--oneline", "master"],
capture_output=True,
text=True,
check=True,
)
assert "Initial backup" in result.stdout
assert "Add second feed" in result.stdout
# Integration tests for embed-related endpoint backups
client: TestClient = TestClient(app)
test_webhook_name: str = "Test Backup Webhook"
test_webhook_url: str = "https://discord.com/api/webhooks/999999999/testbackupwebhook"
test_feed_url: str = "https://lovinator.space/rss_test.xml"
def setup_test_feed() -> None:
"""Set up a test webhook and feed for endpoint tests."""
# Clean up existing test data
with contextlib.suppress(Exception):
client.post(url="/remove", data={"feed_url": test_feed_url})
with contextlib.suppress(Exception):
client.post(url="/delete_webhook", data={"webhook_url": test_webhook_url})
# Create webhook and feed
client.post(
url="/add_webhook",
data={"webhook_name": test_webhook_name, "webhook_url": test_webhook_url},
)
client.post(url="/add", data={"feed_url": test_feed_url, "webhook_dropdown": test_webhook_name})
def test_post_embed_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /embed should trigger a git backup with appropriate message."""
# Set up git backup
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(
url="/embed",
data={
"feed_url": test_feed_url,
"title": "Custom Title",
"description": "Custom Description",
"color": "#FF5733",
},
)
assert response.status_code == 200, f"Failed to post embed: {response.text}"
mock_commit.assert_called_once()
# Verify the commit message contains the feed URL
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Update embed settings" in commit_message
assert test_feed_url in commit_message
def test_post_use_embed_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /use_embed should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(url="/use_embed", data={"feed_url": test_feed_url})
assert response.status_code == 200, f"Failed to enable embed: {response.text}"
mock_commit.assert_called_once()
# Verify the commit message
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Enable embed mode" in commit_message
assert test_feed_url in commit_message
def test_post_use_text_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /use_text should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(url="/use_text", data={"feed_url": test_feed_url})
assert response.status_code == 200, f"Failed to disable embed: {response.text}"
mock_commit.assert_called_once()
# Verify the commit message
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Disable embed mode" in commit_message
assert test_feed_url in commit_message
def test_post_custom_message_triggers_backup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Posting to /custom should trigger a git backup."""
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
with patch("discord_rss_bot.main.commit_state_change") as mock_commit:
response = client.post(
url="/custom",
data={
"feed_url": test_feed_url,
"custom_message": "Check out this entry: {entry.title}",
},
)
assert response.status_code == 200, f"Failed to set custom message: {response.text}"
mock_commit.assert_called_once()
# Verify the commit message
call_args = mock_commit.call_args
assert call_args is not None
commit_message: str = call_args[0][1]
assert "Update custom message" in commit_message
assert test_feed_url in commit_message
@SKIP_IF_NO_GIT
def test_embed_backup_end_to_end(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""End-to-end test: customizing embed creates a real commit in the backup repo."""
git_executable: str | None = shutil.which("git")
assert git_executable is not None, "git executable not found"
backup_path: Path = tmp_path / "backup"
monkeypatch.setenv("GIT_BACKUP_PATH", str(backup_path))
monkeypatch.delenv("GIT_BACKUP_REMOTE", raising=False)
setup_test_feed()
# Post embed customization
response = client.post(
url="/embed",
data={
"feed_url": test_feed_url,
"title": "{entry.title}",
"description": "{entry.summary}",
"color": "#0099FF",
"image_url": "{entry.image}",
},
)
assert response.status_code == 200, f"Failed to customize embed: {response.text}"
# Verify a commit was created
result: subprocess.CompletedProcess[str] = subprocess.run( # noqa: S603
[git_executable, "-C", str(backup_path), "log", "--oneline"],
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0, f"Failed to read git log: {result.stderr}"
assert "Update embed settings" in result.stdout, f"Commit not found in log: {result.stdout}"
# Verify state.json contains embed data
state_file: Path = backup_path / "state.json"
assert state_file.exists(), "state.json should exist in backup repo"
state_data: dict[str, Any] = json.loads(state_file.read_text(encoding="utf-8"))
# Find our test feed in the state
test_feed_data = next((feed for feed in state_data["feeds"] if feed["url"] == test_feed_url), None)
assert test_feed_data is not None, f"Test feed not found in state.json: {state_data}"
# The embed settings are stored as a nested dict under custom_embed tag
# This verifies the embed customization was persisted
assert "webhook" in test_feed_data, "Feed should have webhook set"
View file
@@ -1,5 +1,6 @@
from __future__ import annotations
import re
import urllib.parse
from typing import TYPE_CHECKING
@@ -8,6 +9,9 @@ from fastapi.testclient import TestClient
from discord_rss_bot.main import app
if TYPE_CHECKING:
    from pathlib import Path

    import pytest
    from httpx import Response
client: TestClient = TestClient(app)
@@ -242,3 +246,196 @@ def test_update_feed_not_found() -> None:
    # Check that it returns a 404 status code
    assert response.status_code == 404, f"Expected 404 for non-existent feed, got: {response.status_code}"
    assert "Feed not found" in response.text
def test_navbar_backup_link_hidden_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
"""Test that the backup link is not shown in the navbar when GIT_BACKUP_PATH is not set."""
# Ensure GIT_BACKUP_PATH is not set
monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
# Get the index page
response: Response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
# Check that the backup button is not in the response
assert "Backup" not in response.text or 'action="/backup"' not in response.text, (
"Backup button should not be visible when GIT_BACKUP_PATH is not configured"
)
def test_navbar_backup_link_visible_when_configured(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Test that the backup link is shown in the navbar when GIT_BACKUP_PATH is set."""
# Set GIT_BACKUP_PATH
monkeypatch.setenv("GIT_BACKUP_PATH", str(tmp_path))
# Get the index page
response: Response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}"
# Check that the backup button is in the response
assert "Backup" in response.text, "Backup button text should be visible when GIT_BACKUP_PATH is configured"
assert 'action="/backup"' in response.text, "Backup form should be visible when GIT_BACKUP_PATH is configured"
def test_backup_endpoint_returns_error_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
"""Test that the backup endpoint returns an error when GIT_BACKUP_PATH is not set."""
# Ensure GIT_BACKUP_PATH is not set
monkeypatch.delenv("GIT_BACKUP_PATH", raising=False)
# Try to trigger a backup
response: Response = client.post(url="/backup")
# Should redirect to index with error message
assert response.status_code == 200, f"Failed to post /backup: {response.text}"
assert "Git backup is not configured" in response.text or "GIT_BACKUP_PATH" in response.text, (
"Error message about backup not being configured should be shown"
)
def test_show_more_entries_button_visible_when_many_entries() -> None:
"""Test that the 'Show more entries' button is visible when there are more than 20 entries."""
# Add the webhook first
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Remove the feed if it already exists
feeds: Response = client.get(url="/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
# Add the feed
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Get the feed page
response: Response = client.get(url="/feed", params={"feed_url": feed_url})
assert response.status_code == 200, f"Failed to get /feed: {response.text}"
# Check if the feed has more than 20 entries by looking at the response
# The button should be visible if there are more than 20 entries
# We check for both the button text and the link structure
if "Show more entries" in response.text:
# Button is visible - verify it has the correct structure
assert "starting_after=" in response.text, "Show more entries button should contain starting_after parameter"
# The button should be a link to the feed page with pagination
assert (
f'href="/feed?feed_url={urllib.parse.quote(feed_url)}' in response.text
or f'href="/feed?feed_url={encoded_feed_url(feed_url)}' in response.text
), "Show more entries button should link back to the feed page"
def test_show_more_entries_button_not_visible_when_few_entries() -> None:
"""Test that the 'Show more entries' button is not visible when there are 20 or fewer entries."""
# Use a feed with very few entries
small_feed_url = "https://lovinator.space/rss_test_small.xml"
# Clean up if exists
client.post(url="/remove", data={"feed_url": small_feed_url})
# Add a small feed (this may not exist, so this test is conditional)
response: Response = client.post(url="/add", data={"feed_url": small_feed_url, "webhook_dropdown": webhook_name})
if response.status_code == 200:
# Get the feed page
response: Response = client.get(url="/feed", params={"feed_url": small_feed_url})
assert response.status_code == 200, f"Failed to get /feed: {response.text}"
# If the feed has 20 or fewer entries, the button should not be visible
# We check the total entry count in the page
if "0 entries" in response.text or " entries)" in response.text:
# Extract entry count and verify button visibility
match: re.Match[str] | None = re.search(r"\((\d+) entries\)", response.text)
if match:
entry_count = int(match.group(1))
if entry_count <= 20:
assert "Show more entries" not in response.text, (
f"Show more entries button should not be visible when there are {entry_count} entries"
)
# Clean up
client.post(url="/remove", data={"feed_url": small_feed_url})
def test_show_more_entries_pagination_works() -> None:
"""Test that pagination with starting_after parameter works correctly."""
# Add the webhook first
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Remove the feed if it already exists
feeds: Response = client.get(url="/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
# Add the feed
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Get the first page
response: Response = client.get(url="/feed", params={"feed_url": feed_url})
assert response.status_code == 200, f"Failed to get /feed: {response.text}"
# Check if pagination is available
if "Show more entries" in response.text and "starting_after=" in response.text:
# Extract the starting_after parameter from the button link
match: re.Match[str] | None = re.search(r'starting_after=([^"&]+)', response.text)
if match:
starting_after_id: str = match.group(1)
# Request the second page
response: Response = client.get(
url="/feed", params={"feed_url": feed_url, "starting_after": starting_after_id}
)
assert response.status_code == 200, f"Failed to get paginated feed: {response.text}"
# Verify we got a valid response (the page should contain entries)
assert "entries)" in response.text, "Paginated page should show entry count"
def test_show_more_entries_button_context_variable() -> None:
"""Test that the button visibility variable is correctly passed to the template context."""
# Add the webhook first
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
# Remove the feed if it already exists
feeds: Response = client.get(url="/")
if feed_url in feeds.text:
client.post(url="/remove", data={"feed_url": feed_url})
# Add the feed
response: Response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Get the feed page
response: Response = client.get(url="/feed", params={"feed_url": feed_url})
assert response.status_code == 200, f"Failed to get /feed: {response.text}"
# Extract the total entries count from the page
match: re.Match[str] | None = re.search(r"\((\d+) entries\)", response.text)
if match:
entry_count = int(match.group(1))
# If more than 20 entries, button should be visible
if entry_count > 20:
assert "Show more entries" in response.text, (
f"Button should be visible when there are {entry_count} entries (more than 20)"
)
# If 20 or fewer entries, button should not be visible
else:
assert "Show more entries" not in response.text, (
f"Button should not be visible when there are {entry_count} entries (20 or fewer)"
)
@@ -4,7 +4,9 @@ import tempfile
 from pathlib import Path
 from typing import TYPE_CHECKING
-from reader import Feed, Reader, make_reader
+from reader import Feed
+from reader import Reader
+from reader import make_reader
 from discord_rss_bot.search import create_search_context
@@ -6,7 +6,9 @@ from pathlib import Path
 from reader import Reader
-from discord_rss_bot.settings import data_dir, default_custom_message, get_reader
+from discord_rss_bot.settings import data_dir
+from discord_rss_bot.settings import default_custom_message
+from discord_rss_bot.settings import get_reader
 def test_reader() -> None:
@@ -0,0 +1,88 @@
from __future__ import annotations
import urllib.parse
from typing import TYPE_CHECKING
from fastapi.testclient import TestClient
from discord_rss_bot.main import app
if TYPE_CHECKING:
from httpx import Response
client: TestClient = TestClient(app)
webhook_name: str = "Test Webhook for Update Interval"
webhook_url: str = "https://discord.com/api/webhooks/1234567890/test_update_interval"
feed_url: str = "https://lovinator.space/rss_test.xml"
def test_global_update_interval() -> None:
"""Test setting the global update interval."""
# Set global update interval to 30 minutes
response: Response = client.post("/set_global_update_interval", data={"interval_minutes": "30"})
assert response.status_code == 200, f"Failed to set global interval: {response.text}"
# Check that the settings page shows the new interval
response = client.get("/settings")
assert response.status_code == 200, f"Failed to get settings page: {response.text}"
assert "30" in response.text, "Global interval not updated on settings page"
def test_per_feed_update_interval() -> None:
"""Test setting per-feed update interval."""
# Clean up any existing feed/webhook
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
client.post(url="/remove", data={"feed_url": feed_url})
# Add webhook and feed
response: Response = client.post(
url="/add_webhook",
data={"webhook_name": webhook_name, "webhook_url": webhook_url},
)
assert response.status_code == 200, f"Failed to add webhook: {response.text}"
response = client.post(url="/add", data={"feed_url": feed_url, "webhook_dropdown": webhook_name})
assert response.status_code == 200, f"Failed to add feed: {response.text}"
# Set feed-specific update interval to 15 minutes
response = client.post("/set_update_interval", data={"feed_url": feed_url, "interval_minutes": "15"})
assert response.status_code == 200, f"Failed to set feed interval: {response.text}"
# Check that the feed page shows the custom interval
encoded_url = urllib.parse.quote(feed_url)
response = client.get(f"/feed?feed_url={encoded_url}")
assert response.status_code == 200, f"Failed to get feed page: {response.text}"
assert "15" in response.text, "Feed interval not displayed on feed page"
assert "Custom" in response.text, "Custom badge not shown for feed-specific interval"
def test_reset_feed_update_interval() -> None:
"""Test resetting feed update interval to global default."""
# First set a custom interval
response: Response = client.post("/set_update_interval", data={"feed_url": feed_url, "interval_minutes": "15"})
assert response.status_code == 200, f"Failed to set feed interval: {response.text}"
# Reset to global default
response = client.post("/reset_update_interval", data={"feed_url": feed_url})
assert response.status_code == 200, f"Failed to reset feed interval: {response.text}"
# Check that the feed page shows global default
encoded_url = urllib.parse.quote(feed_url)
response = client.get(f"/feed?feed_url={encoded_url}")
assert response.status_code == 200, f"Failed to get feed page: {response.text}"
assert "Using global default" in response.text, "Global default badge not shown after reset"
def test_update_interval_validation() -> None:
"""Test that update interval validation works."""
# Try to set an interval below minimum (should be clamped to 1)
response: Response = client.post("/set_global_update_interval", data={"interval_minutes": "0"})
assert response.status_code == 200, f"Failed to handle minimum interval: {response.text}"
# Try to set an interval above maximum (should be clamped to 10080)
response = client.post("/set_global_update_interval", data={"interval_minutes": "20000"})
assert response.status_code == 200, f"Failed to handle maximum interval: {response.text}"
# Clean up
client.post(url="/remove", data={"feed_url": feed_url})
client.post(url="/delete_webhook", data={"webhook_url": webhook_url})
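`test_update_interval_validation` expects out-of-range values to be clamped rather than rejected with an error status. The clamp itself reduces to roughly the following, with the bounds taken from the test (1 minute minimum, 10080 minutes, i.e. one week, maximum; the helper name is illustrative):

```python
def clamp_update_interval(minutes: int, minimum: int = 1, maximum: int = 10080) -> int:
    """Clamp a requested update interval to the allowed range (1 minute to one week)."""
    return max(minimum, min(minutes, maximum))
```

Clamping instead of rejecting is why both the `"0"` and `"20000"` POSTs above still come back with a 200 response.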
@@ -1,6 +1,7 @@
 from __future__ import annotations
-from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text
+from discord_rss_bot.filter.utils import is_regex_match
+from discord_rss_bot.filter.utils import is_word_in_text
 def test_is_word_in_text() -> None:
@@ -4,9 +4,13 @@ import tempfile
 from pathlib import Path
 from typing import TYPE_CHECKING
-from reader import Entry, Feed, Reader, make_reader
+from reader import Entry
+from reader import Feed
+from reader import Reader
+from reader import make_reader
-from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
+from discord_rss_bot.filter.whitelist import has_white_tags
+from discord_rss_bot.filter.whitelist import should_be_sent
 if TYPE_CHECKING:
     from collections.abc import Iterable