diff --git a/.env.example b/.env.example
deleted file mode 100644
index 2a098da..0000000
--- a/.env.example
+++ /dev/null
@@ -1,19 +0,0 @@
-# You can optionally store backups of your bot's configuration in a git repository.
-# This allows you to track changes by subscribing to the repository or using a RSS feed.
-# Local path for the backup git repository (e.g., /data/backup or /home/user/backups/discord-rss-bot)
-# When set, the bot will initialize a git repo here and commit state.json after every configuration change
-# GIT_BACKUP_PATH=
-
-# Remote URL for pushing backup commits (e.g., git@github.com:username/private-config.git)
-# Optional - only set if you want automatic pushes to a remote repository
-# Leave empty to keep git history local only
-# GIT_BACKUP_REMOTE=
-
-# Sentry Configuration (Optional)
-# Sentry DSN for error tracking and monitoring
-# Leave empty to disable Sentry integration
-# SENTRY_DSN=
-
-# Testing Configuration
-# Discord webhook URL used for testing (optional, only needed when running tests)
-# TEST_WEBHOOK_URL=
diff --git a/.forgejo/workflows/build.yml b/.forgejo/workflows/build.yml
deleted file mode 100644
index c2d854d..0000000
--- a/.forgejo/workflows/build.yml
+++ /dev/null
@@ -1,100 +0,0 @@
----
-# Required setup for self-hosted runner:
-# 1. Install dependencies:
-# sudo pacman -S qemu-user-static qemu-user-static-binfmt docker docker-buildx
-# 2. Add runner to docker group:
-# sudo usermod -aG docker forgejo-runner
-# 3. Restart runner service to apply group membership:
-# sudo systemctl restart forgejo-runner
-# 4. Install uv and ruff for the runner user
-# 5. Login to GitHub Container Registry:
-# echo "ghp_YOUR_TOKEN_HERE" | sudo -u forgejo-runner docker login ghcr.io -u TheLovinator1 --password-stdin
-# 6. Configure sudoers for deployment (sudo EDITOR=nvim visudo):
-# forgejo-runner ALL=(discord-rss) NOPASSWD: /usr/bin/git -C /home/discord-rss/discord-rss-bot pull
-# forgejo-runner ALL=(discord-rss) NOPASSWD: /usr/bin/uv sync -U --directory /home/discord-rss/discord-rss-bot
-# forgejo-runner ALL=(root) NOPASSWD: /bin/systemctl restart discord-rss-bot
-
-name: Test and build Docker image
-on:
- push:
- branches:
- - master
- pull_request:
- workflow_dispatch:
- schedule:
- - cron: "0 0 1 * *"
-
-jobs:
- docker:
- runs-on: self-hosted
- steps:
- # Download the latest commit from the master branch
- - uses: actions/checkout@v6
-
- # Verify local tools are available on the self-hosted runner
- - name: Check local toolchain
- run: |
- python --version
- uv --version
- ruff --version
- docker version
-
- # Bootstrap a local Buildx builder for multi-arch builds
- # (requires qemu-user-static and qemu-user-static-binfmt installed via pacman)
- - name: Configure local buildx for multi-arch
- run: |
- docker buildx inspect local-multiarch-builder >/dev/null 2>&1 || \
- docker buildx create --name local-multiarch-builder --driver docker-container
- docker buildx use local-multiarch-builder
- docker buildx inspect --bootstrap
-
- - name: Lint Python code
- run: ruff check --exit-non-zero-on-fix --verbose
-
- - name: Check Python formatting
- run: ruff format --check --verbose
-
- - name: Lint Dockerfile
- run: docker build --check .
-
- - name: Install dependencies
- run: uv sync --all-extras --all-groups
-
- - name: Run tests
- run: uv run pytest
-
- - id: tags
- name: Compute image tags
- run: |
- IMAGE="ghcr.io/thelovinator1/discord-rss-bot"
- if [ "${FORGEJO_REF}" = "refs/heads/master" ]; then
- echo "tags=${IMAGE}:latest,${IMAGE}:master" >> "$FORGEJO_OUTPUT"
- else
- SHORT_SHA="$(echo "$FORGEJO_SHA" | cut -c1-12)"
- echo "tags=${IMAGE}:sha-${SHORT_SHA}" >> "$FORGEJO_OUTPUT"
- fi
-
- # Build (and optionally push) Docker image
- - name: Build and push Docker image
- env:
- TAGS: ${{ steps.tags.outputs.tags }}
- run: |
- IFS=',' read -r -a tag_array <<< "$TAGS"
- tag_args=()
- for tag in "${tag_array[@]}"; do
- tag_args+=( -t "$tag" )
- done
-
- if [ "${{ forge.event_name }}" = "pull_request" ]; then
- docker buildx build --platform linux/amd64,linux/arm64 "${tag_args[@]}" --load .
- else
- docker buildx build --platform linux/amd64,linux/arm64 "${tag_args[@]}" --push .
- fi
-
- # Deploy to production server
- - name: Deploy to Server
- if: success() && forge.ref == 'refs/heads/master'
- run: |
- sudo -u discord-rss git -C /home/discord-rss/discord-rss-bot pull
- sudo -u discord-rss uv sync -U --directory /home/discord-rss/discord-rss-bot
- sudo systemctl restart discord-rss-bot
diff --git a/.gitattributes b/.gitattributes
deleted file mode 100644
index ccb351b..0000000
--- a/.gitattributes
+++ /dev/null
@@ -1 +0,0 @@
-*.html linguist-language=jinja
diff --git a/.forgejo/renovate.json b/.github/renovate.json
similarity index 82%
rename from .forgejo/renovate.json
rename to .github/renovate.json
index 7884adb..734986c 100644
--- a/.forgejo/renovate.json
+++ b/.github/renovate.json
@@ -1,8 +1,6 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
- "extends": [
- "config:recommended"
- ],
+ "extends": ["config:recommended"],
"automerge": true,
"configMigration": true,
"dependencyDashboard": false,
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
new file mode 100644
index 0000000..7f0ea6d
--- /dev/null
+++ b/.github/workflows/build.yml
@@ -0,0 +1,64 @@
+---
+name: Test and build Docker image
+on:
+ push:
+ pull_request:
+ workflow_dispatch:
+ schedule:
+ - cron: "0 6 * * *"
+
+env:
+ TEST_WEBHOOK_URL: ${{ secrets.TEST_WEBHOOK_URL }}
+
+jobs:
+ test:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - uses: actions/setup-python@v5
+ with:
+          python-version: "3.12"
+ - uses: astral-sh/setup-uv@v5
+ with:
+ version: "latest"
+ - run: uv sync --all-extras --all-groups
+ - run: uv run pytest
+ ruff:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - uses: astral-sh/ruff-action@v3
+ with:
+ version: "latest"
+ - run: ruff check --exit-non-zero-on-fix --verbose
+ - run: ruff format --check --verbose
+
+ build:
+ runs-on: ubuntu-latest
+ permissions:
+ contents: read
+ packages: write
+ if: github.event_name != 'pull_request'
+ concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+ needs: [test, ruff]
+ steps:
+ - uses: actions/checkout@v4
+ - uses: docker/setup-qemu-action@v3
+ with:
+ platforms: all
+ - uses: docker/setup-buildx-action@v3
+ - uses: docker/login-action@v3
+ with:
+ registry: ghcr.io
+ username: ${{ github.repository_owner }}
+ password: ${{ secrets.GITHUB_TOKEN }}
+ - uses: docker/build-push-action@v6
+ with:
+ context: .
+          platforms: linux/amd64,linux/arm64
+ push: ${{ github.event_name != 'pull_request' }}
+ tags: |
+ ghcr.io/thelovinator1/discord-rss-bot:latest
+ ghcr.io/thelovinator1/discord-rss-bot:master
diff --git a/.gitignore b/.gitignore
index 6817461..1ac2c11 100644
--- a/.gitignore
+++ b/.gitignore
@@ -92,7 +92,7 @@ ipython_config.py
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
-# Pipfile.lock
+Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
@@ -105,12 +105,11 @@ uv.lock
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
-# poetry.lock
-# poetry.toml
+poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
-# pdm.lock
+#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
@@ -166,20 +165,7 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
-# .idea/
-
-# Abstra
-# Abstra is an AI-powered process automation framework.
-# Ignore directories containing user credentials, local state, and settings.
-# Learn more at https://abstra.io/docs
-.abstra/
-
-# Visual Studio Code
-# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
-# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
-# and can be added to the global gitignore or merged into this file. However, if you prefer,
-# you could uncomment the following to ignore the entire vscode folder
-# .vscode/
+#.idea/
# Ruff stuff:
.ruff_cache/
@@ -187,13 +173,6 @@ cython_debug/
# PyPI configuration file
.pypirc
-# Cursor
-# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to
-# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data
-# refer to https://docs.cursor.com/context/ignore-files
-.cursorignore
-.cursorindexingignore
-
# Database stuff
*.sqlite
*.sqlite-shm
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 16a9a4f..a3c42c0 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,13 +1,13 @@
repos:
# Automatically add trailing commas to calls and literals.
- repo: https://github.com/asottile/add-trailing-comma
- rev: v4.0.0
+ rev: v3.1.0
hooks:
- id: add-trailing-comma
# Some out-of-the-box hooks for pre-commit.
- repo: https://github.com/pre-commit/pre-commit-hooks
- rev: v6.0.0
+ rev: v5.0.0
hooks:
- id: check-added-large-files
- id: check-ast
@@ -31,14 +31,14 @@ repos:
# Run Pyupgrade on all Python files. This will upgrade the code to Python 3.12.
- repo: https://github.com/asottile/pyupgrade
- rev: v3.21.2
+ rev: v3.19.1
hooks:
- id: pyupgrade
args: ["--py312-plus"]
# An extremely fast Python linter and formatter.
- repo: https://github.com/astral-sh/ruff-pre-commit
- rev: v0.15.5
+ rev: v0.9.5
hooks:
- id: ruff-format
- id: ruff
@@ -46,6 +46,6 @@ repos:
# Static checker for GitHub Actions workflow files.
- repo: https://github.com/rhysd/actionlint
- rev: v1.7.11
+ rev: v1.7.7
hooks:
- id: actionlint
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 266d7f2..781b0bd 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -8,11 +8,7 @@
"module": "uvicorn",
"args": [
"discord_rss_bot.main:app",
- "--reload",
- "--host",
- "0.0.0.0",
- "--port",
- "3000",
+ "--reload"
],
"jinja": true,
"justMyCode": true
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 8bd0ea9..f929fff 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,19 +1,13 @@
{
"cSpell.words": [
- "autoexport",
"botuser",
"Genshins",
- "healthcheck",
- "Hoyolab",
"levelname",
"Lovinator",
"markdownified",
"markdownify",
"pipx",
- "pyproject",
- "thead",
- "thelovinator",
- "uvicorn"
+ "thead"
],
"python.analysis.typeCheckingMode": "basic"
}
diff --git a/Dockerfile b/Dockerfile
index f27eed9..72714a0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,15 +1,14 @@
-FROM python:3.14-slim
+FROM python:3.13-slim
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
RUN useradd --create-home botuser && \
mkdir -p /home/botuser/discord-rss-bot/ /home/botuser/.local/share/discord_rss_bot/ && \
chown -R botuser:botuser /home/botuser/
USER botuser
WORKDIR /home/botuser/discord-rss-bot
+COPY --chown=botuser:botuser requirements.txt /home/botuser/discord-rss-bot/
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=pyproject.toml,target=pyproject.toml \
uv sync --no-install-project
-COPY --chown=botuser:botuser discord_rss_bot/ /home/botuser/discord-rss-bot/discord_rss_bot/
EXPOSE 5000
VOLUME ["/home/botuser/.local/share/discord_rss_bot/"]
-HEALTHCHECK --interval=10m --timeout=5s CMD ["uv", "run", "./discord_rss_bot/healthcheck.py"]
CMD ["uv", "run", "uvicorn", "discord_rss_bot.main:app", "--host=0.0.0.0", "--port=5000", "--proxy-headers", "--forwarded-allow-ips='*'", "--log-level", "debug"]
diff --git a/README.md b/README.md
index 09b6bbc..849fb98 100644
--- a/README.md
+++ b/README.md
@@ -2,25 +2,8 @@
Subscribe to RSS feeds and get updates to a Discord webhook.
-Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com)
-
-Discord: TheLovinator#9276
-
-## Features
-
-- Subscribe to RSS feeds and get updates to a Discord webhook.
-- Web interface to manage subscriptions.
-- Customizable message format for each feed.
-- Choose between Discord embed or plain text.
-- Regex filters for RSS feeds.
-- Blacklist/whitelist words in the title/description/author/etc.
-- Set different update frequencies for each feed or use a global default.
-- Gets extra information from APIs if available, currently for:
- - [https://feeds.c3kay.de/](https://feeds.c3kay.de/)
- - Genshin Impact News
- - Honkai Impact 3rd News
- - Honkai Starrail News
- - Zenless Zone Zero News
+> [!NOTE]
+> You should look at [MonitoRSS](https://github.com/synzen/monitorss) for a more feature-rich project.
## Installation
@@ -30,7 +13,9 @@ or [install directly on your computer](#install-directly-on-your-computer).
### Docker
- Open a terminal in the repository folder.
- - Shift + right-click in the folder and `Open PowerShell window here`
+ - Windows 10: Shift + right-click in the folder and select `Open PowerShell window here`
+ - Windows 11: Shift + right-click in the folder and Show more options
+ and `Open PowerShell window here`
- Run the Docker Compose file:
- `docker-compose up`
- You can stop the bot with Ctrl + c.
@@ -44,68 +29,34 @@ or [install directly on your computer](#install-directly-on-your-computer).
### Install directly on your computer
-- Install the latest of [uv](https://docs.astral.sh/uv/#installation):
-  - `powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"`
+This is not recommended if you don't have an init system (e.g., systemd)
+
+- Install the latest version of needed software:
+ - [Python](https://www.python.org/)
+ - You should use the latest version.
+ - You want to add Python to your PATH.
+ - Windows: Find `App execution aliases` and disable python.exe and python3.exe
+ - [Poetry](https://python-poetry.org/docs/master/#installation)
+ - Windows: You have to add `%appdata%\Python\Scripts` to your PATH for Poetry to work.
- Download the project from GitHub with Git or download
the [ZIP](https://github.com/TheLovinator1/discord-rss-bot/archive/refs/heads/master.zip).
- If you want to update the bot, you can run `git pull` in the project folder or download the ZIP again.
- Open a terminal in the repository folder.
- - Shift + right-click in the folder and `Open PowerShell window here`
+ - Windows 10: Shift + right-click in the folder and select `Open PowerShell window here`
+ - Windows 11: Shift + right-click in the folder and Show more options
+ and `Open PowerShell window here`
+- Install requirements:
+ - Type `poetry install` into the PowerShell window. Make sure you are
+ in the repository folder where the [pyproject.toml](pyproject.toml) file is located.
+ - (You may have to restart your terminal if it can't find the `poetry` command. Also double check it is in
+ your PATH.)
- Start the bot:
- - Type `uv run discord_rss_bot/main.py` into the PowerShell window.
+ - Type `poetry run python discord_rss_bot/main.py` into the PowerShell window.
- You can stop the bot with Ctrl + c.
-- Bot is now running on port 3000.
-- You should run this bot behind a reverse proxy like [Caddy](https://caddyserver.com/)
- or [Nginx](https://www.nginx.com/) if you want to access it from the internet. Remember to add authentication.
-- You can access the web interface at `http://localhost:3000/`.
-- To run automatically on boot:
- - Use [Windows Task Scheduler](https://en.wikipedia.org/wiki/Windows_Task_Scheduler).
- - Or add a shortcut to `%userprofile%\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\Startup`.
+Note: You will need to run `poetry install` again if [poetry.lock](poetry.lock) has been modified.
-## Git Backup (State Version Control)
+## Contact
-The bot can commit every configuration change (adding/removing feeds, webhook
-changes, blacklist/whitelist updates) to a separate private Git repository so
-you get a full, auditable history of state changes — similar to `etckeeper`.
-
-### Configuration
-
-Set the following environment variables (e.g. in `docker-compose.yml` or a
-`.env` file):
-
-| Variable | Required | Description |
-| ------------------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------- |
-| `GIT_BACKUP_PATH` | Yes | Local path where the backup git repository is stored. The bot will initialise it automatically if it does not yet exist. |
-| `GIT_BACKUP_REMOTE` | No | Remote URL to push to after each commit (e.g. `git@github.com:you/private-config.git`). Leave unset to keep the history local only. |
-
-### What is backed up
-
-After every relevant change a `state.json` file is written and committed.
-The file contains:
-
-- All feed URLs together with their webhook URL, custom message, embed
- settings, and any blacklist/whitelist filters.
-- The global list of Discord webhooks.
-
-### Docker example
-
-```yaml
-services:
- discord-rss-bot:
- image: ghcr.io/thelovinator1/discord-rss-bot:latest
- volumes:
- - ./data:/data
- environment:
- - GIT_BACKUP_PATH=/data/backup
- - GIT_BACKUP_REMOTE=git@github.com:you/private-config.git
-```
-
-For SSH-based remotes mount your SSH key into the container and make sure the
-host key is trusted, e.g.:
-
-```yaml
- volumes:
- - ./data:/data
- - ~/.ssh:/root/.ssh:ro
-```
+Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com)
+Discord: TheLovinator#9276
diff --git a/discord_rss_bot/custom_filters.py b/discord_rss_bot/custom_filters.py
index fd9461c..99fe77d 100644
--- a/discord_rss_bot/custom_filters.py
+++ b/discord_rss_bot/custom_filters.py
@@ -4,14 +4,15 @@ import urllib.parse
from functools import lru_cache
from typing import TYPE_CHECKING
-from discord_rss_bot.filter.blacklist import entry_should_be_skipped
-from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags
-from discord_rss_bot.filter.whitelist import has_white_tags
-from discord_rss_bot.filter.whitelist import should_be_sent
+from discord_rss_bot.filter.blacklist import entry_should_be_skipped, feed_has_blacklist_tags
+from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
+from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
- from reader import Entry
- from reader import Reader
+ from reader import Entry, Reader
+
+# Our reader
+reader: Reader = get_reader()
@lru_cache
@@ -30,12 +31,11 @@ def encode_url(url_to_quote: str) -> str:
return urllib.parse.quote(string=url_to_quote) if url_to_quote else ""
-def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool:
+def entry_is_whitelisted(entry_to_check: Entry) -> bool:
"""Check if the entry is whitelisted.
Args:
entry_to_check: The feed to check.
- reader: Custom Reader instance.
Returns:
bool: True if the feed is whitelisted, False otherwise.
@@ -44,12 +44,11 @@ def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool:
return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check))
-def entry_is_blacklisted(entry_to_check: Entry, reader: Reader) -> bool:
+def entry_is_blacklisted(entry_to_check: Entry) -> bool:
"""Check if the entry is blacklisted.
Args:
entry_to_check: The feed to check.
- reader: Custom Reader instance.
Returns:
bool: True if the feed is blacklisted, False otherwise.
diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py
index 1626e39..9cb03e5 100644
--- a/discord_rss_bot/custom_message.py
+++ b/discord_rss_bot/custom_message.py
@@ -1,27 +1,18 @@
from __future__ import annotations
-import html
import json
import logging
-import re
from dataclasses import dataclass
-from typing import TYPE_CHECKING
-from bs4 import BeautifulSoup
-from bs4 import Tag
+from bs4 import BeautifulSoup, Tag
from markdownify import markdownify
+from reader import Entry, Feed, Reader, TagNotFoundError
from discord_rss_bot.is_url_valid import is_url_valid
-
-if TYPE_CHECKING:
- from reader import Entry
- from reader import Feed
- from reader import Reader
+from discord_rss_bot.settings import get_reader
logger: logging.Logger = logging.getLogger(__name__)
-DISCORD_TIMESTAMP_TAG_RE: re.Pattern[str] = re.compile(r"<t:\d+(?::[a-zA-Z])?>")
-
@dataclass(slots=True)
class CustomEmbed:
@@ -55,80 +46,18 @@ def try_to_replace(custom_message: str, template: str, replace_with: str) -> str
return custom_message
-def _preserve_discord_timestamp_tags(text: str) -> tuple[str, dict[str, str]]:
- """Replace Discord timestamp tags with placeholders before markdown conversion.
-
- Args:
- text: The text to replace tags in.
-
- Returns:
- The text with Discord timestamp tags replaced by placeholders and a mapping of placeholders to original tags.
- """
- replacements: dict[str, str] = {}
-
- def replace_match(match: re.Match[str]) -> str:
- placeholder: str = f"DISCORDTIMESTAMPPLACEHOLDER{len(replacements)}"
- replacements[placeholder] = match.group(0)
- return placeholder
-
- return DISCORD_TIMESTAMP_TAG_RE.sub(replace_match, text), replacements
-
-
-def _restore_discord_timestamp_tags(text: str, replacements: dict[str, str]) -> str:
- """Restore preserved Discord timestamp tags after markdown conversion.
-
- Args:
- text: The text to restore tags in.
- replacements: A mapping of placeholders to original Discord timestamp tags.
-
- Returns:
- The text with placeholders replaced by the original Discord timestamp tags.
- """
- for placeholder, original_value in replacements.items():
- text = text.replace(placeholder, original_value)
- return text
-
-
-def format_entry_html_for_discord(text: str) -> str:
- """Convert entry HTML to Discord-friendly markdown while preserving Discord timestamp tags.
-
- Args:
- text: The HTML text to format.
-
- Returns:
- The formatted text with Discord timestamp tags preserved.
- """
- if not text:
- return ""
-
- unescaped_text: str = html.unescape(text)
- protected_text, replacements = _preserve_discord_timestamp_tags(unescaped_text)
- formatted_text: str = markdownify(
- html=protected_text,
- strip=["img", "table", "td", "tr", "tbody", "thead"],
- escape_misc=False,
- heading_style="ATX",
- )
-
- if "[https://" in formatted_text or "[https://www." in formatted_text:
- formatted_text = formatted_text.replace("[https://", "[")
- formatted_text = formatted_text.replace("[https://www.", "[")
-
- return _restore_discord_timestamp_tags(formatted_text, replacements)
-
-
-def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str:
+def replace_tags_in_text_message(entry: Entry) -> str:
"""Replace tags in custom_message.
Args:
entry: The entry to get the tags from.
- reader: Custom Reader instance.
Returns:
Returns the custom_message with the tags replaced.
"""
feed: Feed = entry.feed
- custom_message: str = get_custom_message(feed=feed, reader=reader)
+ custom_reader: Reader = get_reader()
+ custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader)
content = ""
if entry.content:
@@ -139,8 +68,16 @@ def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str:
first_image: str = get_first_image(summary, content)
- summary = format_entry_html_for_discord(summary)
- content = format_entry_html_for_discord(content)
+ summary = markdownify(html=summary, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False)
+ content = markdownify(html=content, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False)
+
+ if "[https://" in content or "[https://www." in content:
+ content = content.replace("[https://", "[")
+ content = content.replace("[https://www.", "[")
+
+ if "[https://" in summary or "[https://www." in summary:
+ summary = summary.replace("[https://", "[")
+ summary = summary.replace("[https://www.", "[")
feed_added: str = feed.added.strftime("%Y-%m-%d %H:%M:%S") if feed.added else "Never"
feed_last_exception: str = feed.last_exception.value_str if feed.last_exception else ""
@@ -215,7 +152,14 @@ def get_first_image(summary: str | None, content: str | None) -> str:
logger.warning("Invalid URL: %s", src)
continue
- return str(image.attrs["src"])
+ # Genshins first image is a divider, so we ignore it.
+ # https://hyl-static-res-prod.hoyolab.com/divider_config/PC/line_3.png
+ skip_images: list[str] = [
+ "https://img-os-static.hoyolab.com/divider_config/",
+ "https://hyl-static-res-prod.hoyolab.com/divider_config/",
+ ]
+ if not str(image.attrs["src"]).startswith(tuple(skip_images)):
+ return str(image.attrs["src"])
if summary and (images := BeautifulSoup(summary, features="lxml").find_all("img")):
for image in images:
if not isinstance(image, Tag) or "src" not in image.attrs:
@@ -226,22 +170,24 @@ def get_first_image(summary: str | None, content: str | None) -> str:
logger.warning("Invalid URL: %s", image.attrs["src"])
continue
- return str(image.attrs["src"])
+ # Genshins first image is a divider, so we ignore it.
+ if not str(image.attrs["src"]).startswith("https://img-os-static.hoyolab.com/divider_config"):
+ return str(image.attrs["src"])
return ""
-def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed:
+def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed:
"""Replace tags in embed.
Args:
feed: The feed to get the tags from.
entry: The entry to get the tags from.
- reader: Custom Reader instance.
Returns:
Returns the embed with the tags replaced.
"""
- embed: CustomEmbed = get_embed(feed=feed, reader=reader)
+ custom_reader: Reader = get_reader()
+ embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader)
content = ""
if entry.content:
@@ -252,8 +198,16 @@ def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmb
first_image: str = get_first_image(summary, content)
- summary = format_entry_html_for_discord(summary)
- content = format_entry_html_for_discord(content)
+ summary = markdownify(html=summary, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False)
+ content = markdownify(html=content, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False)
+
+ if "[https://" in content or "[https://www." in content:
+ content = content.replace("[https://", "[")
+ content = content.replace("[https://www.", "[")
+
+ if "[https://" in summary or "[https://www." in summary:
+ summary = summary.replace("[https://", "[")
+ summary = summary.replace("[https://www.", "[")
feed_added: str = feed.added.strftime("%Y-%m-%d %H:%M:%S") if feed.added else "Never"
feed_last_updated: str = feed.last_updated.strftime("%Y-%m-%d %H:%M:%S") if feed.last_updated else "Never"
@@ -332,29 +286,31 @@ def _replace_embed_tags(embed: CustomEmbed, template: str, replace_with: str) ->
embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with)
-def get_custom_message(reader: Reader, feed: Feed) -> str:
+def get_custom_message(custom_reader: Reader, feed: Feed) -> str:
"""Get custom_message tag from feed.
Args:
- reader: What Reader to use.
+ custom_reader: What Reader to use.
feed: The feed to get the tag from.
Returns:
Returns the contents from the custom_message tag.
"""
try:
- custom_message: str = str(reader.get_tag(feed, "custom_message", ""))
+ custom_message: str = str(custom_reader.get_tag(feed, "custom_message"))
+ except TagNotFoundError:
+ custom_message = ""
except ValueError:
custom_message = ""
return custom_message
-def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None:
+def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None:
"""Set embed tag in feed.
Args:
- reader: What Reader to use.
+ custom_reader: What Reader to use.
feed: The feed to set the tag in.
embed: The embed to set.
"""
@@ -370,20 +326,20 @@ def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None:
"footer_text": embed.footer_text,
"footer_icon_url": embed.footer_icon_url,
}
- reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType]
+ custom_reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType]
-def get_embed(reader: Reader, feed: Feed) -> CustomEmbed:
+def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed:
"""Get embed tag from feed.
Args:
- reader: What Reader to use.
+ custom_reader: What Reader to use.
feed: The feed to get the tag from.
Returns:
Returns the contents from the embed tag.
"""
- embed = reader.get_tag(feed, "embed", "")
+ embed = custom_reader.get_tag(feed, "embed", "")
if embed:
if not isinstance(embed, str):
diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py
index 225e7ff..ccb0a14 100644
--- a/discord_rss_bot/feeds.py
+++ b/discord_rss_bot/feeds.py
@@ -1,45 +1,25 @@
from __future__ import annotations
import datetime
-import json
import logging
-import os
import pprint
-import re
from typing import TYPE_CHECKING
-from typing import Any
-from urllib.parse import ParseResult
-from urllib.parse import urlparse
-import tldextract
-from discord_webhook import DiscordEmbed
-from discord_webhook import DiscordWebhook
+from discord_webhook import DiscordEmbed, DiscordWebhook
from fastapi import HTTPException
-from markdownify import markdownify
-from reader import Entry
-from reader import EntryNotFoundError
-from reader import Feed
-from reader import FeedExistsError
-from reader import FeedNotFoundError
-from reader import Reader
-from reader import ReaderError
-from reader import StorageError
+from reader import Entry, EntryNotFoundError, Feed, FeedExistsError, Reader, ReaderError, StorageError, TagNotFoundError
-from discord_rss_bot.custom_message import CustomEmbed
-from discord_rss_bot.custom_message import get_custom_message
-from discord_rss_bot.custom_message import replace_tags_in_embed
-from discord_rss_bot.custom_message import replace_tags_in_text_message
+from discord_rss_bot.custom_message import (
+ CustomEmbed,
+ get_custom_message,
+ replace_tags_in_embed,
+ replace_tags_in_text_message,
+)
from discord_rss_bot.filter.blacklist import entry_should_be_skipped
-from discord_rss_bot.filter.whitelist import has_white_tags
-from discord_rss_bot.filter.whitelist import should_be_sent
-from discord_rss_bot.hoyolab_api import create_hoyolab_webhook
-from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url
-from discord_rss_bot.hoyolab_api import fetch_hoyolab_post
-from discord_rss_bot.hoyolab_api import is_c3kay_feed
+from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
from discord_rss_bot.is_url_valid import is_url_valid
-from discord_rss_bot.settings import default_custom_embed
-from discord_rss_bot.settings import default_custom_message
-from discord_rss_bot.settings import get_reader
+from discord_rss_bot.missing_tags import add_missing_tags
+from discord_rss_bot.settings import default_custom_message, get_reader
if TYPE_CHECKING:
from collections.abc import Iterable
@@ -49,159 +29,53 @@ if TYPE_CHECKING:
logger: logging.Logger = logging.getLogger(__name__)
-def extract_domain(url: str) -> str: # noqa: PLR0911
- """Extract the domain name from a URL.
-
- Args:
- url: The URL to extract the domain from.
-
- Returns:
- str: The domain name, formatted for display.
- """
- # Check for empty URL first
- if not url:
- return "Other"
-
- try:
- # Special handling for YouTube feeds
- if "youtube.com/feeds/videos.xml" in url:
- return "YouTube"
-
- # Special handling for Reddit feeds
- if "reddit.com" in url and ".rss" in url:
- return "Reddit"
-
- # Parse the URL and extract the domain
- parsed_url: ParseResult = urlparse(url)
- domain: str = parsed_url.netloc
-
- # If we couldn't extract a domain, return "Other"
- if not domain:
- return "Other"
-
- # Remove www. prefix if present
- domain = re.sub(r"^www\.", "", domain)
-
- # Special handling for common domains
- domain_mapping: dict[str, str] = {"github.com": "GitHub"}
-
- if domain in domain_mapping:
- return domain_mapping[domain]
-
- # Use tldextract to get the domain (SLD)
- ext = tldextract.extract(url)
- if ext.domain:
- return ext.domain.capitalize()
- return domain.capitalize()
- except (ValueError, AttributeError, TypeError) as e:
- logger.warning("Error extracting domain from %s: %s", url, e)
- return "Other"
-
-
-def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: C901
+def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None:
"""Send a single entry to Discord.
Args:
entry: The entry to send to Discord.
- reader: The reader to use.
+ custom_reader: The reader to use. If None, the default reader will be used.
Returns:
str | None: The error message if there was an error, otherwise None.
"""
+ # Get the default reader if we didn't get a custom one.
+ reader: Reader = get_reader() if custom_reader is None else custom_reader
+
# Get the webhook URL for the entry.
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", ""))
if not webhook_url:
return "No webhook URL found."
- # If https://discord.com/quests/ is in the URL, send a separate message with the URL.
- send_discord_quest_notification(entry, webhook_url, reader=reader)
-
- # Check if this is a c3kay feed
- if is_c3kay_feed(entry.feed.url):
- entry_link: str | None = entry.link
- if entry_link:
- post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
- if post_id:
- post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
- if post_data:
- webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
- execute_webhook(webhook, entry, reader=reader)
- return None
- logger.warning(
- "Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
- entry.feed.url,
- )
- else:
- logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
-
webhook_message: str = ""
# Try to get the custom message for the feed. If the user has none, we will use the default message.
# This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()"
if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901
- webhook_message: str = replace_tags_in_text_message(entry=entry, reader=reader)
+ webhook_message: str = replace_tags_in_text_message(entry=entry)
if not webhook_message:
webhook_message = "No message found."
# Create the webhook.
try:
- should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True))
+ should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
+ except TagNotFoundError:
+ logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url)
+ should_send_embed = True
except StorageError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True
- # YouTube feeds should never use embeds
- if is_youtube_feed(entry.feed.url):
- should_send_embed = False
-
if should_send_embed:
- webhook = create_embed_webhook(webhook_url, entry, reader=reader)
+ webhook = create_embed_webhook(webhook_url, entry)
else:
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
- execute_webhook(webhook, entry, reader=reader)
+ execute_webhook(webhook, entry)
return None
-def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Reader) -> None:
- """Send a separate message to Discord if the entry is a quest notification."""
- quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+")
-
- def send_notification(quest_url: str) -> None:
- """Helper function to send quest notification to Discord."""
- logger.info("Sending quest notification to Discord: %s", quest_url)
- webhook = DiscordWebhook(
- url=webhook_url,
- content=quest_url,
- rate_limit_retry=True,
- )
- execute_webhook(webhook, entry, reader=reader)
-
- # Iterate through the content of the entry
- for content in entry.content:
- if content.type == "text" and content.value:
- match = quest_regex.search(content.value)
- if match:
- send_notification(match.group(0))
- return
-
- elif content.type == "text/html" and content.value:
- # Convert HTML to text and check for quest links
- text_value = markdownify(
- html=content.value,
- strip=["img", "table", "td", "tr", "tbody", "thead"],
- escape_misc=False,
- heading_style="ATX",
- )
- match: re.Match[str] | None = quest_regex.search(text_value)
- if match:
- send_notification(match.group(0))
- return
-
- logger.info("No quest notification found in entry: %s", entry.id)
-
-
def set_description(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
"""Set the description of the embed.
@@ -234,17 +108,12 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None:
discord_embed.set_title(embed_title) if embed_title else None
-def create_embed_webhook( # noqa: C901
- webhook_url: str,
- entry: Entry,
- reader: Reader,
-) -> DiscordWebhook:
+def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook:
"""Create a webhook with an embed.
Args:
webhook_url (str): The webhook URL.
entry (Entry): The entry to send to Discord.
- reader (Reader): The Reader instance to use for getting embed data.
Returns:
DiscordWebhook: The webhook with the embed.
@@ -253,7 +122,7 @@ def create_embed_webhook( # noqa: C901
feed: Feed = entry.feed
# Get the embed data from the database.
- custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader)
+ custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry)
discord_embed: DiscordEmbed = DiscordEmbed()
@@ -315,14 +184,13 @@ def get_webhook_url(reader: Reader, entry: Entry) -> str:
str: The webhook URL.
"""
try:
- webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", ""))
+ webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook"))
+ except TagNotFoundError:
+ logger.exception("No webhook URL found for feed: %s", entry.feed.url)
+ return ""
except StorageError:
logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url)
return ""
-
- if not webhook_url:
- logger.error("No webhook URL found for feed: %s", entry.feed.url)
- return ""
return webhook_url
@@ -341,53 +209,44 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None:
logger.exception("Error setting entry to read: %s", entry.id)
-def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912
+def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None:
"""Send entries to Discord.
If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time.
Args:
- reader: If we should use a custom reader instead of the default one.
+ custom_reader: If we should use a custom reader instead of the default one.
feed: The feed to send to Discord.
do_once: If we should only send one entry. This is used in the test.
"""
- logger.info("Starting to send entries to Discord.")
# Get the default reader if we didn't get a custom one.
- effective_reader: Reader = get_reader() if reader is None else reader
+ reader: Reader = get_reader() if custom_reader is None else custom_reader
# Check for new entries for every feed.
- effective_reader.update_feeds(
- scheduled=True,
- workers=os.cpu_count() or 1,
- )
+ reader.update_feeds()
# Loop through the unread entries.
- entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False)
+ entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False)
for entry in entries:
- set_entry_as_read(effective_reader, entry)
+ set_entry_as_read(reader, entry)
if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1):
logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url)
continue
- webhook_url: str = get_webhook_url(effective_reader, entry)
+ webhook_url: str = get_webhook_url(reader, entry)
if not webhook_url:
logger.info("No webhook URL found for feed: %s", entry.feed.url)
continue
- should_send_embed: bool = should_send_embed_check(effective_reader, entry)
-
- # Youtube feeds only need to send the link
- if is_youtube_feed(entry.feed.url):
- should_send_embed = False
-
+ should_send_embed: bool = should_send_embed_check(reader, entry)
if should_send_embed:
- webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader)
+ webhook = create_embed_webhook(webhook_url, entry)
else:
# If the user has set the custom message to an empty string, we will use the default message, otherwise we
# will use the custom message.
- if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901
- webhook_message = replace_tags_in_text_message(entry, reader=effective_reader)
+ if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901
+ webhook_message = replace_tags_in_text_message(entry)
else:
webhook_message: str = str(default_custom_message)
@@ -397,35 +256,19 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
# Check if the entry is blacklisted, and if it is, we will skip it.
- if entry_should_be_skipped(effective_reader, entry):
+ if entry_should_be_skipped(reader, entry):
logger.info("Entry was blacklisted: %s", entry.id)
continue
# Check if the feed has a whitelist, and if it does, check if the entry is whitelisted.
- if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry):
- logger.info("Entry was not whitelisted: %s", entry.id)
+ if has_white_tags(reader, entry.feed):
+ if should_be_sent(reader, entry):
+ execute_webhook(webhook, entry)
+ return
continue
- # Use a custom webhook for Hoyolab feeds.
- if is_c3kay_feed(entry.feed.url):
- entry_link: str | None = entry.link
- if entry_link:
- post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
- if post_id:
- post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
- if post_data:
- webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
- execute_webhook(webhook, entry, reader=effective_reader)
- return
- logger.warning(
- "Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
- entry.feed.url,
- )
- else:
- logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
-
# Send the entry to Discord as it is not blacklisted or feed has a whitelist.
- execute_webhook(webhook, entry, reader=effective_reader)
+ execute_webhook(webhook, entry)
# If we only want to send one entry, we will break the loop. This is used when testing this function.
if do_once:
@@ -433,27 +276,14 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d
break
-def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None:
+def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None:
"""Execute the webhook.
Args:
webhook (DiscordWebhook): The webhook to execute.
entry (Entry): The entry to send to Discord.
- reader (Reader): The Reader instance to use for checking feed status.
"""
- # If the feed has been paused or deleted, we will not send the entry to Discord.
- entry_feed: Feed = entry.feed
- if entry_feed.updates_enabled is False:
- logger.warning("Feed is paused, not sending entry to Discord: %s", entry_feed.url)
- return
-
- try:
- reader.get_feed(entry_feed.url)
- except FeedNotFoundError:
- logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url)
- return
-
response: Response = webhook.execute()
if response.status_code not in {200, 204}:
msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(webhook.json)}"
@@ -465,18 +295,6 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> No
logger.info("Sent entry to Discord: %s", entry.id)
-def is_youtube_feed(feed_url: str) -> bool:
- """Check if the feed is a YouTube feed.
-
- Args:
- feed_url: The feed URL to check.
-
- Returns:
- bool: True if the feed is a YouTube feed, False otherwise.
- """
- return "youtube.com/feeds/videos.xml" in feed_url
-
-
def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
"""Check if we should send an embed to Discord.
@@ -487,12 +305,11 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
Returns:
bool: True if we should send an embed, False otherwise.
"""
- # YouTube feeds should never use embeds - only links
- if is_youtube_feed(entry.feed.url):
- return False
-
try:
- should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True))
+ should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
+ except TagNotFoundError:
+ logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url)
+ should_send_embed = True
except ReaderError:
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True
@@ -516,7 +333,7 @@ def truncate_webhook_message(webhook_message: str) -> str:
return webhook_message
-def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # noqa: C901
+def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
"""Add a new feed, update it and mark every entry as read.
Args:
@@ -547,7 +364,9 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
reader.add_feed(clean_feed_url)
except FeedExistsError:
# Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new.
- if not reader.get_tag(clean_feed_url, "webhook", ""):
+ try:
+ reader.get_tag(clean_feed_url, "webhook")
+ except TagNotFoundError:
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
except ReaderError as e:
raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e
@@ -572,8 +391,7 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
# This is the default message that will be sent to Discord.
reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType]
- # Set the default embed tag when creating the feed
- reader.set_tag(clean_feed_url, "embed", json.dumps(default_custom_embed))
-
# Update the full-text search index so our new feed is searchable.
reader.update_search()
+
+ add_missing_tags(reader)
diff --git a/discord_rss_bot/filter/blacklist.py b/discord_rss_bot/filter/blacklist.py
index 8260993..808d7c9 100644
--- a/discord_rss_bot/filter/blacklist.py
+++ b/discord_rss_bot/filter/blacklist.py
@@ -2,119 +2,59 @@ from __future__ import annotations
from typing import TYPE_CHECKING
-from discord_rss_bot.filter.utils import is_regex_match
from discord_rss_bot.filter.utils import is_word_in_text
if TYPE_CHECKING:
- from reader import Entry
- from reader import Feed
- from reader import Reader
+ from reader import Entry, Feed, Reader
-def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool:
+def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
"""Return True if the feed has blacklist tags.
The following tags are checked:
- - blacklist_author
- - blacklist_content
- - blacklist_summary
- blacklist_title
- - regex_blacklist_author
- - regex_blacklist_content
- - regex_blacklist_summary
- - regex_blacklist_title
+ - blacklist_summary
+ - blacklist_content.
Args:
- reader: The reader.
+ custom_reader: The reader.
feed: The feed to check.
Returns:
bool: If the feed has any of the tags.
"""
- blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip()
- blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip()
- blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip()
- blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip()
+ blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", ""))
+ blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", ""))
+ blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", ""))
- regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip()
- regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip()
- regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
- regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip()
-
- return bool(
- blacklist_title
- or blacklist_author
- or blacklist_content
- or blacklist_summary
- or regex_blacklist_author
- or regex_blacklist_content
- or regex_blacklist_summary
- or regex_blacklist_title,
- )
+ return bool(blacklist_title or blacklist_summary or blacklist_content)
-def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
+def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool:
"""Return True if the entry is in the blacklist.
Args:
- reader: The reader.
+ custom_reader: The reader.
entry: The entry to check.
Returns:
bool: If the entry is in the blacklist.
"""
- feed = entry.feed
-
- blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip()
- blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip()
- blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip()
- blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip()
-
- regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip()
- regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
- regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip()
- regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip()
+ blacklist_title: str = str(custom_reader.get_tag(entry.feed, "blacklist_title", ""))
+ blacklist_summary: str = str(custom_reader.get_tag(entry.feed, "blacklist_summary", ""))
+ blacklist_content: str = str(custom_reader.get_tag(entry.feed, "blacklist_content", ""))
+ blacklist_author: str = str(custom_reader.get_tag(entry.feed, "blacklist_author", ""))
# TODO(TheLovinator): Also add support for entry_text and more.
- # Check regular blacklist
if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title):
return True
if entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary):
return True
- if (
- entry.content
- and entry.content[0].value
- and blacklist_content
- and is_word_in_text(blacklist_content, entry.content[0].value)
- ):
- return True
if entry.author and blacklist_author and is_word_in_text(blacklist_author, entry.author):
return True
- if (
- entry.content
- and entry.content[0].value
- and blacklist_content
- and is_word_in_text(blacklist_content, entry.content[0].value)
- ):
- return True
-
- # Check regex blacklist
- if entry.title and regex_blacklist_title and is_regex_match(regex_blacklist_title, entry.title):
- return True
- if entry.summary and regex_blacklist_summary and is_regex_match(regex_blacklist_summary, entry.summary):
- return True
- if (
- entry.content
- and entry.content[0].value
- and regex_blacklist_content
- and is_regex_match(regex_blacklist_content, entry.content[0].value)
- ):
- return True
- if entry.author and regex_blacklist_author and is_regex_match(regex_blacklist_author, entry.author):
- return True
return bool(
entry.content
and entry.content[0].value
- and regex_blacklist_content
- and is_regex_match(regex_blacklist_content, entry.content[0].value),
+ and blacklist_content
+ and is_word_in_text(blacklist_content, entry.content[0].value),
)
diff --git a/discord_rss_bot/filter/utils.py b/discord_rss_bot/filter/utils.py
index ff93e59..090518d 100644
--- a/discord_rss_bot/filter/utils.py
+++ b/discord_rss_bot/filter/utils.py
@@ -1,10 +1,7 @@
from __future__ import annotations
-import logging
import re
-logger: logging.Logger = logging.getLogger(__name__)
-
def is_word_in_text(word_string: str, text: str) -> bool:
"""Check if any of the words are in the text.
@@ -23,50 +20,3 @@ def is_word_in_text(word_string: str, text: str) -> bool:
# Check if any pattern matches the text.
return any(pattern.search(text) for pattern in patterns)
-
-
-def is_regex_match(regex_string: str, text: str) -> bool:
- """Check if any of the regex patterns match the text.
-
- Args:
- regex_string: A string containing regex patterns, separated by newlines or commas.
- text: The text to search in.
-
- Returns:
- bool: True if any regex pattern matches the text, otherwise False.
- """
- if not regex_string or not text:
- return False
-
- # Split by newlines first, then by commas (for backward compatibility)
- regex_list: list[str] = []
-
- # First split by newlines
- lines: list[str] = regex_string.split("\n")
- for line in lines:
- stripped_line: str = line.strip()
- if stripped_line:
- # For backward compatibility, also split by commas if there are any
- if "," in stripped_line:
- regex_list.extend([part.strip() for part in stripped_line.split(",") if part.strip()])
- else:
- regex_list.append(stripped_line)
-
- # Attempt to compile and apply each regex pattern
- for pattern_str in regex_list:
- if not pattern_str:
- logger.warning("Empty regex pattern found in the list.")
- continue
-
- try:
- pattern: re.Pattern[str] = re.compile(pattern_str, re.IGNORECASE)
- if pattern.search(text):
- logger.info("Regex pattern matched: %s", pattern_str)
- return True
- except re.error:
- logger.warning("Invalid regex pattern: %s", pattern_str)
- continue
-
- logger.info("No regex patterns matched.")
-
- return False
diff --git a/discord_rss_bot/filter/whitelist.py b/discord_rss_bot/filter/whitelist.py
index bb5303d..a55a514 100644
--- a/discord_rss_bot/filter/whitelist.py
+++ b/discord_rss_bot/filter/whitelist.py
@@ -2,105 +2,59 @@ from __future__ import annotations
from typing import TYPE_CHECKING
-from discord_rss_bot.filter.utils import is_regex_match
from discord_rss_bot.filter.utils import is_word_in_text
if TYPE_CHECKING:
- from reader import Entry
- from reader import Feed
- from reader import Reader
+ from reader import Entry, Feed, Reader
-def has_white_tags(reader: Reader, feed: Feed) -> bool:
+def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
"""Return True if the feed has whitelist tags.
The following tags are checked:
- - regex_whitelist_author
- - regex_whitelist_content
- - regex_whitelist_summary
- - regex_whitelist_title
- - whitelist_author
- - whitelist_content
- - whitelist_summary
- whitelist_title
+ - whitelist_summary
+ - whitelist_content.
Args:
- reader: The reader.
+ custom_reader: The reader.
feed: The feed to check.
Returns:
bool: If the feed has any of the tags.
"""
- whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip()
- whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip()
- whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip()
- whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip()
+ whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", ""))
+ whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", ""))
+ whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", ""))
- regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip()
- regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
- regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip()
- regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip()
-
- return bool(
- whitelist_title
- or whitelist_author
- or whitelist_content
- or whitelist_summary
- or regex_whitelist_author
- or regex_whitelist_content
- or regex_whitelist_summary
- or regex_whitelist_title,
- )
+ return bool(whitelist_title or whitelist_summary or whitelist_content)
-def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
+def should_be_sent(custom_reader: Reader, entry: Entry) -> bool:
"""Return True if the entry is in the whitelist.
Args:
- reader: The reader.
+ custom_reader: The reader.
entry: The entry to check.
Returns:
bool: If the entry is in the whitelist.
"""
feed: Feed = entry.feed
- # Regular whitelist tags
- whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip()
- whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip()
- whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip()
- whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip()
+ whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", ""))
+ whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", ""))
+ whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", ""))
+ whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", ""))
- # Regex whitelist tags
- regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip()
- regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
- regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip()
- regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip()
-
- # Check regular whitelist
if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title):
return True
if entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary):
return True
if entry.author and whitelist_author and is_word_in_text(whitelist_author, entry.author):
return True
- if (
- entry.content
- and entry.content[0].value
- and whitelist_content
- and is_word_in_text(whitelist_content, entry.content[0].value)
- ):
- return True
-
- # Check regex whitelist
- if entry.title and regex_whitelist_title and is_regex_match(regex_whitelist_title, entry.title):
- return True
- if entry.summary and regex_whitelist_summary and is_regex_match(regex_whitelist_summary, entry.summary):
- return True
- if entry.author and regex_whitelist_author and is_regex_match(regex_whitelist_author, entry.author):
- return True
return bool(
entry.content
and entry.content[0].value
- and regex_whitelist_content
- and is_regex_match(regex_whitelist_content, entry.content[0].value),
+ and whitelist_content
+ and is_word_in_text(whitelist_content, entry.content[0].value),
)
diff --git a/discord_rss_bot/git_backup.py b/discord_rss_bot/git_backup.py
deleted file mode 100644
index 49528ec..0000000
--- a/discord_rss_bot/git_backup.py
+++ /dev/null
@@ -1,243 +0,0 @@
-"""Git backup module for committing bot state changes to a private repository.
-
-Configure the backup by setting these environment variables:
-- ``GIT_BACKUP_PATH``: Local filesystem path for the backup git repository.
- When set, the bot will initialise a git repo there (if one doesn't exist)
- and commit an export of its state after every relevant change.
-- ``GIT_BACKUP_REMOTE``: Optional remote URL (e.g. ``git@github.com:you/private-repo.git``).
- When set, every commit is followed by a ``git push`` to this remote.
-
-The exported state is written as ``state.json`` inside the backup repo. It
-contains the list of feeds together with their webhook URL, filter settings
-(blacklist / whitelist, regex variants), custom messages and embed settings.
-Global webhooks are also included.
-
-Example docker-compose snippet::
-
- environment:
- - GIT_BACKUP_PATH=/data/backup
- - GIT_BACKUP_REMOTE=git@github.com:you/private-config.git
-"""
-
-from __future__ import annotations
-
-import json
-import logging
-import os
-import shutil
-import subprocess # noqa: S404
-from pathlib import Path
-from typing import TYPE_CHECKING
-from typing import Any
-
-if TYPE_CHECKING:
- from reader import Reader
-
-logger: logging.Logger = logging.getLogger(__name__)
-GIT_EXECUTABLE: str = shutil.which("git") or "git"
-
-
-type TAG_VALUE = (
- dict[str, str | int | float | bool | dict[str, Any] | list[Any] | None]
- | list[str | int | float | bool | dict[str, Any] | list[Any] | None]
- | None
-)
-
-# Tags that are exported per-feed (empty values are omitted).
-_FEED_TAGS: tuple[str, ...] = (
- "webhook",
- "custom_message",
- "should_send_embed",
- "embed",
- "blacklist_title",
- "blacklist_summary",
- "blacklist_content",
- "blacklist_author",
- "regex_blacklist_title",
- "regex_blacklist_summary",
- "regex_blacklist_content",
- "regex_blacklist_author",
- "whitelist_title",
- "whitelist_summary",
- "whitelist_content",
- "whitelist_author",
- "regex_whitelist_title",
- "regex_whitelist_summary",
- "regex_whitelist_content",
- "regex_whitelist_author",
- ".reader.update",
-)
-
-
-def get_backup_path() -> Path | None:
- """Return the configured backup path, or *None* if not configured.
-
- Returns:
- Path to the backup repository, or None if ``GIT_BACKUP_PATH`` is unset.
- """
- raw: str = os.environ.get("GIT_BACKUP_PATH", "").strip()
- return Path(raw) if raw else None
-
-
-def get_backup_remote() -> str:
- """Return the configured remote URL, or an empty string if not set.
-
- Returns:
- The remote URL string from ``GIT_BACKUP_REMOTE``, or ``""`` if unset.
- """
- return os.environ.get("GIT_BACKUP_REMOTE", "").strip()
-
-
-def setup_backup_repo(backup_path: Path) -> bool:
- """Ensure the backup directory exists and contains a git repository.
-
- If the directory does not yet contain a ``.git`` folder a new repository is
- initialised. A basic git identity is configured locally so that commits
- succeed even in environments where a global ``~/.gitconfig`` is absent.
-
- Args:
- backup_path: Local path for the backup repository.
-
- Returns:
- ``True`` if the repository is ready, ``False`` on any error.
- """
- try:
- backup_path.mkdir(parents=True, exist_ok=True)
- git_dir: Path = backup_path / ".git"
- if not git_dir.exists():
- subprocess.run([GIT_EXECUTABLE, "init", str(backup_path)], check=True, capture_output=True) # noqa: S603
- logger.info("Initialised git backup repository at %s", backup_path)
-
- # Ensure a local identity exists so that `git commit` always works.
- for key, value in (("user.email", "discord-rss-bot@localhost"), ("user.name", "discord-rss-bot")):
- result: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603
- [GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key],
- check=False,
- capture_output=True,
- )
- if result.returncode != 0:
- subprocess.run( # noqa: S603
- [GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key, value],
- check=True,
- capture_output=True,
- )
-
- # Configure the remote if GIT_BACKUP_REMOTE is set.
- remote_url: str = get_backup_remote()
- if remote_url:
- # Check if remote "origin" already exists.
- check_remote: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603
- [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "get-url", "origin"],
- check=False,
- capture_output=True,
- )
- if check_remote.returncode != 0:
- # Remote doesn't exist, add it.
- subprocess.run( # noqa: S603
- [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "add", "origin", remote_url],
- check=True,
- capture_output=True,
- )
- logger.info("Added remote 'origin' with URL: %s", remote_url)
- else:
- # Remote exists, update it if the URL has changed.
- current_url: str = check_remote.stdout.decode().strip()
- if current_url != remote_url:
- subprocess.run( # noqa: S603
- [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "set-url", "origin", remote_url],
- check=True,
- capture_output=True,
- )
- logger.info("Updated remote 'origin' URL from %s to %s", current_url, remote_url)
- except Exception:
- logger.exception("Failed to set up git backup repository at %s", backup_path)
- return False
- return True
-
-
-def export_state(reader: Reader, backup_path: Path) -> None:
- """Serialise the current bot state to ``state.json`` inside *backup_path*.
-
- Args:
- reader: The :class:`reader.Reader` instance to read state from.
- backup_path: Destination directory for the exported ``state.json``.
- """
- feeds_state: list[dict] = []
- for feed in reader.get_feeds():
- feed_data: dict = {"url": feed.url}
- for tag in _FEED_TAGS:
- try:
- value: TAG_VALUE = reader.get_tag(feed, tag, None)
- if value is not None and value != "": # noqa: PLC1901
- feed_data[tag] = value
- except Exception:
- logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url)
- feeds_state.append(feed_data)
-
- webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list(
- reader.get_tag((), "webhooks", []),
- )
-
- # Export global update interval if set
- global_update_interval: dict[str, Any] | None = None
- global_update_config = reader.get_tag((), ".reader.update", None)
- if isinstance(global_update_config, dict):
- global_update_interval = global_update_config
-
- state: dict = {"feeds": feeds_state, "webhooks": webhooks}
- if global_update_interval is not None:
- state["global_update_interval"] = global_update_interval
- state_file: Path = backup_path / "state.json"
- state_file.write_text(json.dumps(state, indent=2, default=str), encoding="utf-8")
-
-
-def commit_state_change(reader: Reader, message: str) -> None:
- """Export current state and commit it to the backup repository.
-
- This is a no-op when ``GIT_BACKUP_PATH`` is not configured. Errors are
- logged but never raised so that a backup failure never interrupts normal
- bot operation.
-
- Args:
- reader: The :class:`reader.Reader` instance to read state from.
- message: Commit message describing the change (e.g. ``"Add feed example.com/rss.xml"``).
- """
- backup_path: Path | None = get_backup_path()
- if backup_path is None:
- return
-
- if not setup_backup_repo(backup_path):
- return
-
- try:
- export_state(reader, backup_path)
-
- subprocess.run([GIT_EXECUTABLE, "-C", str(backup_path), "add", "-A"], check=True, capture_output=True) # noqa: S603
-
- # Only create a commit if there are staged changes.
- diff_result: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603
- [GIT_EXECUTABLE, "-C", str(backup_path), "diff", "--cached", "--exit-code"],
- check=False,
- capture_output=True,
- )
- if diff_result.returncode == 0:
- logger.debug("No state changes to commit for: %s", message)
- return
-
- subprocess.run( # noqa: S603
- [GIT_EXECUTABLE, "-C", str(backup_path), "commit", "-m", message],
- check=True,
- capture_output=True,
- )
- logger.info("Committed state change to backup repo: %s", message)
-
- # Push to remote if configured.
- if get_backup_remote():
- subprocess.run( # noqa: S603
- [GIT_EXECUTABLE, "-C", str(backup_path), "push", "origin", "HEAD"],
- check=True,
- capture_output=True,
- )
- logger.info("Pushed state change to remote 'origin': %s", message)
- except Exception:
- logger.exception("Failed to commit state change '%s' to backup repo", message)
diff --git a/discord_rss_bot/hoyolab_api.py b/discord_rss_bot/hoyolab_api.py
deleted file mode 100644
index 227a413..0000000
--- a/discord_rss_bot/hoyolab_api.py
+++ /dev/null
@@ -1,195 +0,0 @@
-from __future__ import annotations
-
-import contextlib
-import json
-import logging
-import re
-from typing import TYPE_CHECKING
-from typing import Any
-
-import requests
-from discord_webhook import DiscordEmbed
-from discord_webhook import DiscordWebhook
-
-if TYPE_CHECKING:
- from reader import Entry
-
-
-logger: logging.Logger = logging.getLogger(__name__)
-
-
-def is_c3kay_feed(feed_url: str) -> bool:
- """Check if the feed is from c3kay.de.
-
- Args:
- feed_url: The feed URL to check.
-
- Returns:
- bool: True if the feed is from c3kay.de, False otherwise.
- """
- return "feeds.c3kay.de" in feed_url
-
-
-def extract_post_id_from_hoyolab_url(url: str) -> str | None:
- """Extract the post ID from a Hoyolab URL.
-
- Args:
- url: The Hoyolab URL to extract the post ID from.
- For example: https://www.hoyolab.com/article/38588239
-
- Returns:
- str | None: The post ID if found, None otherwise.
- """
- try:
- match: re.Match[str] | None = re.search(r"/article/(\d+)", url)
- if match:
- return match.group(1)
- except (ValueError, AttributeError, TypeError) as e:
- logger.warning("Error extracting post ID from Hoyolab URL %s: %s", url, e)
-
- return None
-
-
-def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None:
- """Fetch post data from the Hoyolab API.
-
- Args:
- post_id: The post ID to fetch.
-
- Returns:
- dict[str, Any] | None: The post data if successful, None otherwise.
- """
- if not post_id:
- return None
-
- http_ok = 200
- try:
- url: str = f"https://bbs-api-os.hoyolab.com/community/post/wapi/getPostFull?post_id={post_id}"
- response: requests.Response = requests.get(url, timeout=10)
-
- if response.status_code == http_ok:
- data: dict[str, Any] = response.json()
- if data.get("retcode") == 0 and "data" in data and "post" in data["data"]:
- return data["data"]["post"]
-
- logger.warning("Failed to fetch Hoyolab post %s: %s", post_id, response.text)
- except (requests.RequestException, ValueError):
- logger.exception("Error fetching Hoyolab post %s", post_id)
-
- return None
-
-
-def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, Any]) -> DiscordWebhook: # noqa: C901, PLR0912, PLR0914, PLR0915
- """Create a webhook with data from the Hoyolab API.
-
- Args:
- webhook_url: The webhook URL.
- entry: The entry to send to Discord.
- post_data: The post data from the Hoyolab API.
-
- Returns:
- DiscordWebhook: The webhook with the embed.
- """
- entry_link: str = entry.link or entry.feed.url
- webhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True)
-
- # Extract relevant data from the post
- post: dict[str, Any] = post_data.get("post", {})
- subject: str = post.get("subject", "")
- content: str = post.get("content", "{}")
-
- logger.debug("Post subject: %s", subject)
- logger.debug("Post content: %s", content)
-
- content_data: dict[str, str] = {}
- with contextlib.suppress(json.JSONDecodeError, ValueError):
- content_data = json.loads(content)
-
- logger.debug("Content data: %s", content_data)
-
- description: str = content_data.get("describe", "")
- if not description:
- description = post.get("desc", "")
-
- # Create the embed
- discord_embed = DiscordEmbed()
-
- # Set title and description
- discord_embed.set_title(subject)
- discord_embed.set_url(entry_link)
-
- # Get post.image_list
- image_list: list[dict[str, Any]] = post_data.get("image_list", [])
- if image_list:
- image_url: str = str(image_list[0].get("url", ""))
- image_height: int = int(image_list[0].get("height", 1080))
- image_width: int = int(image_list[0].get("width", 1920))
-
- logger.debug("Image URL: %s, Height: %s, Width: %s", image_url, image_height, image_width)
- discord_embed.set_image(url=image_url, height=image_height, width=image_width)
-
- video: dict[str, str | int | bool] = post_data.get("video", {})
- if video and video.get("url"):
- video_url: str = str(video.get("url", ""))
- logger.debug("Video URL: %s", video_url)
- with contextlib.suppress(requests.RequestException):
- video_response: requests.Response = requests.get(video_url, stream=True, timeout=10)
- if video_response.ok:
- webhook.add_file(
- file=video_response.content,
- filename=f"{entry.id}.mp4",
- )
-
- game = post_data.get("game", {})
-
- if game and game.get("color"):
- game_color = str(game.get("color", ""))
- discord_embed.set_color(game_color.removeprefix("#"))
-
- user: dict[str, str | int | bool] = post_data.get("user", {})
- author_name: str = str(user.get("nickname", ""))
- avatar_url: str = str(user.get("avatar_url", ""))
- if author_name:
- webhook.avatar_url = avatar_url
- webhook.username = author_name
-
- classification = post_data.get("classification", {})
-
- if classification and classification.get("name"):
- footer = str(classification.get("name", ""))
- discord_embed.set_footer(text=footer)
-
- webhook.add_embed(discord_embed)
-
- # Only show Youtube URL if available
- structured_content: str = post.get("structured_content", "")
- if structured_content: # noqa: PLR1702
- try:
- structured_content_data: list[dict[str, Any]] = json.loads(structured_content)
- for item in structured_content_data:
- if item.get("insert") and isinstance(item["insert"], dict):
- video_url: str = str(item["insert"].get("video", ""))
- if video_url:
- video_id_match: re.Match[str] | None = re.search(r"embed/([a-zA-Z0-9_-]+)", video_url)
- if video_id_match:
- video_id: str = video_id_match.group(1)
- logger.debug("Video ID: %s", video_id)
- webhook.content = f"https://www.youtube.com/watch?v={video_id}"
- webhook.remove_embeds()
-
- except (json.JSONDecodeError, ValueError) as e:
- logger.warning("Error parsing structured content: %s", e)
-
- event_start_date: str = post.get("event_start_date", "")
- if event_start_date and event_start_date != "0":
- discord_embed.add_embed_field(name="Start", value=f"")
-
- event_end_date: str = post.get("event_end_date", "")
- if event_end_date and event_end_date != "0":
- discord_embed.add_embed_field(name="End", value=f"")
-
- created_at: str = post.get("created_at", "")
- if created_at and created_at != "0":
- discord_embed.set_timestamp(timestamp=created_at)
-
- return webhook
diff --git a/discord_rss_bot/is_url_valid.py b/discord_rss_bot/is_url_valid.py
index c986b4a..cca1491 100644
--- a/discord_rss_bot/is_url_valid.py
+++ b/discord_rss_bot/is_url_valid.py
@@ -1,7 +1,6 @@
from __future__ import annotations
-from urllib.parse import ParseResult
-from urllib.parse import urlparse
+from urllib.parse import ParseResult, urlparse
def is_url_valid(url: str) -> bool:
diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py
index 1e5211b..3a1f0ca 100644
--- a/discord_rss_bot/main.py
+++ b/discord_rss_bot/main.py
@@ -7,65 +7,48 @@ import typing
import urllib.parse
from contextlib import asynccontextmanager
from dataclasses import dataclass
-from datetime import UTC
-from datetime import datetime
+from datetime import UTC, datetime
from functools import lru_cache
-from typing import TYPE_CHECKING
-from typing import Annotated
-from typing import Any
-from typing import cast
+from typing import TYPE_CHECKING, Annotated, cast
import httpx
import sentry_sdk
import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
-from fastapi import Depends
-from fastapi import FastAPI
-from fastapi import Form
-from fastapi import HTTPException
-from fastapi import Request
+from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from httpx import Response
from markdownify import markdownify
-from reader import Entry
-from reader import EntryNotFoundError
-from reader import Feed
-from reader import FeedExistsError
-from reader import FeedNotFoundError
-from reader import Reader
-from reader import ReaderError
-from reader import TagNotFoundError
+from reader import Entry, EntryNotFoundError, Feed, FeedNotFoundError, Reader, TagNotFoundError
from starlette.responses import RedirectResponse
from discord_rss_bot import settings
-from discord_rss_bot.custom_filters import entry_is_blacklisted
-from discord_rss_bot.custom_filters import entry_is_whitelisted
-from discord_rss_bot.custom_message import CustomEmbed
-from discord_rss_bot.custom_message import get_custom_message
-from discord_rss_bot.custom_message import get_embed
-from discord_rss_bot.custom_message import get_first_image
-from discord_rss_bot.custom_message import replace_tags_in_text_message
-from discord_rss_bot.custom_message import save_embed
-from discord_rss_bot.feeds import create_feed
-from discord_rss_bot.feeds import extract_domain
-from discord_rss_bot.feeds import send_entry_to_discord
-from discord_rss_bot.feeds import send_to_discord
-from discord_rss_bot.git_backup import commit_state_change
-from discord_rss_bot.git_backup import get_backup_path
-from discord_rss_bot.is_url_valid import is_url_valid
-from discord_rss_bot.search import create_search_context
+from discord_rss_bot.custom_filters import (
+ entry_is_blacklisted,
+ entry_is_whitelisted,
+)
+from discord_rss_bot.custom_message import (
+ CustomEmbed,
+ get_custom_message,
+ get_embed,
+ get_first_image,
+ replace_tags_in_text_message,
+ save_embed,
+)
+from discord_rss_bot.feeds import create_feed, send_entry_to_discord, send_to_discord
+from discord_rss_bot.missing_tags import add_missing_tags
+from discord_rss_bot.search import create_html_for_search_results
from discord_rss_bot.settings import get_reader
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator
from collections.abc import Iterable
from reader.types import JSONType
-LOGGING_CONFIG: dict[str, Any] = {
+LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": True,
"formatters": {
@@ -101,71 +84,18 @@ LOGGING_CONFIG: dict[str, Any] = {
logging.config.dictConfig(LOGGING_CONFIG)
logger: logging.Logger = logging.getLogger(__name__)
-
-
-def get_reader_dependency() -> Reader:
- """Provide the app Reader instance as a FastAPI dependency.
-
- Returns:
- Reader: The shared Reader instance.
- """
- return get_reader()
-
-
-# Time constants for relative time formatting
-SECONDS_PER_MINUTE = 60
-SECONDS_PER_HOUR = 3600
-SECONDS_PER_DAY = 86400
-
-
-def relative_time(dt: datetime | None) -> str:
- """Convert a datetime to a relative time string (e.g., '2 hours ago', 'in 5 minutes').
-
- Args:
- dt: The datetime to convert (should be timezone-aware).
-
- Returns:
- A human-readable relative time string.
- """
- if dt is None:
- return "Never"
-
- now = datetime.now(tz=UTC)
- diff = dt - now
- seconds = int(abs(diff.total_seconds()))
- is_future = diff.total_seconds() > 0
-
- # Determine the appropriate unit and value
- if seconds < SECONDS_PER_MINUTE:
- value = seconds
- unit = "s"
- elif seconds < SECONDS_PER_HOUR:
- value = seconds // SECONDS_PER_MINUTE
- unit = "m"
- elif seconds < SECONDS_PER_DAY:
- value = seconds // SECONDS_PER_HOUR
- unit = "h"
- else:
- value = seconds // SECONDS_PER_DAY
- unit = "d"
-
- # Format based on future or past
- return f"in {value}{unit}" if is_future else f"{value}{unit} ago"
+reader: Reader = get_reader()
@asynccontextmanager
-async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
- """Lifespan function for the FastAPI app."""
- reader: Reader = get_reader()
- scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone=UTC)
- scheduler.add_job(
- func=send_to_discord,
- trigger="interval",
- minutes=1,
- id="send_to_discord",
- max_instances=1,
- next_run_time=datetime.now(tz=UTC),
- )
+async def lifespan(app: FastAPI) -> typing.AsyncGenerator[None]:
+ """This is needed for the ASGI server to run."""
+ add_missing_tags(reader)
+ scheduler: AsyncIOScheduler = AsyncIOScheduler()
+
+ # Update all feeds every 15 minutes.
+ # TODO(TheLovinator): Make this configurable.
+ scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now(tz=UTC))
scheduler.start()
logger.info("Scheduler started.")
yield
@@ -180,29 +110,27 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template
# Add the filters to the Jinja2 environment so they can be used in html templates.
templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else ""
+templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted
+templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted
templates.env.filters["discord_markdown"] = markdownify
-templates.env.filters["relative_time"] = relative_time
-templates.env.globals["get_backup_path"] = get_backup_path
@app.post("/add_webhook")
async def post_add_webhook(
webhook_name: Annotated[str, Form()],
webhook_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Add a feed to the database.
Args:
webhook_name: The name of the webhook.
webhook_url: The url of the webhook.
- reader: The Reader instance.
-
- Returns:
- RedirectResponse: Redirect to the index page.
Raises:
HTTPException: If the webhook already exists.
+
+ Returns:
+ RedirectResponse: Redirect to the index page.
"""
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks = list(reader.get_tag((), "webhooks", []))
@@ -219,8 +147,6 @@ async def post_add_webhook(
reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType]
- commit_state_change(reader, f"Add webhook {webhook_name.strip()}")
-
return RedirectResponse(url="/", status_code=303)
# TODO(TheLovinator): Show this error on the page.
@@ -229,22 +155,17 @@ async def post_add_webhook(
@app.post("/delete_webhook")
-async def post_delete_webhook(
- webhook_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-) -> RedirectResponse:
+async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectResponse:
"""Delete a webhook from the database.
Args:
webhook_url: The url of the webhook.
- reader: The Reader instance.
-
- Returns:
- RedirectResponse: Redirect to the index page.
Raises:
HTTPException: If the webhook could not be deleted
+ Returns:
+ RedirectResponse: Redirect to the index page.
"""
# TODO(TheLovinator): Check if the webhook is in use by any feeds before deleting it.
# TODO(TheLovinator): Replace HTTPException with a custom exception for both of these.
@@ -271,8 +192,6 @@ async def post_delete_webhook(
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType]
- commit_state_change(reader, f"Delete webhook {webhook_url.strip()}")
-
return RedirectResponse(url="/", status_code=303)
@@ -280,34 +199,27 @@ async def post_delete_webhook(
async def post_create_feed(
feed_url: Annotated[str, Form()],
webhook_dropdown: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
) -> RedirectResponse:
"""Add a feed to the database.
Args:
feed_url: The feed to add.
webhook_dropdown: The webhook to use.
- reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
create_feed(reader, feed_url, webhook_dropdown)
- commit_state_change(reader, f"Add feed {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/pause")
-async def post_pause_feed(
- feed_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-) -> RedirectResponse:
+async def post_pause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
"""Pause a feed.
Args:
feed_url: The feed to pause.
- reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@@ -318,15 +230,11 @@ async def post_pause_feed(
@app.post("/unpause")
-async def post_unpause_feed(
- feed_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-) -> RedirectResponse:
+async def post_unpause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
"""Unpause a feed.
Args:
feed_url: The Feed to unpause.
- reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@@ -338,15 +246,10 @@ async def post_unpause_feed(
@app.post("/whitelist")
async def post_set_whitelist(
- reader: Annotated[Reader, Depends(get_reader_dependency)],
whitelist_title: Annotated[str, Form()] = "",
whitelist_summary: Annotated[str, Form()] = "",
whitelist_content: Annotated[str, Form()] = "",
whitelist_author: Annotated[str, Form()] = "",
- regex_whitelist_title: Annotated[str, Form()] = "",
- regex_whitelist_summary: Annotated[str, Form()] = "",
- regex_whitelist_content: Annotated[str, Form()] = "",
- regex_whitelist_author: Annotated[str, Form()] = "",
feed_url: Annotated[str, Form()] = "",
) -> RedirectResponse:
"""Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent.
@@ -356,12 +259,7 @@ async def post_set_whitelist(
whitelist_summary: Whitelisted words for when checking the summary.
whitelist_content: Whitelisted words for when checking the content.
whitelist_author: Whitelisted words for when checking the author.
- regex_whitelist_title: Whitelisted regex for when checking the title.
- regex_whitelist_summary: Whitelisted regex for when checking the summary.
- regex_whitelist_content: Whitelisted regex for when checking the content.
- regex_whitelist_author: Whitelisted regex for when checking the author.
feed_url: The feed we should set the whitelist for.
- reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@@ -371,28 +269,17 @@ async def post_set_whitelist(
reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "whitelist_author", whitelist_author) # pyright: ignore[reportArgumentType][call-overload]
- reader.set_tag(clean_feed_url, "regex_whitelist_title", regex_whitelist_title) # pyright: ignore[reportArgumentType][call-overload]
- reader.set_tag(clean_feed_url, "regex_whitelist_summary", regex_whitelist_summary) # pyright: ignore[reportArgumentType][call-overload]
- reader.set_tag(clean_feed_url, "regex_whitelist_content", regex_whitelist_content) # pyright: ignore[reportArgumentType][call-overload]
- reader.set_tag(clean_feed_url, "regex_whitelist_author", regex_whitelist_author) # pyright: ignore[reportArgumentType][call-overload]
-
- commit_state_change(reader, f"Update whitelist for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.get("/whitelist", response_class=HTMLResponse)
-async def get_whitelist(
- feed_url: str,
- request: Request,
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-):
+async def get_whitelist(feed_url: str, request: Request):
"""Get the whitelist.
Args:
feed_url: What feed we should get the whitelist for.
request: The request object.
- reader: The Reader instance.
Returns:
HTMLResponse: The whitelist page.
@@ -400,14 +287,11 @@ async def get_whitelist(
clean_feed_url: str = feed_url.strip()
feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url))
+ # Get previous data, this is used when creating the form.
whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", ""))
whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", ""))
whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", ""))
whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", ""))
- regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", ""))
- regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", ""))
- regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", ""))
- regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", ""))
context = {
"request": request,
@@ -416,25 +300,16 @@ async def get_whitelist(
"whitelist_summary": whitelist_summary,
"whitelist_content": whitelist_content,
"whitelist_author": whitelist_author,
- "regex_whitelist_title": regex_whitelist_title,
- "regex_whitelist_summary": regex_whitelist_summary,
- "regex_whitelist_content": regex_whitelist_content,
- "regex_whitelist_author": regex_whitelist_author,
}
return templates.TemplateResponse(request=request, name="whitelist.html", context=context)
@app.post("/blacklist")
async def post_set_blacklist(
- reader: Annotated[Reader, Depends(get_reader_dependency)],
blacklist_title: Annotated[str, Form()] = "",
blacklist_summary: Annotated[str, Form()] = "",
blacklist_content: Annotated[str, Form()] = "",
blacklist_author: Annotated[str, Form()] = "",
- regex_blacklist_title: Annotated[str, Form()] = "",
- regex_blacklist_summary: Annotated[str, Form()] = "",
- regex_blacklist_content: Annotated[str, Form()] = "",
- regex_blacklist_author: Annotated[str, Form()] = "",
feed_url: Annotated[str, Form()] = "",
) -> RedirectResponse:
"""Set the blacklist.
@@ -447,12 +322,7 @@ async def post_set_blacklist(
blacklist_summary: Blacklisted words for when checking the summary.
blacklist_content: Blacklisted words for when checking the content.
blacklist_author: Blacklisted words for when checking the author.
- regex_blacklist_title: Blacklisted regex for when checking the title.
- regex_blacklist_summary: Blacklisted regex for when checking the summary.
- regex_blacklist_content: Blacklisted regex for when checking the content.
- regex_blacklist_author: Blacklisted regex for when checking the author.
feed_url: What feed we should set the blacklist for.
- reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@@ -462,40 +332,28 @@ async def post_set_blacklist(
reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # pyright: ignore[reportArgumentType][call-overload]
- reader.set_tag(clean_feed_url, "regex_blacklist_title", regex_blacklist_title) # pyright: ignore[reportArgumentType][call-overload]
- reader.set_tag(clean_feed_url, "regex_blacklist_summary", regex_blacklist_summary) # pyright: ignore[reportArgumentType][call-overload]
- reader.set_tag(clean_feed_url, "regex_blacklist_content", regex_blacklist_content) # pyright: ignore[reportArgumentType][call-overload]
- reader.set_tag(clean_feed_url, "regex_blacklist_author", regex_blacklist_author) # pyright: ignore[reportArgumentType][call-overload]
- commit_state_change(reader, f"Update blacklist for {clean_feed_url}")
+
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.get("/blacklist", response_class=HTMLResponse)
-async def get_blacklist(
- feed_url: str,
- request: Request,
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-):
+async def get_blacklist(feed_url: str, request: Request):
"""Get the blacklist.
Args:
feed_url: What feed we should get the blacklist for.
request: The request object.
- reader: The Reader instance.
Returns:
HTMLResponse: The blacklist page.
"""
feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url))
+ # Get previous data, this is used when creating the form.
blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", ""))
blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", ""))
blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", ""))
blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", ""))
- regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", ""))
- regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", ""))
- regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", ""))
- regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", ""))
context = {
"request": request,
@@ -504,10 +362,6 @@ async def get_blacklist(
"blacklist_summary": blacklist_summary,
"blacklist_content": blacklist_content,
"blacklist_author": blacklist_author,
- "regex_blacklist_title": regex_blacklist_title,
- "regex_blacklist_summary": regex_blacklist_summary,
- "regex_blacklist_content": regex_blacklist_content,
- "regex_blacklist_author": regex_blacklist_author,
}
return templates.TemplateResponse(request=request, name="blacklist.html", context=context)
@@ -515,7 +369,6 @@ async def get_blacklist(
@app.post("/custom")
async def post_set_custom(
feed_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
custom_message: Annotated[str, Form()] = "",
) -> RedirectResponse:
"""Set the custom message, this is used when sending the message.
@@ -523,7 +376,6 @@ async def post_set_custom(
Args:
custom_message: The custom message.
feed_url: The feed we should set the custom message for.
- reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
@@ -540,22 +392,16 @@ async def post_set_custom(
reader.set_tag(feed_url, "custom_message", default_custom_message)
clean_feed_url: str = feed_url.strip()
- commit_state_change(reader, f"Update custom message for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.get("/custom", response_class=HTMLResponse)
-async def get_custom(
- feed_url: str,
- request: Request,
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-):
+async def get_custom(feed_url: str, request: Request):
"""Get the custom message. This is used when sending the message to Discord.
Args:
feed_url: What feed we should get the custom message for.
request: The request object.
- reader: The Reader instance.
Returns:
HTMLResponse: The custom message page.
@@ -576,17 +422,12 @@ async def get_custom(
@app.get("/embed", response_class=HTMLResponse)
-async def get_embed_page(
- feed_url: str,
- request: Request,
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-):
+async def get_embed_page(feed_url: str, request: Request):
"""Get the custom message. This is used when sending the message to Discord.
Args:
feed_url: What feed we should get the custom message for.
request: The request object.
- reader: The Reader instance.
Returns:
HTMLResponse: The embed page.
@@ -620,9 +461,8 @@ async def get_embed_page(
@app.post("/embed", response_class=HTMLResponse)
-async def post_embed( # noqa: C901
+async def post_embed( # noqa: PLR0913, PLR0917
feed_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
title: Annotated[str, Form()] = "",
description: Annotated[str, Form()] = "",
color: Annotated[str, Form()] = "",
@@ -648,7 +488,7 @@ async def post_embed( # noqa: C901
author_icon_url: The author icon url of the embed.
footer_text: The footer text of the embed.
footer_icon_url: The footer icon url of the embed.
- reader: The Reader instance.
+
Returns:
RedirectResponse: Redirect to the embed page.
@@ -657,245 +497,59 @@ async def post_embed( # noqa: C901
feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url))
custom_embed: CustomEmbed = get_embed(reader, feed)
- # Only overwrite fields that the user provided. This prevents accidental
- # clearing of previously saved embed data when the form submits empty
- # values for fields the user did not change.
- if title:
- custom_embed.title = title
- if description:
- custom_embed.description = description
- if color:
- custom_embed.color = color
- if image_url:
- custom_embed.image_url = image_url
- if thumbnail_url:
- custom_embed.thumbnail_url = thumbnail_url
- if author_name:
- custom_embed.author_name = author_name
- if author_url:
- custom_embed.author_url = author_url
- if author_icon_url:
- custom_embed.author_icon_url = author_icon_url
- if footer_text:
- custom_embed.footer_text = footer_text
- if footer_icon_url:
- custom_embed.footer_icon_url = footer_icon_url
+ custom_embed.title = title
+ custom_embed.description = description
+ custom_embed.color = color
+ custom_embed.image_url = image_url
+ custom_embed.thumbnail_url = thumbnail_url
+ custom_embed.author_name = author_name
+ custom_embed.author_url = author_url
+ custom_embed.author_icon_url = author_icon_url
+ custom_embed.footer_text = footer_text
+ custom_embed.footer_icon_url = footer_icon_url
# Save the data.
save_embed(reader, feed, custom_embed)
- commit_state_change(reader, f"Update embed settings for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/use_embed")
-async def post_use_embed(
- feed_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-) -> RedirectResponse:
+async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse:
"""Use embed instead of text.
Args:
feed_url: The feed to change.
- reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "should_send_embed", True) # pyright: ignore[reportArgumentType]
- commit_state_change(reader, f"Enable embed mode for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@app.post("/use_text")
-async def post_use_text(
- feed_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-) -> RedirectResponse:
+async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse:
"""Use text instead of embed.
Args:
feed_url: The feed to change.
- reader: The Reader instance.
Returns:
RedirectResponse: Redirect to the feed page.
"""
clean_feed_url: str = feed_url.strip()
reader.set_tag(clean_feed_url, "should_send_embed", False) # pyright: ignore[reportArgumentType]
- commit_state_change(reader, f"Disable embed mode for {clean_feed_url}")
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
-@app.post("/set_update_interval")
-async def post_set_update_interval(
- feed_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
- interval_minutes: Annotated[int | None, Form()] = None,
- redirect_to: Annotated[str, Form()] = "",
-) -> RedirectResponse:
- """Set the update interval for a feed.
-
- Args:
- feed_url: The feed to change.
- interval_minutes: The update interval in minutes (None to reset to global default).
- redirect_to: Optional redirect URL (defaults to feed page).
- reader: The Reader instance.
-
- Returns:
- RedirectResponse: Redirect to the specified page or feed page.
- """
- clean_feed_url: str = feed_url.strip()
-
- # If no interval specified, reset to global default
- if interval_minutes is None:
- try:
- reader.delete_tag(clean_feed_url, ".reader.update")
- commit_state_change(reader, f"Reset update interval to default for {clean_feed_url}")
- except TagNotFoundError:
- pass
- else:
- # Validate interval (minimum 1 minute, no maximum)
- interval_minutes = max(interval_minutes, 1)
- reader.set_tag(clean_feed_url, ".reader.update", {"interval": interval_minutes}) # pyright: ignore[reportArgumentType]
- commit_state_change(reader, f"Set update interval to {interval_minutes} minutes for {clean_feed_url}")
-
- # Update the feed immediately to recalculate update_after with the new interval
- try:
- reader.update_feed(clean_feed_url)
- logger.info("Updated feed after interval change: %s", clean_feed_url)
- except Exception:
- logger.exception("Failed to update feed after interval change: %s", clean_feed_url)
-
- if redirect_to:
- return RedirectResponse(url=redirect_to, status_code=303)
- return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
-
-
-@app.post("/change_feed_url")
-async def post_change_feed_url(
- old_feed_url: Annotated[str, Form()],
- new_feed_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-) -> RedirectResponse:
- """Change the URL for an existing feed.
-
- Args:
- old_feed_url: Current feed URL.
- new_feed_url: New feed URL to change to.
- reader: The Reader instance.
-
- Returns:
- RedirectResponse: Redirect to the feed page for the resulting URL.
-
- Raises:
- HTTPException: If the old feed is not found, the new URL already exists, or change fails.
- """
- clean_old_feed_url: str = old_feed_url.strip()
- clean_new_feed_url: str = new_feed_url.strip()
-
- if not clean_old_feed_url or not clean_new_feed_url:
- raise HTTPException(status_code=400, detail="Feed URLs cannot be empty")
-
- if clean_old_feed_url == clean_new_feed_url:
- return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_old_feed_url)}", status_code=303)
-
- try:
- reader.change_feed_url(clean_old_feed_url, clean_new_feed_url)
- except FeedNotFoundError as e:
- raise HTTPException(status_code=404, detail=f"Feed not found: {clean_old_feed_url}") from e
- except FeedExistsError as e:
- raise HTTPException(status_code=409, detail=f"Feed already exists: {clean_new_feed_url}") from e
- except ReaderError as e:
- raise HTTPException(status_code=400, detail=f"Failed to change feed URL: {e}") from e
-
- # Update the feed with the new URL so we can discover what entries it returns.
- # Then mark all unread entries as read so the scheduler doesn't resend them.
- try:
- reader.update_feed(clean_new_feed_url)
- except Exception:
- logger.exception("Failed to update feed after URL change: %s", clean_new_feed_url)
-
- for entry in reader.get_entries(feed=clean_new_feed_url, read=False):
- try:
- reader.set_entry_read(entry, True)
- except Exception:
- logger.exception("Failed to mark entry as read after URL change: %s", entry.id)
-
- commit_state_change(reader, f"Change feed URL from {clean_old_feed_url} to {clean_new_feed_url}")
- return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_new_feed_url)}", status_code=303)
-
-
-@app.post("/reset_update_interval")
-async def post_reset_update_interval(
- feed_url: Annotated[str, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
- redirect_to: Annotated[str, Form()] = "",
-) -> RedirectResponse:
- """Reset the update interval for a feed to use the global default.
-
- Args:
- feed_url: The feed to change.
- redirect_to: Optional redirect URL (defaults to feed page).
- reader: The Reader instance.
-
- Returns:
- RedirectResponse: Redirect to the specified page or feed page.
- """
- clean_feed_url: str = feed_url.strip()
-
- try:
- reader.delete_tag(clean_feed_url, ".reader.update")
- commit_state_change(reader, f"Reset update interval to default for {clean_feed_url}")
- except TagNotFoundError:
- # Tag doesn't exist, which is fine
- pass
-
- # Update the feed immediately to recalculate update_after with the new interval
- try:
- reader.update_feed(clean_feed_url)
- logger.info("Updated feed after interval reset: %s", clean_feed_url)
- except Exception:
- logger.exception("Failed to update feed after interval reset: %s", clean_feed_url)
-
- if redirect_to:
- return RedirectResponse(url=redirect_to, status_code=303)
- return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
-
-
-@app.post("/set_global_update_interval")
-async def post_set_global_update_interval(
- interval_minutes: Annotated[int, Form()],
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-) -> RedirectResponse:
- """Set the global default update interval.
-
- Args:
- interval_minutes: The update interval in minutes.
- reader: The Reader instance.
-
- Returns:
- RedirectResponse: Redirect to the settings page.
- """
- # Validate interval (minimum 1 minute, no maximum)
- interval_minutes = max(interval_minutes, 1)
-
- reader.set_tag((), ".reader.update", {"interval": interval_minutes}) # pyright: ignore[reportArgumentType]
- commit_state_change(reader, f"Set global update interval to {interval_minutes} minutes")
- return RedirectResponse(url="/settings", status_code=303)
-
-
@app.get("/add", response_class=HTMLResponse)
-def get_add(
- request: Request,
- reader: Annotated[Reader, Depends(get_reader_dependency)],
-):
+def get_add(request: Request):
"""Page for adding a new feed.
Args:
request: The request object.
- reader: The Reader instance.
Returns:
HTMLResponse: The add feed page.
@@ -908,25 +562,19 @@ def get_add(
@app.get("/feed", response_class=HTMLResponse)
-async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
- feed_url: str,
- request: Request,
- reader: Annotated[Reader, Depends(get_reader_dependency)],
- starting_after: str = "",
-):
+async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
"""Get a feed by URL.
Args:
feed_url: The feed to add.
request: The request object.
starting_after: The entry to start after. Used for pagination.
- reader: The Reader instance.
-
- Returns:
- HTMLResponse: The feed page.
Raises:
HTTPException: If the feed is not found.
+
+ Returns:
+ HTMLResponse: The feed page.
"""
entries_per_page: int = 20
@@ -939,7 +587,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
# Only show button if more than 10 entries.
total_entries: int = reader.get_entry_counts(feed=feed).total or 0
- is_show_more_entries_button_visible: bool = total_entries > entries_per_page
+ show_more_entires_button: bool = total_entries > entries_per_page
# Get entries from the feed.
if starting_after:
@@ -950,22 +598,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
except EntryNotFoundError as e:
current_entries = list(reader.get_entries(feed=clean_feed_url))
msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}"
- html: str = create_html_for_feed(reader=reader, entries=current_entries, current_feed_url=clean_feed_url)
-
- # Get feed and global intervals for error case too
- feed_interval: int | None = None
- feed_update_config = reader.get_tag(feed, ".reader.update", None)
- if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
- interval_value = feed_update_config["interval"]
- if isinstance(interval_value, int):
- feed_interval = interval_value
-
- global_interval: int = 60
- global_update_config = reader.get_tag((), ".reader.update", None)
- if isinstance(global_update_config, dict) and "interval" in global_update_config:
- interval_value = global_update_config["interval"]
- if isinstance(interval_value, int):
- global_interval = interval_value
+ html: str = create_html_for_feed(current_entries)
context = {
"request": request,
@@ -976,10 +609,8 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
"should_send_embed": False,
"last_entry": None,
"messages": msg,
- "is_show_more_entries_button_visible": is_show_more_entries_button_visible,
+ "show_more_entires_button": show_more_entires_button,
"total_entries": total_entries,
- "feed_interval": feed_interval,
- "global_interval": global_interval,
}
return templates.TemplateResponse(request=request, name="feed.html", context=context)
@@ -1000,25 +631,13 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
last_entry = entries[-1]
# Create the html for the entries.
- html: str = create_html_for_feed(reader=reader, entries=entries, current_feed_url=clean_feed_url)
+ html: str = create_html_for_feed(entries)
- should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True))
-
- # Get the update interval for this feed
- feed_interval: int | None = None
- feed_update_config = reader.get_tag(feed, ".reader.update", None)
- if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
- interval_value = feed_update_config["interval"]
- if isinstance(interval_value, int):
- feed_interval = interval_value
-
- # Get the global default update interval
- global_interval: int = 60 # Default to 60 minutes if not set
- global_update_config = reader.get_tag((), ".reader.update", None)
- if isinstance(global_update_config, dict) and "interval" in global_update_config:
- interval_value = global_update_config["interval"]
- if isinstance(interval_value, int):
- global_interval = interval_value
+ try:
+ should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
+ except TagNotFoundError:
+ add_missing_tags(reader)
+ should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
context = {
"request": request,
@@ -1028,25 +647,17 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
"html": html,
"should_send_embed": should_send_embed,
"last_entry": last_entry,
- "is_show_more_entries_button_visible": is_show_more_entries_button_visible,
+ "show_more_entires_button": show_more_entires_button,
"total_entries": total_entries,
- "feed_interval": feed_interval,
- "global_interval": global_interval,
}
return templates.TemplateResponse(request=request, name="feed.html", context=context)
-def create_html_for_feed( # noqa: C901, PLR0914
- reader: Reader,
- entries: Iterable[Entry],
- current_feed_url: str = "",
-) -> str:
+def create_html_for_feed(entries: Iterable[Entry]) -> str:
"""Create HTML for the search results.
Args:
- reader: The Reader instance to use.
entries: The entries to create HTML for.
- current_feed_url: The feed URL currently being viewed in /feed.
Returns:
str: The HTML for the search results.
@@ -1062,75 +673,31 @@ def create_html_for_feed( # noqa: C901, PLR0914
first_image = get_first_image(summary, content)
- text: str = replace_tags_in_text_message(entry, reader=reader) or (
- "
No content available.
"
- )
+ text: str = replace_tags_in_text_message(entry) or "
No content available.
"
published = ""
if entry.published:
published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S")
blacklisted: str = ""
- if entry_is_blacklisted(entry, reader=reader):
+ if entry_is_blacklisted(entry):
blacklisted = "Blacklisted"
whitelisted: str = ""
- if entry_is_whitelisted(entry, reader=reader):
+ if entry_is_whitelisted(entry):
whitelisted = "Whitelisted"
- source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url
-
- from_another_feed: str = ""
- if current_feed_url and source_feed_url != current_feed_url:
- from_another_feed = f"From another feed: {source_feed_url}"
-
- # Add feed link when viewing from webhook_entries or aggregated views
- feed_link: str = ""
- if not current_feed_url or source_feed_url != current_feed_url:
- encoded_feed_url: str = urllib.parse.quote(source_feed_url)
- feed_title: str = entry.feed.title if hasattr(entry.feed, "title") and entry.feed.title else source_feed_url
- feed_link = (
- f"{feed_title} "
- )
-
entry_id: str = urllib.parse.quote(entry.id)
- encoded_source_feed_url: str = urllib.parse.quote(source_feed_url)
- to_discord_html: str = (
- f""
- "Send to Discord"
- )
-
- # Check if this is a YouTube feed entry and the entry has a link
- is_youtube_feed = "youtube.com/feeds/videos.xml" in entry.feed.url
- video_embed_html = ""
-
- if is_youtube_feed and entry.link:
- # Extract the video ID and create an embed if possible
- video_id: str | None = extract_youtube_video_id(entry.link)
- if video_id:
- video_embed_html: str = f"""
-
-
-
- """
- # Don't use the first image if we have a video embed
- first_image = ""
-
+ to_discord_html: str = f"Send to Discord"
image_html: str = f"" if first_image else ""
html += f"""
- {{ row.resolved_url if resolve_urls else row.candidate_url }}
-
-
- {% if not row.has_match %}
- No match
- {% elif row.will_force_ignore_errors %}
- Will force update (ignore resolve error)
- {% elif row.resolution_error %}
- {{ row.resolution_error }}
- {% elif row.will_force_overwrite %}
- Will force overwrite
- {% elif row.target_exists %}
- Conflict: target URL exists
- {% elif row.will_change %}
- Will update
- {% else %}
- No change
- {% endif %}
-
-
- {% endfor %}
-
-
-
-{% elif replace_from %}
-
No preview rows found for that replacement pattern.
-{% endif %}
diff --git a/discord_rss_bot/templates/base.html b/discord_rss_bot/templates/base.html
index 9146b35..a8640dd 100644
--- a/discord_rss_bot/templates/base.html
+++ b/discord_rss_bot/templates/base.html
@@ -1,12 +1,13 @@
+
+ content="Stay updated with the latest news and events with our easy-to-use RSS bot. Never miss a message or announcement again with real-time notifications directly to your Discord server." />
+ content="discord, rss, bot, notifications, announcements, updates, real-time, server, messages, news, events, feed." />
@@ -17,20 +18,19 @@
{% block head %}
{% endblock head %}
+
{% include "nav.html" %}
You can remove the embed from links by adding < and> around the link. (For example <
- {% raw %} {{entry_link}} {% endraw %}>)
+ {% raw %} {{ entry_link }} {% endraw %}>)
- Hello there!
-
-
- You need to add a webhook here to get started. After that, you can
- add feeds here. You can find both of these links in the navigation bar
- above.
-
-
- If you have any questions or suggestions, feel free to contact me on tlovinator@gmail.com or TheLovinator#9276 on Discord.
-
-
- Thanks!
-
+ Hello there!
+
+ You need to add a webhook here to get started. After that, you can
+ add feeds here. You can find both of these links in the navigation bar
+ above.
+
+
+ If you have any questions or suggestions, feel free to contact me on tlovinator@gmail.com or TheLovinator#9276 on Discord.
+
+
+ Thanks!
+
+ {% endif %}
+
+ {% if broken_feeds %}
+
+
+ Feeds without webhook:
+ {% for broken_feed in broken_feeds %}
+ {{ broken_feed.url }}
+ {% endfor %}
+