Compare commits

..

17 Commits

Author SHA1 Message Date
44f50a4a98 Remove test for updating an existing feed
All checks were successful
Test and build Docker image / docker (push) Successful in 1m27s
2025-05-17 04:07:13 +02:00
2a6dbd33dd Add button for manually updating feed
Some checks failed
Test and build Docker image / docker (push) Failing after 32s
2025-05-17 03:58:08 +02:00
96bcd81191 Use ATX headers instead of SETEXT 2025-05-17 03:53:15 +02:00
901d6cb1a6 Honor 429 Too Many Requests and 503 Service Unavailable responses
All checks were successful
Test and build Docker image / docker (push) Successful in 1m33s
2025-05-05 01:19:52 +02:00
7f9c934d08 Also use custom feed stuff if sent from send_to_discord
All checks were successful
Test and build Docker image / docker (push) Successful in 2m18s
2025-05-04 16:50:29 +02:00
c3a11f55b0 Update Docker healthcheck
All checks were successful
Test and build Docker image / docker (push) Successful in 1m31s
2025-05-04 05:28:37 +02:00
d8247fec01 Replace GitHub Actions build workflow with Gitea workflow
All checks were successful
Test and build Docker image / docker (push) Successful in 1m27s
2025-05-04 04:08:39 +02:00
ffd6f2f9f2 Add Hoyolab API integration
Some checks failed
Test and build Docker image / test (push) Successful in 27s
Test and build Docker image / ruff (push) Successful in 5s
Test and build Docker image / build (push) Failing after 10m49s
2025-05-04 03:48:22 +02:00
544ef6dca3 Update ruff-pre-commit to version 0.11.8 2025-05-03 19:42:20 +02:00
e33b331564 Update ruff-pre-commit to version 0.11.5 2025-04-16 13:33:56 +02:00
cd0f63d59a Add tldextract for improved domain extraction and add new tests for extract_domain function 2025-04-16 13:32:31 +02:00
8b50003eda Group feeds by domain 2025-04-03 16:47:53 +02:00
97d06ddb43 Embed YouTube videos in /feed HTML. Strong code, many bananas! 🦍🦍🦍🦍 2025-04-03 06:20:01 +02:00
ac63041b28 Add regex support to blacklist and whitelist filters. Strong code, many bananas! 🦍🦍🦍🦍 2025-04-03 05:44:50 +02:00
84e39c9f79 Add .gitattributes to set Jinja as the language for HTML files 2025-04-01 22:58:42 +02:00
8408db9afd Enhance YouTube feed display in index.html with username and channel ID formatting 2025-04-01 22:56:54 +02:00
6dfc72d3b0 Add discord_rss_bot directory to Dockerfile 2025-02-10 05:17:46 +01:00
28 changed files with 1413 additions and 160 deletions

1
.gitattributes vendored Normal file
View File

@ -0,0 +1 @@
*.html linguist-language=jinja

View File

@ -0,0 +1,98 @@
---
name: Test and build Docker image
# NOTE(review): indentation was lost in the page extraction; structure below is
# reconstructed to the standard Gitea/GitHub Actions workflow schema.
on:
  push:
    branches:
      - master
  pull_request:
  workflow_dispatch:
  schedule:
    # "@daily" descriptor is supported by the Gitea Actions cron parser.
    - cron: "@daily"
env:
  TEST_WEBHOOK_URL: ${{ secrets.TEST_WEBHOOK_URL }}
jobs:
  docker:
    runs-on: ubuntu-latest
    steps:
      # GitHub Container Registry
      - uses: https://github.com/docker/login-action@v3
        if: github.event_name != 'pull_request'
        with:
          registry: ghcr.io
          username: thelovinator1
          password: ${{ secrets.PACKAGES_WRITE_GITHUB_TOKEN }}
      # Gitea Container Registry
      - uses: https://github.com/docker/login-action@v3
        if: github.event_name != 'pull_request'
        with:
          registry: git.lovinator.space
          username: thelovinator
          password: ${{ secrets.PACKAGES_WRITE_GITEA_TOKEN }}
      # Download the latest commit from the master branch
      - uses: https://github.com/actions/checkout@v4
      # Set up QEMU
      - id: qemu
        uses: https://github.com/docker/setup-qemu-action@v3
        with:
          image: tonistiigi/binfmt:master
          platforms: linux/amd64,linux/arm64
          cache-image: false
      # Set up Buildx so we can build multi-arch images
      - uses: https://github.com/docker/setup-buildx-action@v3
      # Install the latest version of ruff
      - uses: https://github.com/astral-sh/ruff-action@v3
        with:
          version: "latest"
      # Lint the Python code using ruff
      - run: ruff check --exit-non-zero-on-fix --verbose
      # Check if the Python code needs formatting
      - run: ruff format --check --verbose
      # Lint Dockerfile
      - run: docker build --check .
      # Set up Python 3.13 (quoted: an unquoted 3.13 is parsed as a float)
      - uses: actions/setup-python@v5
        with:
          python-version: "3.13"
      # Install dependencies
      - uses: astral-sh/setup-uv@v5
        with:
          version: "latest"
      - run: uv sync --all-extras --all-groups
      # Run tests
      - run: uv run pytest
      # Extract metadata (tags, labels) from Git reference and GitHub events for Docker
      - id: meta
        uses: https://github.com/docker/metadata-action@v5
        env:
          DOCKER_METADATA_ANNOTATIONS_LEVELS: manifest,index
        with:
          images: |
            ghcr.io/thelovinator1/discord-rss-bot
            git.lovinator.space/thelovinator/discord-rss-bot
          tags: |
            type=raw,value=latest,enable=${{ gitea.ref == format('refs/heads/{0}', 'master') }}
            type=raw,value=master,enable=${{ gitea.ref == format('refs/heads/{0}', 'master') }}
      # Build and push the Docker image
      - uses: https://github.com/docker/build-push-action@v6
        with:
          context: .
          platforms: linux/amd64,linux/arm64
          push: ${{ gitea.event_name != 'pull_request' }}
          labels: ${{ steps.meta.outputs.labels }}
          tags: ${{ steps.meta.outputs.tags }}
          annotations: ${{ steps.meta.outputs.annotations }}

View File

@ -1,64 +0,0 @@
---
name: Test and build Docker image
# NOTE(review): indentation was lost in the page extraction; structure below is
# reconstructed to the standard GitHub Actions workflow schema.
on:
  push:
  pull_request:
  workflow_dispatch:
  schedule:
    - cron: "0 6 * * *"
env:
  TEST_WEBHOOK_URL: ${{ secrets.TEST_WEBHOOK_URL }}
jobs:
  # Run the test suite with pytest
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          # Quoted: an unquoted 3.12 is parsed as the float 3.12
          python-version: "3.12"
      - uses: astral-sh/setup-uv@v5
        with:
          version: "latest"
      - run: uv sync --all-extras --all-groups
      - run: uv run pytest
  # Lint and check formatting with ruff
  ruff:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/ruff-action@v3
        with:
          version: "latest"
      - run: ruff check --exit-non-zero-on-fix --verbose
      - run: ruff format --check --verbose
  # Build the multi-arch image and push to GHCR (skipped on pull requests)
  build:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    if: github.event_name != 'pull_request'
    concurrency:
      group: ${{ github.workflow }}-${{ github.ref }}
      cancel-in-progress: true
    needs: [test, ruff]
    steps:
      - uses: actions/checkout@v4
      - uses: docker/setup-qemu-action@v3
        with:
          platforms: all
      - uses: docker/setup-buildx-action@v3
      - uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.repository_owner }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/build-push-action@v6
        with:
          context: .
          platforms: linux/amd64, linux/arm64
          push: ${{ github.event_name != 'pull_request' }}
          tags: |
            ghcr.io/thelovinator1/discord-rss-bot:latest
            ghcr.io/thelovinator1/discord-rss-bot:master

View File

@ -38,7 +38,7 @@ repos:
# An extremely fast Python linter and formatter. # An extremely fast Python linter and formatter.
- repo: https://github.com/astral-sh/ruff-pre-commit - repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.5 rev: v0.11.8
hooks: hooks:
- id: ruff-format - id: ruff-format
- id: ruff - id: ruff

6
.vscode/launch.json vendored
View File

@ -8,7 +8,11 @@
"module": "uvicorn", "module": "uvicorn",
"args": [ "args": [
"discord_rss_bot.main:app", "discord_rss_bot.main:app",
"--reload" "--reload",
"--host",
"0.0.0.0",
"--port",
"5000",
], ],
"jinja": true, "jinja": true,
"justMyCode": true "justMyCode": true

View File

@ -2,6 +2,8 @@
"cSpell.words": [ "cSpell.words": [
"botuser", "botuser",
"Genshins", "Genshins",
"healthcheck",
"Hoyolab",
"levelname", "levelname",
"Lovinator", "Lovinator",
"markdownified", "markdownified",

View File

@ -9,6 +9,8 @@ COPY --chown=botuser:botuser requirements.txt /home/botuser/discord-rss-bot/
RUN --mount=type=cache,target=/root/.cache/uv \ RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=pyproject.toml,target=pyproject.toml \ --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
uv sync --no-install-project uv sync --no-install-project
COPY --chown=botuser:botuser discord_rss_bot/ /home/botuser/discord-rss-bot/discord_rss_bot/
EXPOSE 5000 EXPOSE 5000
VOLUME ["/home/botuser/.local/share/discord_rss_bot/"] VOLUME ["/home/botuser/.local/share/discord_rss_bot/"]
HEALTHCHECK --interval=10m --timeout=5s CMD ["uv", "run", "./discord_rss_bot/healthcheck.py"]
CMD ["uv", "run", "uvicorn", "discord_rss_bot.main:app", "--host=0.0.0.0", "--port=5000", "--proxy-headers", "--forwarded-allow-ips='*'", "--log-level", "debug"] CMD ["uv", "run", "uvicorn", "discord_rss_bot.main:app", "--host=0.0.0.0", "--port=5000", "--proxy-headers", "--forwarded-allow-ips='*'", "--log-level", "debug"]

View File

@ -2,8 +2,20 @@
Subscribe to RSS feeds and get updates to a Discord webhook. Subscribe to RSS feeds and get updates to a Discord webhook.
> [!NOTE] ## Features
> You should look at [MonitoRSS](https://github.com/synzen/monitorss) for a more feature-rich project.
- Subscribe to RSS feeds and get updates to a Discord webhook.
- Web interface to manage subscriptions.
- Customizable message format for each feed.
- Choose between Discord embed or plain text.
- Regex filters for RSS feeds.
- Blacklist/whitelist words in the title/description/author/etc.
- Gets extra information from APIs if available, currently for:
- [https://feeds.c3kay.de/](https://feeds.c3kay.de/)
- Genshin Impact News
- Honkai Impact 3rd News
- Honkai Starrail News
- Zenless Zone Zero News
## Installation ## Installation

View File

@ -68,8 +68,18 @@ def replace_tags_in_text_message(entry: Entry) -> str:
first_image: str = get_first_image(summary, content) first_image: str = get_first_image(summary, content)
summary = markdownify(html=summary, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) summary = markdownify(
content = markdownify(html=content, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) html=summary,
strip=["img", "table", "td", "tr", "tbody", "thead"],
escape_misc=False,
heading_style="ATX",
)
content = markdownify(
html=content,
strip=["img", "table", "td", "tr", "tbody", "thead"],
escape_misc=False,
heading_style="ATX",
)
if "[https://" in content or "[https://www." in content: if "[https://" in content or "[https://www." in content:
content = content.replace("[https://", "[") content = content.replace("[https://", "[")
@ -152,13 +162,6 @@ def get_first_image(summary: str | None, content: str | None) -> str:
logger.warning("Invalid URL: %s", src) logger.warning("Invalid URL: %s", src)
continue continue
# Genshins first image is a divider, so we ignore it.
# https://hyl-static-res-prod.hoyolab.com/divider_config/PC/line_3.png
skip_images: list[str] = [
"https://img-os-static.hoyolab.com/divider_config/",
"https://hyl-static-res-prod.hoyolab.com/divider_config/",
]
if not str(image.attrs["src"]).startswith(tuple(skip_images)):
return str(image.attrs["src"]) return str(image.attrs["src"])
if summary and (images := BeautifulSoup(summary, features="lxml").find_all("img")): if summary and (images := BeautifulSoup(summary, features="lxml").find_all("img")):
for image in images: for image in images:
@ -170,8 +173,6 @@ def get_first_image(summary: str | None, content: str | None) -> str:
logger.warning("Invalid URL: %s", image.attrs["src"]) logger.warning("Invalid URL: %s", image.attrs["src"])
continue continue
# Genshins first image is a divider, so we ignore it.
if not str(image.attrs["src"]).startswith("https://img-os-static.hoyolab.com/divider_config"):
return str(image.attrs["src"]) return str(image.attrs["src"])
return "" return ""
@ -198,8 +199,18 @@ def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed:
first_image: str = get_first_image(summary, content) first_image: str = get_first_image(summary, content)
summary = markdownify(html=summary, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) summary = markdownify(
content = markdownify(html=content, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) html=summary,
strip=["img", "table", "td", "tr", "tbody", "thead"],
escape_misc=False,
heading_style="ATX",
)
content = markdownify(
html=content,
strip=["img", "table", "td", "tr", "tbody", "thead"],
escape_misc=False,
heading_style="ATX",
)
if "[https://" in content or "[https://www." in content: if "[https://" in content or "[https://www." in content:
content = content.replace("[https://", "[") content = content.replace("[https://", "[")

View File

@ -2,9 +2,13 @@ from __future__ import annotations
import datetime import datetime
import logging import logging
import os
import pprint import pprint
from typing import TYPE_CHECKING import re
from typing import TYPE_CHECKING, Any
from urllib.parse import ParseResult, urlparse
import tldextract
from discord_webhook import DiscordEmbed, DiscordWebhook from discord_webhook import DiscordEmbed, DiscordWebhook
from fastapi import HTTPException from fastapi import HTTPException
from reader import Entry, EntryNotFoundError, Feed, FeedExistsError, Reader, ReaderError, StorageError, TagNotFoundError from reader import Entry, EntryNotFoundError, Feed, FeedExistsError, Reader, ReaderError, StorageError, TagNotFoundError
@ -17,6 +21,12 @@ from discord_rss_bot.custom_message import (
) )
from discord_rss_bot.filter.blacklist import entry_should_be_skipped from discord_rss_bot.filter.blacklist import entry_should_be_skipped
from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent
from discord_rss_bot.hoyolab_api import (
create_hoyolab_webhook,
extract_post_id_from_hoyolab_url,
fetch_hoyolab_post,
is_c3kay_feed,
)
from discord_rss_bot.is_url_valid import is_url_valid from discord_rss_bot.is_url_valid import is_url_valid
from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.settings import default_custom_message, get_reader from discord_rss_bot.settings import default_custom_message, get_reader
@ -29,7 +39,56 @@ if TYPE_CHECKING:
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: def extract_domain(url: str) -> str: # noqa: PLR0911
"""Extract the domain name from a URL.
Args:
url: The URL to extract the domain from.
Returns:
str: The domain name, formatted for display.
"""
# Check for empty URL first
if not url:
return "Other"
try:
# Special handling for YouTube feeds
if "youtube.com/feeds/videos.xml" in url:
return "YouTube"
# Special handling for Reddit feeds
if "reddit.com" in url or (".rss" in url and "r/" in url):
return "Reddit"
# Parse the URL and extract the domain
parsed_url: ParseResult = urlparse(url)
domain: str = parsed_url.netloc
# If we couldn't extract a domain, return "Other"
if not domain:
return "Other"
# Remove www. prefix if present
domain = re.sub(r"^www\.", "", domain)
# Special handling for common domains
domain_mapping: dict[str, str] = {"github.com": "GitHub"}
if domain in domain_mapping:
return domain_mapping[domain]
# Use tldextract to get the domain (SLD)
ext = tldextract.extract(url)
if ext.domain:
return ext.domain.capitalize()
return domain.capitalize()
except (ValueError, AttributeError, TypeError) as e:
logger.warning("Error extracting domain from %s: %s", url, e)
return "Other"
def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: PLR0912
"""Send a single entry to Discord. """Send a single entry to Discord.
Args: Args:
@ -47,6 +106,24 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
if not webhook_url: if not webhook_url:
return "No webhook URL found." return "No webhook URL found."
# Check if this is a c3kay feed
if is_c3kay_feed(entry.feed.url):
entry_link: str | None = entry.link
if entry_link:
post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
if post_id:
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry)
return None
logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
entry.feed.url,
)
else:
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
webhook_message: str = "" webhook_message: str = ""
# Try to get the custom message for the feed. If the user has none, we will use the default message. # Try to get the custom message for the feed. If the user has none, we will use the default message.
@ -67,6 +144,10 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
should_send_embed = True should_send_embed = True
# YouTube feeds should never use embeds
if is_youtube_feed(entry.feed.url):
should_send_embed = False
if should_send_embed: if should_send_embed:
webhook = create_embed_webhook(webhook_url, entry) webhook = create_embed_webhook(webhook_url, entry)
else: else:
@ -209,7 +290,7 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None:
logger.exception("Error setting entry to read: %s", entry.id) logger.exception("Error setting entry to read: %s", entry.id)
def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: PLR0912
"""Send entries to Discord. """Send entries to Discord.
If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time.
@ -223,7 +304,10 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
reader: Reader = get_reader() if custom_reader is None else custom_reader reader: Reader = get_reader() if custom_reader is None else custom_reader
# Check for new entries for every feed. # Check for new entries for every feed.
reader.update_feeds() reader.update_feeds(
scheduled=True,
workers=os.cpu_count() or 1,
)
# Loop through the unread entries. # Loop through the unread entries.
entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False) entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False)
@ -240,6 +324,11 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
continue continue
should_send_embed: bool = should_send_embed_check(reader, entry) should_send_embed: bool = should_send_embed_check(reader, entry)
# Youtube feeds only need to send the link
if is_youtube_feed(entry.feed.url):
should_send_embed = False
if should_send_embed: if should_send_embed:
webhook = create_embed_webhook(webhook_url, entry) webhook = create_embed_webhook(webhook_url, entry)
else: else:
@ -261,11 +350,27 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
continue continue
# Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted.
if has_white_tags(reader, entry.feed): if has_white_tags(reader, entry.feed) and not should_be_sent(reader, entry):
if should_be_sent(reader, entry): logger.info("Entry was not whitelisted: %s", entry.id)
continue
# Use a custom webhook for Hoyolab feeds.
if is_c3kay_feed(entry.feed.url):
entry_link: str | None = entry.link
if entry_link:
post_id: str | None = extract_post_id_from_hoyolab_url(entry_link)
if post_id:
post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id)
if post_data:
webhook = create_hoyolab_webhook(webhook_url, entry, post_data)
execute_webhook(webhook, entry) execute_webhook(webhook, entry)
return return
continue logger.warning(
"Failed to create Hoyolab webhook for feed %s, falling back to regular processing",
entry.feed.url,
)
else:
logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url)
# Send the entry to Discord as it is not blacklisted or feed has a whitelist. # Send the entry to Discord as it is not blacklisted or feed has a whitelist.
execute_webhook(webhook, entry) execute_webhook(webhook, entry)
@ -295,6 +400,18 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None:
logger.info("Sent entry to Discord: %s", entry.id) logger.info("Sent entry to Discord: %s", entry.id)
def is_youtube_feed(feed_url: str) -> bool:
"""Check if the feed is a YouTube feed.
Args:
feed_url: The feed URL to check.
Returns:
bool: True if the feed is a YouTube feed, False otherwise.
"""
return "youtube.com/feeds/videos.xml" in feed_url
def should_send_embed_check(reader: Reader, entry: Entry) -> bool: def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
"""Check if we should send an embed to Discord. """Check if we should send an embed to Discord.
@ -305,6 +422,10 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
Returns: Returns:
bool: True if we should send an embed, False otherwise. bool: True if we should send an embed, False otherwise.
""" """
# YouTube feeds should never use embeds - only links
if is_youtube_feed(entry.feed.url):
return False
try: try:
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
except TagNotFoundError: except TagNotFoundError:

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from discord_rss_bot.filter.utils import is_word_in_text from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text
if TYPE_CHECKING: if TYPE_CHECKING:
from reader import Entry, Feed, Reader from reader import Entry, Feed, Reader
@ -12,9 +12,14 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
"""Return True if the feed has blacklist tags. """Return True if the feed has blacklist tags.
The following tags are checked: The following tags are checked:
- blacklist_title - blacklist_author
- blacklist_content
- blacklist_summary - blacklist_summary
- blacklist_content. - blacklist_title
- regex_blacklist_author
- regex_blacklist_content
- regex_blacklist_summary
- regex_blacklist_title
Args: Args:
custom_reader: The reader. custom_reader: The reader.
@ -23,14 +28,29 @@ def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool:
Returns: Returns:
bool: If the feed has any of the tags. bool: If the feed has any of the tags.
""" """
blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")) blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip()
blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")) blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip()
blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")) blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip()
blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip()
return bool(blacklist_title or blacklist_summary or blacklist_content) regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip()
regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip()
regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip()
return bool(
blacklist_title
or blacklist_author
or blacklist_content
or blacklist_summary
or regex_blacklist_author
or regex_blacklist_content
or regex_blacklist_summary
or regex_blacklist_title,
)
def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
"""Return True if the entry is in the blacklist. """Return True if the entry is in the blacklist.
Args: Args:
@ -40,21 +60,58 @@ def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool:
Returns: Returns:
bool: If the entry is in the blacklist. bool: If the entry is in the blacklist.
""" """
blacklist_title: str = str(custom_reader.get_tag(entry.feed, "blacklist_title", "")) feed = entry.feed
blacklist_summary: str = str(custom_reader.get_tag(entry.feed, "blacklist_summary", ""))
blacklist_content: str = str(custom_reader.get_tag(entry.feed, "blacklist_content", "")) blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip()
blacklist_author: str = str(custom_reader.get_tag(entry.feed, "blacklist_author", "")) blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip()
blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip()
blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip()
regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip()
regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip()
regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip()
regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip()
# TODO(TheLovinator): Also add support for entry_text and more. # TODO(TheLovinator): Also add support for entry_text and more.
# Check regular blacklist
if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title): if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title):
return True return True
if entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary): if entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary):
return True return True
if (
entry.content
and entry.content[0].value
and blacklist_content
and is_word_in_text(blacklist_content, entry.content[0].value)
):
return True
if entry.author and blacklist_author and is_word_in_text(blacklist_author, entry.author): if entry.author and blacklist_author and is_word_in_text(blacklist_author, entry.author):
return True return True
if (
entry.content
and entry.content[0].value
and blacklist_content
and is_word_in_text(blacklist_content, entry.content[0].value)
):
return True
# Check regex blacklist
if entry.title and regex_blacklist_title and is_regex_match(regex_blacklist_title, entry.title):
return True
if entry.summary and regex_blacklist_summary and is_regex_match(regex_blacklist_summary, entry.summary):
return True
if (
entry.content
and entry.content[0].value
and regex_blacklist_content
and is_regex_match(regex_blacklist_content, entry.content[0].value)
):
return True
if entry.author and regex_blacklist_author and is_regex_match(regex_blacklist_author, entry.author):
return True
return bool( return bool(
entry.content entry.content
and entry.content[0].value and entry.content[0].value
and blacklist_content and regex_blacklist_content
and is_word_in_text(blacklist_content, entry.content[0].value), and is_regex_match(regex_blacklist_content, entry.content[0].value),
) )

View File

@ -1,7 +1,10 @@
from __future__ import annotations from __future__ import annotations
import logging
import re import re
logger: logging.Logger = logging.getLogger(__name__)
def is_word_in_text(word_string: str, text: str) -> bool: def is_word_in_text(word_string: str, text: str) -> bool:
"""Check if any of the words are in the text. """Check if any of the words are in the text.
@ -20,3 +23,50 @@ def is_word_in_text(word_string: str, text: str) -> bool:
# Check if any pattern matches the text. # Check if any pattern matches the text.
return any(pattern.search(text) for pattern in patterns) return any(pattern.search(text) for pattern in patterns)
def is_regex_match(regex_string: str, text: str) -> bool:
"""Check if any of the regex patterns match the text.
Args:
regex_string: A string containing regex patterns, separated by newlines or commas.
text: The text to search in.
Returns:
bool: True if any regex pattern matches the text, otherwise False.
"""
if not regex_string or not text:
return False
# Split by newlines first, then by commas (for backward compatibility)
regex_list: list[str] = []
# First split by newlines
lines: list[str] = regex_string.split("\n")
for line in lines:
stripped_line: str = line.strip()
if stripped_line:
# For backward compatibility, also split by commas if there are any
if "," in stripped_line:
regex_list.extend([part.strip() for part in stripped_line.split(",") if part.strip()])
else:
regex_list.append(stripped_line)
# Attempt to compile and apply each regex pattern
for pattern_str in regex_list:
if not pattern_str:
logger.warning("Empty regex pattern found in the list.")
continue
try:
pattern: re.Pattern[str] = re.compile(pattern_str, re.IGNORECASE)
if pattern.search(text):
logger.info("Regex pattern matched: %s", pattern_str)
return True
except re.error:
logger.warning("Invalid regex pattern: %s", pattern_str)
continue
logger.info("No regex patterns matched.")
return False

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from discord_rss_bot.filter.utils import is_word_in_text from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text
if TYPE_CHECKING: if TYPE_CHECKING:
from reader import Entry, Feed, Reader from reader import Entry, Feed, Reader
@ -12,9 +12,14 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
"""Return True if the feed has whitelist tags. """Return True if the feed has whitelist tags.
The following tags are checked: The following tags are checked:
- whitelist_title - regex_whitelist_author
- regex_whitelist_content
- regex_whitelist_summary
- regex_whitelist_title
- whitelist_author
- whitelist_content
- whitelist_summary - whitelist_summary
- whitelist_content. - whitelist_title
Args: Args:
custom_reader: The reader. custom_reader: The reader.
@ -23,14 +28,29 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
Returns: Returns:
bool: If the feed has any of the tags. bool: If the feed has any of the tags.
""" """
whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")) whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip()
whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")) whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip()
whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")) whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip()
whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip()
return bool(whitelist_title or whitelist_summary or whitelist_content) regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip()
regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip()
regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip()
return bool(
whitelist_title
or whitelist_author
or whitelist_content
or whitelist_summary
or regex_whitelist_author
or regex_whitelist_content
or regex_whitelist_summary
or regex_whitelist_title,
)
def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911
"""Return True if the entry is in the whitelist. """Return True if the entry is in the whitelist.
Args: Args:
@ -41,20 +61,43 @@ def should_be_sent(custom_reader: Reader, entry: Entry) -> bool:
bool: If the entry is in the whitelist. bool: If the entry is in the whitelist.
""" """
feed: Feed = entry.feed feed: Feed = entry.feed
whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")) # Regular whitelist tags
whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")) whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip()
whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")) whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip()
whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")) whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip()
whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip()
# Regex whitelist tags
regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip()
regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip()
regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip()
regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip()
# Check regular whitelist
if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title):
return True return True
if entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary): if entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary):
return True return True
if entry.author and whitelist_author and is_word_in_text(whitelist_author, entry.author): if entry.author and whitelist_author and is_word_in_text(whitelist_author, entry.author):
return True return True
return bool( if (
entry.content entry.content
and entry.content[0].value and entry.content[0].value
and whitelist_content and whitelist_content
and is_word_in_text(whitelist_content, entry.content[0].value), and is_word_in_text(whitelist_content, entry.content[0].value)
):
return True
# Check regex whitelist
if entry.title and regex_whitelist_title and is_regex_match(regex_whitelist_title, entry.title):
return True
if entry.summary and regex_whitelist_summary and is_regex_match(regex_whitelist_summary, entry.summary):
return True
if entry.author and regex_whitelist_author and is_regex_match(regex_whitelist_author, entry.author):
return True
return bool(
entry.content
and entry.content[0].value
and regex_whitelist_content
and is_regex_match(regex_whitelist_content, entry.content[0].value),
) )

View File

@ -0,0 +1,193 @@
from __future__ import annotations
import contextlib
import json
import logging
import re
from typing import TYPE_CHECKING, Any
import requests
from discord_webhook import DiscordEmbed, DiscordWebhook
if TYPE_CHECKING:
from reader import Entry
logger: logging.Logger = logging.getLogger(__name__)
def is_c3kay_feed(feed_url: str) -> bool:
"""Check if the feed is from c3kay.de.
Args:
feed_url: The feed URL to check.
Returns:
bool: True if the feed is from c3kay.de, False otherwise.
"""
return "feeds.c3kay.de" in feed_url
def extract_post_id_from_hoyolab_url(url: str) -> str | None:
"""Extract the post ID from a Hoyolab URL.
Args:
url: The Hoyolab URL to extract the post ID from.
For example: https://www.hoyolab.com/article/38588239
Returns:
str | None: The post ID if found, None otherwise.
"""
try:
match: re.Match[str] | None = re.search(r"/article/(\d+)", url)
if match:
return match.group(1)
except (ValueError, AttributeError, TypeError) as e:
logger.warning("Error extracting post ID from Hoyolab URL %s: %s", url, e)
return None
def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None:
"""Fetch post data from the Hoyolab API.
Args:
post_id: The post ID to fetch.
Returns:
dict[str, Any] | None: The post data if successful, None otherwise.
"""
if not post_id:
return None
http_ok = 200
try:
url: str = f"https://bbs-api-os.hoyolab.com/community/post/wapi/getPostFull?post_id={post_id}"
response: requests.Response = requests.get(url, timeout=10)
if response.status_code == http_ok:
data: dict[str, Any] = response.json()
if data.get("retcode") == 0 and "data" in data and "post" in data["data"]:
return data["data"]["post"]
logger.warning("Failed to fetch Hoyolab post %s: %s", post_id, response.text)
except (requests.RequestException, ValueError):
logger.exception("Error fetching Hoyolab post %s", post_id)
return None
def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, Any]) -> DiscordWebhook: # noqa: C901, PLR0912, PLR0914, PLR0915
"""Create a webhook with data from the Hoyolab API.
Args:
webhook_url: The webhook URL.
entry: The entry to send to Discord.
post_data: The post data from the Hoyolab API.
Returns:
DiscordWebhook: The webhook with the embed.
"""
entry_link: str = entry.link or entry.feed.url
webhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True)
# Extract relevant data from the post
post: dict[str, Any] = post_data.get("post", {})
subject: str = post.get("subject", "")
content: str = post.get("content", "{}")
logger.debug("Post subject: %s", subject)
logger.debug("Post content: %s", content)
content_data: dict[str, str] = {}
with contextlib.suppress(json.JSONDecodeError, ValueError):
content_data = json.loads(content)
logger.debug("Content data: %s", content_data)
description: str = content_data.get("describe", "")
if not description:
description = post.get("desc", "")
# Create the embed
discord_embed = DiscordEmbed()
# Set title and description
discord_embed.set_title(subject)
discord_embed.set_url(entry_link)
# Get post.image_list
image_list: list[dict[str, Any]] = post_data.get("image_list", [])
if image_list:
image_url: str = str(image_list[0].get("url", ""))
image_height: int = int(image_list[0].get("height", 1080))
image_width: int = int(image_list[0].get("width", 1920))
logger.debug("Image URL: %s, Height: %s, Width: %s", image_url, image_height, image_width)
discord_embed.set_image(url=image_url, height=image_height, width=image_width)
video: dict[str, str | int | bool] = post_data.get("video", {})
if video and video.get("url"):
video_url: str = str(video.get("url", ""))
logger.debug("Video URL: %s", video_url)
with contextlib.suppress(requests.RequestException):
video_response: requests.Response = requests.get(video_url, stream=True, timeout=10)
if video_response.ok:
webhook.add_file(
file=video_response.content,
filename=f"{entry.id}.mp4",
)
game = post_data.get("game", {})
if game and game.get("color"):
game_color = str(game.get("color", ""))
discord_embed.set_color(game_color.removeprefix("#"))
user: dict[str, str | int | bool] = post_data.get("user", {})
author_name: str = str(user.get("nickname", ""))
avatar_url: str = str(user.get("avatar_url", ""))
if author_name:
webhook.avatar_url = avatar_url
webhook.username = author_name
classification = post_data.get("classification", {})
if classification and classification.get("name"):
footer = str(classification.get("name", ""))
discord_embed.set_footer(text=footer)
webhook.add_embed(discord_embed)
# Only show Youtube URL if available
structured_content: str = post.get("structured_content", "")
if structured_content: # noqa: PLR1702
try:
structured_content_data: list[dict[str, Any]] = json.loads(structured_content)
for item in structured_content_data:
if item.get("insert") and isinstance(item["insert"], dict):
video_url: str = str(item["insert"].get("video", ""))
if video_url:
video_id_match: re.Match[str] | None = re.search(r"embed/([a-zA-Z0-9_-]+)", video_url)
if video_id_match:
video_id: str = video_id_match.group(1)
logger.debug("Video ID: %s", video_id)
webhook.content = f"https://www.youtube.com/watch?v={video_id}"
webhook.remove_embeds()
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Error parsing structured content: %s", e)
event_start_date: str = post.get("event_start_date", "")
if event_start_date and event_start_date != "0":
discord_embed.add_embed_field(name="Start", value=f"<t:{event_start_date}:R>")
event_end_date: str = post.get("event_end_date", "")
if event_end_date and event_end_date != "0":
discord_embed.add_embed_field(name="End", value=f"<t:{event_end_date}:R>")
created_at: str = post.get("created_at", "")
if created_at and created_at != "0":
discord_embed.set_timestamp(timestamp=created_at)
return webhook

View File

@ -37,13 +37,13 @@ from discord_rss_bot.custom_message import (
replace_tags_in_text_message, replace_tags_in_text_message,
save_embed, save_embed,
) )
from discord_rss_bot.feeds import create_feed, send_entry_to_discord, send_to_discord from discord_rss_bot.feeds import create_feed, extract_domain, send_entry_to_discord, send_to_discord
from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.missing_tags import add_missing_tags
from discord_rss_bot.search import create_html_for_search_results from discord_rss_bot.search import create_html_for_search_results
from discord_rss_bot.settings import get_reader from discord_rss_bot.settings import get_reader
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Iterable from collections.abc import AsyncGenerator, Iterable
from reader.types import JSONType from reader.types import JSONType
@ -88,14 +88,21 @@ reader: Reader = get_reader()
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI) -> typing.AsyncGenerator[None]: async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
"""This is needed for the ASGI server to run.""" """Lifespan for the FastAPI app.
Args:
app: The FastAPI app.
Yields:
None: Nothing.
"""
add_missing_tags(reader) add_missing_tags(reader)
scheduler: AsyncIOScheduler = AsyncIOScheduler() scheduler: AsyncIOScheduler = AsyncIOScheduler()
# Update all feeds every 15 minutes. # Run job every minute to check for new entries. Feeds will be checked every 15 minutes.
# TODO(TheLovinator): Make this configurable. # TODO(TheLovinator): Make this configurable.
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now(tz=UTC)) scheduler.add_job(send_to_discord, "interval", minutes=1, next_run_time=datetime.now(tz=UTC))
scheduler.start() scheduler.start()
logger.info("Scheduler started.") logger.info("Scheduler started.")
yield yield
@ -250,6 +257,10 @@ async def post_set_whitelist(
whitelist_summary: Annotated[str, Form()] = "", whitelist_summary: Annotated[str, Form()] = "",
whitelist_content: Annotated[str, Form()] = "", whitelist_content: Annotated[str, Form()] = "",
whitelist_author: Annotated[str, Form()] = "", whitelist_author: Annotated[str, Form()] = "",
regex_whitelist_title: Annotated[str, Form()] = "",
regex_whitelist_summary: Annotated[str, Form()] = "",
regex_whitelist_content: Annotated[str, Form()] = "",
regex_whitelist_author: Annotated[str, Form()] = "",
feed_url: Annotated[str, Form()] = "", feed_url: Annotated[str, Form()] = "",
) -> RedirectResponse: ) -> RedirectResponse:
"""Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent. """Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent.
@ -259,6 +270,10 @@ async def post_set_whitelist(
whitelist_summary: Whitelisted words for when checking the summary. whitelist_summary: Whitelisted words for when checking the summary.
whitelist_content: Whitelisted words for when checking the content. whitelist_content: Whitelisted words for when checking the content.
whitelist_author: Whitelisted words for when checking the author. whitelist_author: Whitelisted words for when checking the author.
regex_whitelist_title: Whitelisted regex for when checking the title.
regex_whitelist_summary: Whitelisted regex for when checking the summary.
regex_whitelist_content: Whitelisted regex for when checking the content.
regex_whitelist_author: Whitelisted regex for when checking the author.
feed_url: The feed we should set the whitelist for. feed_url: The feed we should set the whitelist for.
Returns: Returns:
@ -269,6 +284,10 @@ async def post_set_whitelist(
reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "whitelist_author", whitelist_author) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "whitelist_author", whitelist_author) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_whitelist_title", regex_whitelist_title) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_whitelist_summary", regex_whitelist_summary) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_whitelist_content", regex_whitelist_content) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_whitelist_author", regex_whitelist_author) # pyright: ignore[reportArgumentType][call-overload]
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -287,11 +306,14 @@ async def get_whitelist(feed_url: str, request: Request):
clean_feed_url: str = feed_url.strip() clean_feed_url: str = feed_url.strip()
feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url)) feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url))
# Get previous data, this is used when creating the form.
whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")) whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", ""))
whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")) whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", ""))
whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")) whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", ""))
whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")) whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", ""))
regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", ""))
regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", ""))
regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", ""))
regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", ""))
context = { context = {
"request": request, "request": request,
@ -300,6 +322,10 @@ async def get_whitelist(feed_url: str, request: Request):
"whitelist_summary": whitelist_summary, "whitelist_summary": whitelist_summary,
"whitelist_content": whitelist_content, "whitelist_content": whitelist_content,
"whitelist_author": whitelist_author, "whitelist_author": whitelist_author,
"regex_whitelist_title": regex_whitelist_title,
"regex_whitelist_summary": regex_whitelist_summary,
"regex_whitelist_content": regex_whitelist_content,
"regex_whitelist_author": regex_whitelist_author,
} }
return templates.TemplateResponse(request=request, name="whitelist.html", context=context) return templates.TemplateResponse(request=request, name="whitelist.html", context=context)
@ -310,6 +336,10 @@ async def post_set_blacklist(
blacklist_summary: Annotated[str, Form()] = "", blacklist_summary: Annotated[str, Form()] = "",
blacklist_content: Annotated[str, Form()] = "", blacklist_content: Annotated[str, Form()] = "",
blacklist_author: Annotated[str, Form()] = "", blacklist_author: Annotated[str, Form()] = "",
regex_blacklist_title: Annotated[str, Form()] = "",
regex_blacklist_summary: Annotated[str, Form()] = "",
regex_blacklist_content: Annotated[str, Form()] = "",
regex_blacklist_author: Annotated[str, Form()] = "",
feed_url: Annotated[str, Form()] = "", feed_url: Annotated[str, Form()] = "",
) -> RedirectResponse: ) -> RedirectResponse:
"""Set the blacklist. """Set the blacklist.
@ -322,6 +352,10 @@ async def post_set_blacklist(
blacklist_summary: Blacklisted words for when checking the summary. blacklist_summary: Blacklisted words for when checking the summary.
blacklist_content: Blacklisted words for when checking the content. blacklist_content: Blacklisted words for when checking the content.
blacklist_author: Blacklisted words for when checking the author. blacklist_author: Blacklisted words for when checking the author.
regex_blacklist_title: Blacklisted regex for when checking the title.
regex_blacklist_summary: Blacklisted regex for when checking the summary.
regex_blacklist_content: Blacklisted regex for when checking the content.
regex_blacklist_author: Blacklisted regex for when checking the author.
feed_url: What feed we should set the blacklist for. feed_url: What feed we should set the blacklist for.
Returns: Returns:
@ -332,7 +366,10 @@ async def post_set_blacklist(
reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_blacklist_title", regex_blacklist_title) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_blacklist_summary", regex_blacklist_summary) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_blacklist_content", regex_blacklist_content) # pyright: ignore[reportArgumentType][call-overload]
reader.set_tag(clean_feed_url, "regex_blacklist_author", regex_blacklist_author) # pyright: ignore[reportArgumentType][call-overload]
return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303)
@ -349,11 +386,14 @@ async def get_blacklist(feed_url: str, request: Request):
""" """
feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url)) feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url))
# Get previous data, this is used when creating the form.
blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")) blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", ""))
blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")) blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", ""))
blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")) blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", ""))
blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")) blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", ""))
regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", ""))
regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", ""))
regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", ""))
regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", ""))
context = { context = {
"request": request, "request": request,
@ -362,6 +402,10 @@ async def get_blacklist(feed_url: str, request: Request):
"blacklist_summary": blacklist_summary, "blacklist_summary": blacklist_summary,
"blacklist_content": blacklist_content, "blacklist_content": blacklist_content,
"blacklist_author": blacklist_author, "blacklist_author": blacklist_author,
"regex_blacklist_title": regex_blacklist_title,
"regex_blacklist_summary": regex_blacklist_summary,
"regex_blacklist_content": regex_blacklist_content,
"regex_blacklist_author": regex_blacklist_author,
} }
return templates.TemplateResponse(request=request, name="blacklist.html", context=context) return templates.TemplateResponse(request=request, name="blacklist.html", context=context)
@ -461,7 +505,7 @@ async def get_embed_page(feed_url: str, request: Request):
@app.post("/embed", response_class=HTMLResponse) @app.post("/embed", response_class=HTMLResponse)
async def post_embed( # noqa: PLR0913, PLR0917 async def post_embed(
feed_url: Annotated[str, Form()], feed_url: Annotated[str, Form()],
title: Annotated[str, Form()] = "", title: Annotated[str, Form()] = "",
description: Annotated[str, Form()] = "", description: Annotated[str, Form()] = "",
@ -688,6 +732,27 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
entry_id: str = urllib.parse.quote(entry.id) entry_id: str = urllib.parse.quote(entry.id)
to_discord_html: str = f"<a class='text-muted' href='/post_entry?entry_id={entry_id}'>Send to Discord</a>" to_discord_html: str = f"<a class='text-muted' href='/post_entry?entry_id={entry_id}'>Send to Discord</a>"
# Check if this is a YouTube feed entry and the entry has a link
is_youtube_feed = "youtube.com/feeds/videos.xml" in entry.feed.url
video_embed_html = ""
if is_youtube_feed and entry.link:
# Extract the video ID and create an embed if possible
video_id: str | None = extract_youtube_video_id(entry.link)
if video_id:
video_embed_html: str = f"""
<div class="ratio ratio-16x9 mt-3 mb-3">
<iframe src="https://www.youtube.com/embed/{video_id}"
title="{entry.title}"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
allowfullscreen>
</iframe>
</div>
"""
# Don't use the first image if we have a video embed
first_image = ""
image_html: str = f"<img src='{first_image}' class='img-fluid'>" if first_image else "" image_html: str = f"<img src='{first_image}' class='img-fluid'>" if first_image else ""
html += f"""<div class="p-2 mb-2 border border-dark"> html += f"""<div class="p-2 mb-2 border border-dark">
@ -695,6 +760,7 @@ def create_html_for_feed(entries: Iterable[Entry]) -> str:
{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html} {f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html}
{text} {text}
{video_embed_html}
{image_html} {image_html}
</div> </div>
""" """
@ -809,11 +875,12 @@ def make_context_index(request: Request):
broken_feeds = [] broken_feeds = []
feeds_without_attached_webhook = [] feeds_without_attached_webhook = []
# Get all feeds and organize them
feeds: Iterable[Feed] = reader.get_feeds() feeds: Iterable[Feed] = reader.get_feeds()
for feed in feeds: for feed in feeds:
try: try:
webhook = reader.get_tag(feed.url, "webhook") webhook = reader.get_tag(feed.url, "webhook")
feed_list.append({"feed": feed, "webhook": webhook}) feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)})
except TagNotFoundError: except TagNotFoundError:
broken_feeds.append(feed) broken_feeds.append(feed)
continue continue
@ -854,6 +921,29 @@ async def remove_feed(feed_url: Annotated[str, Form()]):
return RedirectResponse(url="/", status_code=303) return RedirectResponse(url="/", status_code=303)
@app.get("/update", response_class=HTMLResponse)
async def update_feed(request: Request, feed_url: str):
"""Update a feed.
Args:
request: The request object.
feed_url: The feed URL to update.
Raises:
HTTPException: If the feed is not found.
Returns:
RedirectResponse: Redirect to the feed page.
"""
try:
reader.update_feed(urllib.parse.unquote(feed_url))
except FeedNotFoundError as e:
raise HTTPException(status_code=404, detail="Feed not found") from e
logger.info("Manually updated feed: %s", feed_url)
return RedirectResponse(url="/feed?feed_url=" + urllib.parse.quote(feed_url), status_code=303)
@app.get("/search", response_class=HTMLResponse) @app.get("/search", response_class=HTMLResponse)
async def search(request: Request, query: str): async def search(request: Request, query: str):
"""Get entries matching a full-text search query. """Get entries matching a full-text search query.
@ -947,6 +1037,29 @@ def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Fo
return RedirectResponse(url="/webhooks", status_code=303) return RedirectResponse(url="/webhooks", status_code=303)
def extract_youtube_video_id(url: str) -> str | None:
"""Extract YouTube video ID from a YouTube video URL.
Args:
url: The YouTube video URL.
Returns:
The video ID if found, None otherwise.
"""
if not url:
return None
# Handle standard YouTube URLs (youtube.com/watch?v=VIDEO_ID)
if "youtube.com/watch" in url and "v=" in url:
return url.split("v=")[1].split("&")[0]
# Handle shortened YouTube URLs (youtu.be/VIDEO_ID)
if "youtu.be/" in url:
return url.split("youtu.be/")[1].split("?")[0]
return None
if __name__ == "__main__": if __name__ == "__main__":
sentry_sdk.init( sentry_sdk.init(
dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744", dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744",

View File

@ -24,7 +24,7 @@ default_custom_embed: dict[str, str] = {
} }
@lru_cache @lru_cache(maxsize=1)
def get_reader(custom_location: Path | None = None) -> Reader: def get_reader(custom_location: Path | None = None) -> Reader:
"""Get the reader. """Get the reader.
@ -35,5 +35,10 @@ def get_reader(custom_location: Path | None = None) -> Reader:
The reader. The reader.
""" """
db_location: Path = custom_location or Path(data_dir) / "db.sqlite" db_location: Path = custom_location or Path(data_dir) / "db.sqlite"
reader: Reader = make_reader(url=str(db_location))
return make_reader(url=str(db_location)) # https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig
# Set the update interval to 15 minutes
reader.set_tag((), ".reader.update", {"interval": 15})
return reader

View File

@ -42,6 +42,49 @@
<label for="blacklist_author" class="col-sm-6 col-form-label">Blacklist - Author</label> <label for="blacklist_author" class="col-sm-6 col-form-label">Blacklist - Author</label>
<input name="blacklist_author" type="text" class="form-control bg-dark border-dark text-muted" <input name="blacklist_author" type="text" class="form-control bg-dark border-dark text-muted"
id="blacklist_author" value="{%- if blacklist_author -%}{{ blacklist_author }}{%- endif -%}" /> id="blacklist_author" value="{%- if blacklist_author -%}{{ blacklist_author }}{%- endif -%}" />
<div class="mt-4">
<div class="form-text">
<ul class="list-inline">
<li>
Regular expression patterns for advanced filtering. Each pattern should be on a new
line.
</li>
<li>Patterns are case-insensitive.</li>
<li>
Examples:
<code>
<pre>
^New Release:.*
\b(update|version|patch)\s+\d+\.\d+
.*\[(important|notice)\].*
</pre>
</code>
</li>
</ul>
</div>
<label for="regex_blacklist_title" class="col-sm-6 col-form-label">Regex Blacklist - Title</label>
<textarea name="regex_blacklist_title" class="form-control bg-dark border-dark text-muted"
id="regex_blacklist_title"
rows="3">{%- if regex_blacklist_title -%}{{ regex_blacklist_title }}{%- endif -%}</textarea>
<label for="regex_blacklist_summary" class="col-sm-6 col-form-label">Regex Blacklist -
Summary</label>
<textarea name="regex_blacklist_summary" class="form-control bg-dark border-dark text-muted"
id="regex_blacklist_summary"
rows="3">{%- if regex_blacklist_summary -%}{{ regex_blacklist_summary }}{%- endif -%}</textarea>
<label for="regex_blacklist_content" class="col-sm-6 col-form-label">Regex Blacklist -
Content</label>
<textarea name="regex_blacklist_content" class="form-control bg-dark border-dark text-muted"
id="regex_blacklist_content"
rows="3">{%- if regex_blacklist_content -%}{{ regex_blacklist_content }}{%- endif -%}</textarea>
<label for="regex_blacklist_author" class="col-sm-6 col-form-label">Regex Blacklist - Author</label>
<textarea name="regex_blacklist_author" class="form-control bg-dark border-dark text-muted"
id="regex_blacklist_author"
rows="3">{%- if regex_blacklist_author -%}{{ regex_blacklist_author }}{%- endif -%}</textarea>
</div>
</div> </div>
</div> </div>
<!-- Add a hidden feed_url field to the form --> <!-- Add a hidden feed_url field to the form -->

View File

@ -28,6 +28,8 @@
<!-- Feed Actions --> <!-- Feed Actions -->
<div class="mt-3 d-flex flex-wrap gap-2"> <div class="mt-3 d-flex flex-wrap gap-2">
<a href="/update?feed_url={{ feed.url|encode_url }}" class="btn btn-primary btn-sm">Update</a>
<form action="/remove" method="post" class="d-inline"> <form action="/remove" method="post" class="d-inline">
<button class="btn btn-danger btn-sm" name="feed_url" value="{{ feed.url }}" <button class="btn btn-danger btn-sm" name="feed_url" value="{{ feed.url }}"
onclick="return confirm('Are you sure you want to delete this feed?')">Remove</button> onclick="return confirm('Are you sure you want to delete this feed?')">Remove</button>
@ -43,6 +45,7 @@
</form> </form>
{% endif %} {% endif %}
{% if not "youtube.com/feeds/videos.xml" in feed.url %}
{% if should_send_embed %} {% if should_send_embed %}
<form action="/use_text" method="post" class="d-inline"> <form action="/use_text" method="post" class="d-inline">
<button class="btn btn-dark btn-sm" name="feed_url" value="{{ feed.url }}"> <button class="btn btn-dark btn-sm" name="feed_url" value="{{ feed.url }}">
@ -56,6 +59,7 @@
</button> </button>
</form> </form>
{% endif %} {% endif %}
{% endif %}
</div> </div>
<!-- Additional Links --> <!-- Additional Links -->
@ -65,9 +69,11 @@
<a class="text-muted d-block" href="/custom?feed_url={{ feed.url|encode_url }}"> <a class="text-muted d-block" href="/custom?feed_url={{ feed.url|encode_url }}">
Customize message {% if not should_send_embed %}(Currently active){% endif %} Customize message {% if not should_send_embed %}(Currently active){% endif %}
</a> </a>
{% if not "youtube.com/feeds/videos.xml" in feed.url %}
<a class="text-muted d-block" href="/embed?feed_url={{ feed.url|encode_url }}"> <a class="text-muted d-block" href="/embed?feed_url={{ feed.url|encode_url }}">
Customize embed {% if should_send_embed %}(Currently active){% endif %} Customize embed {% if should_send_embed %}(Currently active){% endif %}
</a> </a>
{% endif %}
</div> </div>
</div> </div>

View File

@ -28,32 +28,66 @@
{{ entry_count.averages[2]|round(1) }}) {{ entry_count.averages[2]|round(1) }})
</abbr> </abbr>
</p> </p>
<!-- Loop through the webhooks and add the feeds connected to them. -->
<!-- Loop through the webhooks and add the feeds grouped by domain -->
{% for hook_from_context in webhooks %} {% for hook_from_context in webhooks %}
<div class="p-2 mb-2 border border-dark"> <div class="p-2 mb-3 border border-dark">
<h2 class="h5"> <h2 class="h5 mb-3">
<a class="text-muted" href="/webhooks">{{ hook_from_context.name }}</a> <a class="text-muted" href="/webhooks">{{ hook_from_context.name }}</a>
</h2> </h2>
<ul class="list-group">
<!-- Group feeds by domain within each webhook -->
{% set feeds_for_hook = [] %}
{% for feed_webhook in feeds %} {% for feed_webhook in feeds %}
{% set feed = feed_webhook["feed"] %} {% if hook_from_context.url == feed_webhook.webhook %}
{% set hook_from_feed = feed_webhook["webhook"] %} {% set _ = feeds_for_hook.append(feed_webhook) %}
{% if hook_from_context.url == hook_from_feed %} {% endif %}
<div> {% endfor %}
<a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">{{ feed.url }}</a>
{% if feeds_for_hook %}
<!-- Create a dictionary to hold feeds grouped by domain -->
{% set domains = {} %}
{% for feed_item in feeds_for_hook %}
{% set feed = feed_item.feed %}
{% set domain = feed_item.domain %}
{% if domain not in domains %}
{% set _ = domains.update({domain: []}) %}
{% endif %}
{% set _ = domains[domain].append(feed) %}
{% endfor %}
<!-- Display domains and their feeds -->
{% for domain, domain_feeds in domains.items() %}
<div class="card bg-dark border border-dark mb-2">
<div class="card-header">
<h3 class="h6 mb-0 text-white-50">{{ domain }} ({{ domain_feeds|length }})</h3>
</div>
<div class="card-body p-2">
<ul class="list-group list-unstyled mb-0">
{% for feed in domain_feeds %}
<li>
<a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">
{% if feed.title %}{{ feed.title }}{% else %}{{ feed.url }}{% endif %}
</a>
{% if not feed.updates_enabled %}<span class="text-warning">Disabled</span>{% endif %} {% if not feed.updates_enabled %}<span class="text-warning">Disabled</span>{% endif %}
{% if feed.last_exception %}<span {% if feed.last_exception %}<span
class="text-danger">({{ feed.last_exception.value_str }})</span>{% endif %} class="text-danger">({{ feed.last_exception.value_str }})</span>{% endif %}
</div> </li>
{% endif %}
{% endfor %} {% endfor %}
</ul> </ul>
</div> </div>
</div>
{% endfor %}
{% else %}
<p class="text-muted">No feeds associated with this webhook.</p>
{% endif %}
</div>
{% endfor %} {% endfor %}
{% else %} {% else %}
<p> <p>
Hello there! Hello there!
<br> <br>
<br>
You need to add a webhook <a class="text-muted" href="/add_webhook">here</a> to get started. After that, you can You need to add a webhook <a class="text-muted" href="/add_webhook">here</a> to get started. After that, you can
add feeds <a class="text-muted" href="/add">here</a>. You can find both of these links in the navigation bar add feeds <a class="text-muted" href="/add">here</a>. You can find both of these links in the navigation bar
above. above.
@ -66,24 +100,52 @@
Thanks! Thanks!
</p> </p>
{% endif %} {% endif %}
<!-- Show feeds without webhooks --> <!-- Show feeds without webhooks -->
{% if broken_feeds %} {% if broken_feeds %}
<div class="p-2 mb-2 border border-dark"> <div class="p-2 mb-2 border border-dark">
<ul class="list-group text-danger"> <ul class="list-group text-danger">
Feeds without webhook: Feeds without webhook:
{% for broken_feed in broken_feeds %} {% for broken_feed in broken_feeds %}
<a class="text-muted" href="/feed?feed_url={{ broken_feed.url|encode_url }}">{{ broken_feed.url }}</a> <a class="text-muted" href="/feed?feed_url={{ broken_feed.url|encode_url }}">
{# Display username@youtube for YouTube feeds #}
{% if "youtube.com/feeds/videos.xml" in broken_feed.url %}
{% if "user=" in broken_feed.url %}
{{ broken_feed.url.split("user=")[1] }}@youtube
{% elif "channel_id=" in broken_feed.url %}
{{ broken_feed.title if broken_feed.title else broken_feed.url.split("channel_id=")[1] }}@youtube
{% else %}
{{ broken_feed.url }}
{% endif %}
{% else %}
{{ broken_feed.url }}
{% endif %}
</a>
{% endfor %} {% endfor %}
</ul> </ul>
</div> </div>
{% endif %} {% endif %}
<!-- Show feeds that has no attached webhook --> <!-- Show feeds that has no attached webhook -->
{% if feeds_without_attached_webhook %} {% if feeds_without_attached_webhook %}
<div class="p-2 mb-2 border border-dark"> <div class="p-2 mb-2 border border-dark">
<ul class="list-group text-danger"> <ul class="list-group text-danger">
Feeds without attached webhook: Feeds without attached webhook:
{% for feed in feeds_without_attached_webhook %} {% for feed in feeds_without_attached_webhook %}
<a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">{{ feed.url }}</a> <a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">
{# Display username@youtube for YouTube feeds #}
{% if "youtube.com/feeds/videos.xml" in feed.url %}
{% if "user=" in feed.url %}
{{ feed.url.split("user=")[1] }}@youtube
{% elif "channel_id=" in feed.url %}
{{ feed.title if feed.title else feed.url.split("channel_id=")[1] }}@youtube
{% else %}
{{ feed.url }}
{% endif %}
{% else %}
{{ feed.url }}
{% endif %}
</a>
{% endfor %} {% endfor %}
</ul> </ul>
</div> </div>

View File

@ -1,6 +1,6 @@
{% extends "base.html" %} {% extends "base.html" %}
{% block title %} {% block title %}
| Blacklist | Whitelist
{% endblock title %} {% endblock title %}
{% block content %} {% block content %}
<div class="p-2 border border-dark"> <div class="p-2 border border-dark">
@ -42,6 +42,49 @@
<label for="whitelist_author" class="col-sm-6 col-form-label">Whitelist - Author</label> <label for="whitelist_author" class="col-sm-6 col-form-label">Whitelist - Author</label>
<input name="whitelist_author" type="text" class="form-control bg-dark border-dark text-muted" <input name="whitelist_author" type="text" class="form-control bg-dark border-dark text-muted"
id="whitelist_author" value="{%- if whitelist_author -%} {{ whitelist_author }} {%- endif -%}" /> id="whitelist_author" value="{%- if whitelist_author -%} {{ whitelist_author }} {%- endif -%}" />
<div class="mt-4">
<div class="form-text">
<ul class="list-inline">
<li>
Regular expression patterns for advanced filtering. Each pattern should be on a new
line.
</li>
<li>Patterns are case-insensitive.</li>
<li>
Examples:
<code>
<pre>
^New Release:.*
\b(update|version|patch)\s+\d+\.\d+
.*\[(important|notice)\].*
</pre>
</code>
</li>
</ul>
</div>
<label for="regex_whitelist_title" class="col-sm-6 col-form-label">Regex Whitelist - Title</label>
<textarea name="regex_whitelist_title" class="form-control bg-dark border-dark text-muted"
id="regex_whitelist_title"
rows="3">{%- if regex_whitelist_title -%}{{ regex_whitelist_title }}{%- endif -%}</textarea>
<label for="regex_whitelist_summary" class="col-sm-6 col-form-label">Regex Whitelist -
Summary</label>
<textarea name="regex_whitelist_summary" class="form-control bg-dark border-dark text-muted"
id="regex_whitelist_summary"
rows="3">{%- if regex_whitelist_summary -%}{{ regex_whitelist_summary }}{%- endif -%}</textarea>
<label for="regex_whitelist_content" class="col-sm-6 col-form-label">Regex Whitelist -
Content</label>
<textarea name="regex_whitelist_content" class="form-control bg-dark border-dark text-muted"
id="regex_whitelist_content"
rows="3">{%- if regex_whitelist_content -%}{{ regex_whitelist_content }}{%- endif -%}</textarea>
<label for="regex_whitelist_author" class="col-sm-6 col-form-label">Regex Whitelist - Author</label>
<textarea name="regex_whitelist_author" class="form-control bg-dark border-dark text-muted"
id="regex_whitelist_author"
rows="3">{%- if regex_whitelist_author -%}{{ regex_whitelist_author }}{%- endif -%}</textarea>
</div>
</div> </div>
</div> </div>
<!-- Add a hidden feed_url field to the form --> <!-- Add a hidden feed_url field to the form -->

View File

@ -10,7 +10,7 @@ services:
# - /Docker/Bots/discord-rss-bot:/home/botuser/.local/share/discord_rss_bot/ # - /Docker/Bots/discord-rss-bot:/home/botuser/.local/share/discord_rss_bot/
- data:/home/botuser/.local/share/discord_rss_bot/ - data:/home/botuser/.local/share/discord_rss_bot/
healthcheck: healthcheck:
test: ["CMD", "python", "discord_rss_bot/healthcheck.py"] test: [ "CMD", "uv", "run", "./discord_rss_bot/healthcheck.py" ]
interval: 1m interval: 1m
timeout: 10s timeout: 10s
retries: 3 retries: 3

View File

@ -17,6 +17,7 @@ dependencies = [
"python-multipart", "python-multipart",
"reader", "reader",
"sentry-sdk[fastapi]", "sentry-sdk[fastapi]",
"tldextract",
"uvicorn", "uvicorn",
] ]
@ -42,7 +43,7 @@ platformdirs = "*"
python-dotenv = "*" python-dotenv = "*"
python-multipart = "*" python-multipart = "*"
reader = "*" reader = "*"
sentry-sdk = {version = "*", extras = ["fastapi"]} sentry-sdk = { version = "*", extras = ["fastapi"] }
uvicorn = "*" uvicorn = "*"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
@ -86,6 +87,8 @@ lint.ignore = [
"PLR6301", # Checks for the presence of unused self parameter in methods definitions. "PLR6301", # Checks for the presence of unused self parameter in methods definitions.
"RUF029", # Checks for functions declared async that do not await or otherwise use features requiring the function to be declared async. "RUF029", # Checks for functions declared async that do not await or otherwise use features requiring the function to be declared async.
"TD003", # Checks that a TODO comment is associated with a link to a relevant issue or ticket. "TD003", # Checks that a TODO comment is associated with a link to a relevant issue or ticket.
"PLR0913", # Checks for function definitions that include too many arguments.
"PLR0917", # Checks for function definitions that include too many positional arguments.
# Conflicting lint rules when using Ruff's formatter # Conflicting lint rules when using Ruff's formatter
# https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules # https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules

View File

@ -39,6 +39,13 @@ def test_has_black_tags() -> None:
check_if_has_tag(reader, feed, "blacklist_title") check_if_has_tag(reader, feed, "blacklist_title")
check_if_has_tag(reader, feed, "blacklist_summary") check_if_has_tag(reader, feed, "blacklist_summary")
check_if_has_tag(reader, feed, "blacklist_content") check_if_has_tag(reader, feed, "blacklist_content")
check_if_has_tag(reader, feed, "blacklist_author")
# Test regex blacklist tags
check_if_has_tag(reader, feed, "regex_blacklist_title")
check_if_has_tag(reader, feed, "regex_blacklist_summary")
check_if_has_tag(reader, feed, "regex_blacklist_content")
check_if_has_tag(reader, feed, "regex_blacklist_author")
# Clean up # Clean up
reader.delete_feed(feed_url) reader.delete_feed(feed_url)
@ -74,6 +81,7 @@ def test_should_be_skipped() -> None:
# Test entry without any blacklists # Test entry without any blacklists
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
# Test standard blacklist functionality
reader.set_tag(feed, "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType] reader.set_tag(feed, "blacklist_title", "fvnnnfnfdnfdnfd") # pyright: ignore[reportArgumentType]
assert entry_should_be_skipped(reader, first_entry[0]) is True, f"Entry should be skipped: {first_entry[0]}" assert entry_should_be_skipped(reader, first_entry[0]) is True, f"Entry should be skipped: {first_entry[0]}"
reader.delete_tag(feed, "blacklist_title") reader.delete_tag(feed, "blacklist_title")
@ -113,3 +121,81 @@ def test_should_be_skipped() -> None:
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
reader.delete_tag(feed, "blacklist_author") reader.delete_tag(feed, "blacklist_author")
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}" assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
def test_regex_should_be_skipped() -> None:
"""Test the regex filtering functionality for blacklist."""
reader: Reader = get_reader()
# Add feed and update entries
reader.add_feed(feed_url)
feed: Feed = reader.get_feed(feed_url)
reader.update_feeds()
# Get first entry
first_entry: list[Entry] = []
entries: Iterable[Entry] = reader.get_entries(feed=feed)
assert entries is not None, f"Entries should not be None: {entries}"
for entry in entries:
first_entry.append(entry)
break
assert len(first_entry) == 1, f"First entry should be added: {first_entry}"
# Test entry without any regex blacklists
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
# Test regex blacklist for title
reader.set_tag(feed, "regex_blacklist_title", r"fvnnn\w+") # pyright: ignore[reportArgumentType]
assert entry_should_be_skipped(reader, first_entry[0]) is True, (
f"Entry should be skipped with regex title match: {first_entry[0]}"
)
reader.delete_tag(feed, "regex_blacklist_title")
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
# Test regex blacklist for summary
reader.set_tag(feed, "regex_blacklist_summary", r"ffdnfdn\w+") # pyright: ignore[reportArgumentType]
assert entry_should_be_skipped(reader, first_entry[0]) is True, (
f"Entry should be skipped with regex summary match: {first_entry[0]}"
)
reader.delete_tag(feed, "regex_blacklist_summary")
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
# Test regex blacklist for content
reader.set_tag(feed, "regex_blacklist_content", r"ffdnfdnfdn\w+") # pyright: ignore[reportArgumentType]
assert entry_should_be_skipped(reader, first_entry[0]) is True, (
f"Entry should be skipped with regex content match: {first_entry[0]}"
)
reader.delete_tag(feed, "regex_blacklist_content")
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
# Test regex blacklist for author
reader.set_tag(feed, "regex_blacklist_author", r"TheLovinator\d*") # pyright: ignore[reportArgumentType]
assert entry_should_be_skipped(reader, first_entry[0]) is True, (
f"Entry should be skipped with regex author match: {first_entry[0]}"
)
reader.delete_tag(feed, "regex_blacklist_author")
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
# Test invalid regex pattern (should not raise an exception)
reader.set_tag(feed, "regex_blacklist_title", r"[incomplete") # pyright: ignore[reportArgumentType]
assert entry_should_be_skipped(reader, first_entry[0]) is False, (
f"Entry should not be skipped with invalid regex: {first_entry[0]}"
)
reader.delete_tag(feed, "regex_blacklist_title")
# Test multiple regex patterns separated by commas
reader.set_tag(feed, "regex_blacklist_author", r"pattern1,TheLovinator\d*,pattern3") # pyright: ignore[reportArgumentType]
assert entry_should_be_skipped(reader, first_entry[0]) is True, (
f"Entry should be skipped with one matching pattern in list: {first_entry[0]}"
)
reader.delete_tag(feed, "regex_blacklist_author")
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"
# Test newline-separated regex patterns
newline_patterns = "pattern1\nTheLovinator\\d*\npattern3"
reader.set_tag(feed, "regex_blacklist_author", newline_patterns) # pyright: ignore[reportArgumentType]
assert entry_should_be_skipped(reader, first_entry[0]) is True, (
f"Entry should be skipped with newline-separated patterns: {first_entry[0]}"
)
reader.delete_tag(feed, "regex_blacklist_author")
assert entry_should_be_skipped(reader, first_entry[0]) is False, f"Entry should not be skipped: {first_entry[0]}"

View File

@ -4,11 +4,19 @@ import os
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import LiteralString from typing import LiteralString
from unittest.mock import MagicMock, patch
import pytest import pytest
from reader import Feed, Reader, make_reader from reader import Feed, Reader, make_reader
from discord_rss_bot.feeds import send_to_discord, truncate_webhook_message from discord_rss_bot.feeds import (
extract_domain,
is_youtube_feed,
send_entry_to_discord,
send_to_discord,
should_send_embed_check,
truncate_webhook_message,
)
from discord_rss_bot.missing_tags import add_missing_tags from discord_rss_bot.missing_tags import add_missing_tags
@ -85,3 +93,186 @@ def test_truncate_webhook_message_long_message():
# Test the end of the message # Test the end of the message
assert_msg = "The end of the truncated message should be '...' to indicate truncation." assert_msg = "The end of the truncated message should be '...' to indicate truncation."
assert truncated_message[-half_length:] == "A" * half_length, assert_msg assert truncated_message[-half_length:] == "A" * half_length, assert_msg
def test_is_youtube_feed():
"""Test the is_youtube_feed function."""
# YouTube feed URLs
assert is_youtube_feed("https://www.youtube.com/feeds/videos.xml?channel_id=123456") is True
assert is_youtube_feed("https://www.youtube.com/feeds/videos.xml?user=username") is True
# Non-YouTube feed URLs
assert is_youtube_feed("https://www.example.com/feed.xml") is False
assert is_youtube_feed("https://www.youtube.com/watch?v=123456") is False
assert is_youtube_feed("https://www.reddit.com/r/Python/.rss") is False
@patch("discord_rss_bot.feeds.logger")
def test_should_send_embed_check_youtube_feeds(mock_logger: MagicMock) -> None:
"""Test should_send_embed_check returns False for YouTube feeds regardless of settings."""
# Create mocks
mock_reader = MagicMock()
mock_entry = MagicMock()
# Configure a YouTube feed
mock_entry.feed.url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
# Set reader to return True for should_send_embed (would normally create an embed)
mock_reader.get_tag.return_value = True
# Result should be False, overriding the feed settings
result = should_send_embed_check(mock_reader, mock_entry)
assert result is False, "YouTube feeds should never use embeds"
# Function should not even call get_tag for YouTube feeds
mock_reader.get_tag.assert_not_called()
@patch("discord_rss_bot.feeds.logger")
def test_should_send_embed_check_normal_feeds(mock_logger: MagicMock) -> None:
"""Test should_send_embed_check returns feed settings for non-YouTube feeds."""
# Create mocks
mock_reader = MagicMock()
mock_entry = MagicMock()
# Configure a normal feed
mock_entry.feed.url = "https://www.example.com/feed.xml"
# Test with should_send_embed set to True
mock_reader.get_tag.return_value = True
result = should_send_embed_check(mock_reader, mock_entry)
assert result is True, "Normal feeds should use embeds when enabled"
# Test with should_send_embed set to False
mock_reader.get_tag.return_value = False
result = should_send_embed_check(mock_reader, mock_entry)
assert result is False, "Normal feeds should not use embeds when disabled"
@patch("discord_rss_bot.feeds.get_reader")
@patch("discord_rss_bot.feeds.get_custom_message")
@patch("discord_rss_bot.feeds.replace_tags_in_text_message")
@patch("discord_rss_bot.feeds.create_embed_webhook")
@patch("discord_rss_bot.feeds.DiscordWebhook")
@patch("discord_rss_bot.feeds.execute_webhook")
def test_send_entry_to_discord_youtube_feed(
mock_execute_webhook: MagicMock,
mock_discord_webhook: MagicMock,
mock_create_embed: MagicMock,
mock_replace_tags: MagicMock,
mock_get_custom_message: MagicMock,
mock_get_reader: MagicMock,
):
"""Test send_entry_to_discord function with YouTube feeds."""
# Set up mocks
mock_reader = MagicMock()
mock_get_reader.return_value = mock_reader
mock_entry = MagicMock()
mock_feed = MagicMock()
# Configure a YouTube feed
mock_entry.feed = mock_feed
mock_entry.feed.url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
mock_entry.feed_url = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
# Mock the tags
mock_reader.get_tag.side_effect = lambda feed, tag, default=None: { # noqa: ARG005
"webhook": "https://discord.com/api/webhooks/123/abc",
"should_send_embed": True, # This should be ignored for YouTube feeds
}.get(tag, default)
# Mock custom message
mock_get_custom_message.return_value = "Custom message"
mock_replace_tags.return_value = "Formatted message with {{entry_link}}"
# Mock webhook
mock_webhook = MagicMock()
mock_discord_webhook.return_value = mock_webhook
# Call the function
send_entry_to_discord(mock_entry)
# Assertions
mock_create_embed.assert_not_called()
mock_discord_webhook.assert_called_once()
# Check webhook was created with the right message
webhook_call_kwargs = mock_discord_webhook.call_args[1]
assert "content" in webhook_call_kwargs, "Webhook should have content"
assert webhook_call_kwargs["url"] == "https://discord.com/api/webhooks/123/abc"
# Verify execute_webhook was called
mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry)
def test_extract_domain_youtube_feed() -> None:
"""Test extract_domain for YouTube feeds."""
url: str = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
assert extract_domain(url) == "YouTube", "YouTube feeds should return 'YouTube' as the domain."
def test_extract_domain_reddit_feed() -> None:
"""Test extract_domain for Reddit feeds."""
url: str = "https://www.reddit.com/r/Python/.rss"
assert extract_domain(url) == "Reddit", "Reddit feeds should return 'Reddit' as the domain."
def test_extract_domain_github_feed() -> None:
"""Test extract_domain for GitHub feeds."""
url: str = "https://www.github.com/user/repo"
assert extract_domain(url) == "GitHub", "GitHub feeds should return 'GitHub' as the domain."
def test_extract_domain_custom_domain() -> None:
"""Test extract_domain for custom domains."""
url: str = "https://www.example.com/feed"
assert extract_domain(url) == "Example", "Custom domains should return the capitalized first part of the domain."
def test_extract_domain_no_www_prefix() -> None:
"""Test extract_domain removes 'www.' prefix."""
url: str = "https://www.example.com/feed"
assert extract_domain(url) == "Example", "The 'www.' prefix should be removed from the domain."
def test_extract_domain_no_tld() -> None:
"""Test extract_domain for domains without a TLD."""
url: str = "https://localhost/feed"
assert extract_domain(url) == "Localhost", "Domains without a TLD should return the capitalized domain."
def test_extract_domain_invalid_url() -> None:
"""Test extract_domain for invalid URLs."""
url: str = "not-a-valid-url"
assert extract_domain(url) == "Other", "Invalid URLs should return 'Other' as the domain."
def test_extract_domain_empty_url() -> None:
"""Test extract_domain for empty URLs."""
url: str = ""
assert extract_domain(url) == "Other", "Empty URLs should return 'Other' as the domain."
def test_extract_domain_special_characters() -> None:
"""Test extract_domain for URLs with special characters."""
url: str = "https://www.ex-ample.com/feed"
assert extract_domain(url) == "Ex-ample", "Domains with special characters should return the capitalized domain."
@pytest.mark.parametrize(
argnames=("url", "expected"),
argvalues=[
("https://blog.something.com", "Something"),
("https://www.something.com", "Something"),
("https://subdomain.example.co.uk", "Example"),
("https://github.com/user/repo", "GitHub"),
("https://youtube.com/feeds/videos.xml?channel_id=abc", "YouTube"),
("https://reddit.com/r/python/.rss", "Reddit"),
("", "Other"),
("not a url", "Other"),
("https://www.example.com", "Example"),
("https://foo.bar.baz.com", "Baz"),
],
)
def test_extract_domain(url: str, expected: str) -> None:
assert extract_domain(url) == expected

39
tests/test_hoyolab_api.py Normal file
View File

@ -0,0 +1,39 @@
from __future__ import annotations
from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url
class TestExtractPostIdFromHoyolabUrl:
def test_extract_post_id_from_article_url(self) -> None:
"""Test extracting post ID from a direct article URL."""
test_cases: list[str] = [
"https://www.hoyolab.com/article/38588239",
"http://hoyolab.com/article/12345",
"https://www.hoyolab.com/article/987654321/comments",
]
expected_ids: list[str] = ["38588239", "12345", "987654321"]
for url, expected_id in zip(test_cases, expected_ids, strict=False):
assert extract_post_id_from_hoyolab_url(url) == expected_id
def test_url_without_post_id(self) -> None:
"""Test with a URL that doesn't have a post ID."""
test_cases: list[str] = [
"https://www.hoyolab.com/community",
]
for url in test_cases:
assert extract_post_id_from_hoyolab_url(url) is None
def test_edge_cases(self) -> None:
"""Test edge cases like None, empty string, and malformed URLs."""
test_cases: list[str | None] = [
None,
"",
"not_a_url",
"http:/", # Malformed URL
]
for url in test_cases:
assert extract_post_id_from_hoyolab_url(url) is None # type: ignore

View File

@ -45,7 +45,7 @@ def test_search() -> None:
# Check that the feed was added. # Check that the feed was added.
response = client.get(url="/") response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}" assert response.status_code == 200, f"Failed to get /: {response.text}"
assert feed_url in response.text, f"Feed not found in /: {response.text}" assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
# Search for an entry. # Search for an entry.
response: Response = client.get(url="/search/?query=a") response: Response = client.get(url="/search/?query=a")
@ -85,7 +85,7 @@ def test_create_feed() -> None:
# Check that the feed was added. # Check that the feed was added.
response = client.get(url="/") response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}" assert response.status_code == 200, f"Failed to get /: {response.text}"
assert feed_url in response.text, f"Feed not found in /: {response.text}" assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
def test_get() -> None: def test_get() -> None:
@ -103,7 +103,7 @@ def test_get() -> None:
# Check that the feed was added. # Check that the feed was added.
response = client.get("/") response = client.get("/")
assert response.status_code == 200, f"Failed to get /: {response.text}" assert response.status_code == 200, f"Failed to get /: {response.text}"
assert feed_url in response.text, f"Feed not found in /: {response.text}" assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
response: Response = client.get(url="/add") response: Response = client.get(url="/add")
assert response.status_code == 200, f"/add failed: {response.text}" assert response.status_code == 200, f"/add failed: {response.text}"
@ -157,7 +157,7 @@ def test_pause_feed() -> None:
# Check that the feed was paused. # Check that the feed was paused.
response = client.get(url="/") response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}" assert response.status_code == 200, f"Failed to get /: {response.text}"
assert feed_url in response.text, f"Feed not found in /: {response.text}" assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
def test_unpause_feed() -> None: def test_unpause_feed() -> None:
@ -184,7 +184,7 @@ def test_unpause_feed() -> None:
# Check that the feed was unpaused. # Check that the feed was unpaused.
response = client.get(url="/") response = client.get(url="/")
assert response.status_code == 200, f"Failed to get /: {response.text}" assert response.status_code == 200, f"Failed to get /: {response.text}"
assert feed_url in response.text, f"Feed not found in /: {response.text}" assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
def test_remove_feed() -> None: def test_remove_feed() -> None:
@ -229,3 +229,16 @@ def test_delete_webhook() -> None:
response = client.get(url="/webhooks") response = client.get(url="/webhooks")
assert response.status_code == 200, f"Failed to get /webhooks: {response.text}" assert response.status_code == 200, f"Failed to get /webhooks: {response.text}"
assert webhook_name not in response.text, f"Webhook found in /webhooks: {response.text}" assert webhook_name not in response.text, f"Webhook found in /webhooks: {response.text}"
def test_update_feed_not_found() -> None:
"""Test updating a non-existent feed."""
# Generate a feed URL that does not exist
nonexistent_feed_url = "https://nonexistent-feed.example.com/rss.xml"
# Try to update the non-existent feed
response: Response = client.get(url="/update", params={"feed_url": urllib.parse.quote(nonexistent_feed_url)})
# Check that it returns a 404 status code
assert response.status_code == 404, f"Expected 404 for non-existent feed, got: {response.status_code}"
assert "Feed not found" in response.text

View File

@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from discord_rss_bot.filter.utils import is_word_in_text from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text
def test_is_word_in_text() -> None: def test_is_word_in_text() -> None:
@ -14,3 +14,51 @@ def test_is_word_in_text() -> None:
assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false
assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false assert is_word_in_text("Alert,Forma", "Outbreak - Mutagen Mass - Rhea (Saturn)") is False, msg_false
assert is_word_in_text("word1,word2", "This is a sample text containing none of the words.") is False, msg_false assert is_word_in_text("word1,word2", "This is a sample text containing none of the words.") is False, msg_false
def test_is_regex_match() -> None:
msg_true = "Should return True"
msg_false = "Should return False"
# Test basic regex patterns
assert is_regex_match(r"word\d+", "This text contains word123") is True, msg_true
assert is_regex_match(r"^Hello", "Hello world") is True, msg_true
assert is_regex_match(r"world$", "Hello world") is True, msg_true
# Test case insensitivity
assert is_regex_match(r"hello", "This text contains HELLO") is True, msg_true
# Test comma-separated patterns
assert is_regex_match(r"pattern1,pattern2", "This contains pattern2") is True, msg_true
assert is_regex_match(r"pattern1, pattern2", "This contains pattern1") is True, msg_true
# Test regex that shouldn't match
assert is_regex_match(r"^start", "This doesn't start with the pattern") is False, msg_false
assert is_regex_match(r"end$", "This doesn't end with the pattern") is False, msg_false
# Test with empty input
assert is_regex_match("", "Some text") is False, msg_false
assert is_regex_match("pattern", "") is False, msg_false
# Test with invalid regex (should not raise an exception and return False)
assert is_regex_match(r"[incomplete", "Some text") is False, msg_false
# Test with multiple patterns where one is invalid
assert is_regex_match(r"valid, [invalid, \w+", "Contains word") is True, msg_true
# Test newline-separated patterns
newline_patterns = "pattern1\n^start\ncontains\\d+"
assert is_regex_match(newline_patterns, "This contains123 text") is True, msg_true
assert is_regex_match(newline_patterns, "start of line") is True, msg_true
assert is_regex_match(newline_patterns, "pattern1 is here") is True, msg_true
assert is_regex_match(newline_patterns, "None of these match") is False, msg_false
# Test mixed newline and comma patterns (for backward compatibility)
mixed_patterns = "pattern1\npattern2,pattern3\npattern4"
assert is_regex_match(mixed_patterns, "Contains pattern3") is True, msg_true
assert is_regex_match(mixed_patterns, "Contains pattern4") is True, msg_true
# Test with empty lines and spaces
whitespace_patterns = "\\s+\n \n\npattern\n\n"
assert is_regex_match(whitespace_patterns, "text with spaces") is True, msg_true
assert is_regex_match(whitespace_patterns, "text with pattern") is True, msg_true

View File

@@ -38,6 +38,13 @@ def test_has_white_tags() -> None:
check_if_has_tag(reader, feed, "whitelist_title")
check_if_has_tag(reader, feed, "whitelist_summary")
check_if_has_tag(reader, feed, "whitelist_content")
check_if_has_tag(reader, feed, "whitelist_author")
# Test regex whitelist tags
check_if_has_tag(reader, feed, "regex_whitelist_title")
check_if_has_tag(reader, feed, "regex_whitelist_summary")
check_if_has_tag(reader, feed, "regex_whitelist_content")
check_if_has_tag(reader, feed, "regex_whitelist_author")
# Clean up
reader.delete_feed(feed_url)
@@ -109,3 +116,67 @@ def test_should_be_sent() -> None:
assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
reader.delete_tag(feed, "whitelist_author")
assert should_be_sent(reader, first_entry[0]) is False, "Entry should not be sent"
def test_regex_should_be_sent() -> None:
    """Test the regex filtering functionality for whitelist.

    For each ``regex_whitelist_*`` feed tag, verify that setting a matching
    pattern makes ``should_be_sent`` return True and that deleting the tag
    reverts it to False. Also covers an invalid pattern and comma-/newline-
    separated pattern lists.
    """
    reader: Reader = get_reader()

    # Add feed and update entries so there is at least one entry to test.
    reader.add_feed(feed_url)
    feed: Feed = reader.get_feed(feed_url)
    reader.update_feeds()

    # Grab the first entry; next() replaces the manual loop-and-break idiom,
    # and the former `assert entries is not None` was a no-op (get_entries
    # returns an iterable, never None).
    first_entry = next(iter(reader.get_entries(feed=feed)), None)
    assert first_entry is not None, "First entry should be added"

    # Test entry without any regex whitelists
    assert should_be_sent(reader, first_entry) is False, "Entry should not be sent"

    # Each field-specific regex whitelist: (tag, pattern, message-on-match).
    # Messages are kept identical to the original per-field assertions.
    single_pattern_cases: list[tuple[str, str, str]] = [
        ("regex_whitelist_title", r"fvnnn\w+", "Entry should be sent with regex title match"),
        ("regex_whitelist_summary", r"ffdnfdn\w+", "Entry should be sent with regex summary match"),
        ("regex_whitelist_content", r"ffdnfdnfdn\w+", "Entry should be sent with regex content match"),
        ("regex_whitelist_author", r"TheLovinator\d*", "Entry should be sent with regex author match"),
    ]
    for tag, pattern, match_msg in single_pattern_cases:
        reader.set_tag(feed, tag, pattern)  # pyright: ignore[reportArgumentType]
        assert should_be_sent(reader, first_entry) is True, match_msg
        reader.delete_tag(feed, tag)
        assert should_be_sent(reader, first_entry) is False, "Entry should not be sent"

    # Test invalid regex pattern (should not raise an exception)
    reader.set_tag(feed, "regex_whitelist_title", r"[incomplete")  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry) is False, "Entry should not be sent with invalid regex"
    reader.delete_tag(feed, "regex_whitelist_title")

    # Test multiple regex patterns separated by commas
    reader.set_tag(feed, "regex_whitelist_author", r"pattern1,TheLovinator\d*,pattern3")  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry) is True, "Entry should be sent with one matching pattern in list"
    reader.delete_tag(feed, "regex_whitelist_author")
    assert should_be_sent(reader, first_entry) is False, "Entry should not be sent"

    # Test newline-separated regex patterns
    newline_patterns = "pattern1\nTheLovinator\\d*\npattern3"
    reader.set_tag(feed, "regex_whitelist_author", newline_patterns)  # pyright: ignore[reportArgumentType]
    assert should_be_sent(reader, first_entry) is True, "Entry should be sent with newline-separated patterns"
    reader.delete_tag(feed, "regex_whitelist_author")
    assert should_be_sent(reader, first_entry) is False, "Entry should not be sent"

    # Clean up, matching test_has_white_tags, so other tests can re-add
    # the same feed without hitting FeedExistsError.
    reader.delete_feed(feed_url)