diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 90c7af1..b5cf82d 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -711,16 +711,15 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d use_default_message_on_empty=True, ) - # Check if the entry is blacklisted, and if it is, we will skip it. - if entry_should_be_skipped(effective_reader, entry): + # Whitelist should take precedence when configured. + if has_white_tags(effective_reader, entry.feed): + if not should_be_sent(effective_reader, entry): + logger.info("Entry was not whitelisted: %s", entry.id) + continue + elif entry_should_be_skipped(effective_reader, entry): logger.info("Entry was blacklisted: %s", entry.id) continue - # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. - if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry): - logger.info("Entry was not whitelisted: %s", entry.id) - continue - # Use a custom webhook for Hoyolab feeds. if is_c3kay_feed(entry.feed.url): entry_link: str | None = entry.link diff --git a/discord_rss_bot/filter/blacklist.py b/discord_rss_bot/filter/blacklist.py index 8260993..ef61fcb 100644 --- a/discord_rss_bot/filter/blacklist.py +++ b/discord_rss_bot/filter/blacklist.py @@ -2,6 +2,7 @@ from __future__ import annotations from typing import TYPE_CHECKING +from discord_rss_bot.filter.utils import get_domain_filter_tags from discord_rss_bot.filter.utils import is_regex_match from discord_rss_bot.filter.utils import is_word_in_text @@ -11,6 +12,37 @@ if TYPE_CHECKING: from reader import Reader +_MATCH_FIELDS: tuple[str, ...] = ("title", "summary", "content", "author") + + +def _get_effective_blacklist_values(reader: Reader, feed: Feed) -> tuple[dict[str, str], dict[str, str]]: + """Return merged feed-level and domain-level blacklist values.""" + local_values: dict[str, str] = { + field: str(reader.get_tag(feed, f"blacklist_{field}", "")).strip() for field in _MATCH_FIELDS + } + local_regex_values: dict[str, str] = { + field: str(reader.get_tag(feed, f"regex_blacklist_{field}", "")).strip() for field in _MATCH_FIELDS + } + + domain_values_raw: dict[str, str] = get_domain_filter_tags(reader, feed, "domain_blacklist") + domain_values: dict[str, str] = { + field: str(domain_values_raw.get(f"blacklist_{field}", "")).strip() for field in _MATCH_FIELDS + } + domain_regex_values: dict[str, str] = { + field: str(domain_values_raw.get(f"regex_blacklist_{field}", "")).strip() for field in _MATCH_FIELDS + } + + merged_values: dict[str, str] = { + field: ",".join(value for value in (local_values[field], domain_values[field]) if value) + for field in _MATCH_FIELDS + } + merged_regex_values: dict[str, str] = { + field: "\n".join(value for value in (local_regex_values[field], domain_regex_values[field]) if value) + for field in _MATCH_FIELDS + } + return merged_values, merged_regex_values + + def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: """Return True if the feed has blacklist tags. @@ -31,26 +63,8 @@ def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: Returns: bool: If the feed has any of the tags. """ - blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() - blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() - - regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() - regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() - - return bool( - blacklist_title - or blacklist_author - or blacklist_content - or blacklist_summary - or regex_blacklist_author - or regex_blacklist_content - or regex_blacklist_summary - or regex_blacklist_title, - ) + merged_values, merged_regex_values = _get_effective_blacklist_values(reader, feed) + return any(merged_values.values()) or any(merged_regex_values.values()) def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 @@ -63,58 +77,55 @@ def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0 Returns: bool: If the entry is in the blacklist. """ - feed = entry.feed - - blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() - blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() - - regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() - regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() + merged_values, merged_regex_values = _get_effective_blacklist_values(reader, entry.feed) # TODO(TheLovinator): Also add support for entry_text and more. # Check regular blacklist - if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title): + if entry.title and merged_values["title"] and is_word_in_text(merged_values["title"], entry.title): return True - if entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary): + if entry.summary and merged_values["summary"] and is_word_in_text(merged_values["summary"], entry.summary): return True if ( entry.content and entry.content[0].value - and blacklist_content - and is_word_in_text(blacklist_content, entry.content[0].value) + and merged_values["content"] + and is_word_in_text(merged_values["content"], entry.content[0].value) ): return True - if entry.author and blacklist_author and is_word_in_text(blacklist_author, entry.author): + if entry.author and merged_values["author"] and is_word_in_text(merged_values["author"], entry.author): return True if ( entry.content and entry.content[0].value - and blacklist_content - and is_word_in_text(blacklist_content, entry.content[0].value) + and merged_values["content"] + and is_word_in_text(merged_values["content"], entry.content[0].value) ): return True # Check regex blacklist - if entry.title and regex_blacklist_title and is_regex_match(regex_blacklist_title, entry.title): + if entry.title and merged_regex_values["title"] and is_regex_match(merged_regex_values["title"], entry.title): return True - if entry.summary and regex_blacklist_summary and is_regex_match(regex_blacklist_summary, entry.summary): + if ( + entry.summary + and merged_regex_values["summary"] + and is_regex_match( + merged_regex_values["summary"], + entry.summary, + ) + ): return True if ( entry.content and entry.content[0].value - and regex_blacklist_content - and is_regex_match(regex_blacklist_content, entry.content[0].value) + and merged_regex_values["content"] + and is_regex_match(merged_regex_values["content"], entry.content[0].value) ): return True - if entry.author and regex_blacklist_author and is_regex_match(regex_blacklist_author, entry.author): + if entry.author and merged_regex_values["author"] and is_regex_match(merged_regex_values["author"], entry.author): return True return bool( entry.content and entry.content[0].value - and regex_blacklist_content - and is_regex_match(regex_blacklist_content, entry.content[0].value), + and merged_regex_values["content"] + and is_regex_match(merged_regex_values["content"], entry.content[0].value), ) diff --git a/discord_rss_bot/filter/utils.py b/discord_rss_bot/filter/utils.py index ff93e59..d25e398 100644 --- a/discord_rss_bot/filter/utils.py +++ b/discord_rss_bot/filter/utils.py @@ -2,6 +2,14 @@ from __future__ import annotations import logging import re +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +import tldextract + +if TYPE_CHECKING: + from reader import Feed + from reader import Reader logger: logging.Logger = logging.getLogger(__name__) @@ -70,3 +78,53 @@ def is_regex_match(regex_string: str, text: str) -> bool: logger.info("No regex patterns matched.") return False + + +def get_domain_key(url: str) -> str: + """Return a normalized domain key used for domain-wide filters. + + Args: + url: The URL to extract the domain from. + + Returns: + str: A normalized domain key (e.g. ``example.com``). + """ + if not url: + return "" + + parsed_url = urlparse(url) + host: str = parsed_url.netloc.lower().strip() + host = host.removeprefix("www.") + + if not host: + return "" + + ext = tldextract.extract(host) + top_domain: str = ext.top_domain_under_public_suffix + return top_domain or host + + +def get_domain_filter_tags(reader: Reader, feed: Feed, tag_name: str) -> dict[str, str]: + """Return domain-wide filter tags for a feed. + + Args: + reader: Reader instance. + feed: Feed instance. + tag_name: Global tag name that stores domain filters. + + Returns: + dict[str, str]: Domain filter values for the feed's domain. + """ + domain_key: str = get_domain_key(str(feed.url)) + if not domain_key: + return {} + + domain_filters: object = reader.get_tag((), tag_name, {}) + if not isinstance(domain_filters, dict): + return {} + + values: object = domain_filters.get(domain_key, {}) + if not isinstance(values, dict): + return {} + + return {str(key): str(value) for key, value in values.items() if isinstance(key, str)} diff --git a/discord_rss_bot/filter/whitelist.py b/discord_rss_bot/filter/whitelist.py index bb5303d..1d8de85 100644 --- a/discord_rss_bot/filter/whitelist.py +++ b/discord_rss_bot/filter/whitelist.py @@ -2,6 +2,7 @@ from __future__ import annotations from typing import TYPE_CHECKING +from discord_rss_bot.filter.utils import get_domain_filter_tags from discord_rss_bot.filter.utils import is_regex_match from discord_rss_bot.filter.utils import is_word_in_text @@ -11,6 +12,37 @@ if TYPE_CHECKING: from reader import Reader +_MATCH_FIELDS: tuple[str, ...] = ("title", "summary", "content", "author") + + +def _get_effective_whitelist_values(reader: Reader, feed: Feed) -> tuple[dict[str, str], dict[str, str]]: + """Return merged feed-level and domain-level whitelist values.""" + local_values: dict[str, str] = { + field: str(reader.get_tag(feed, f"whitelist_{field}", "")).strip() for field in _MATCH_FIELDS + } + local_regex_values: dict[str, str] = { + field: str(reader.get_tag(feed, f"regex_whitelist_{field}", "")).strip() for field in _MATCH_FIELDS + } + + domain_values_raw: dict[str, str] = get_domain_filter_tags(reader, feed, "domain_whitelist") + domain_values: dict[str, str] = { + field: str(domain_values_raw.get(f"whitelist_{field}", "")).strip() for field in _MATCH_FIELDS + } + domain_regex_values: dict[str, str] = { + field: str(domain_values_raw.get(f"regex_whitelist_{field}", "")).strip() for field in _MATCH_FIELDS + } + + merged_values: dict[str, str] = { + field: ",".join(value for value in (local_values[field], domain_values[field]) if value) + for field in _MATCH_FIELDS + } + merged_regex_values: dict[str, str] = { + field: "\n".join(value for value in (local_regex_values[field], domain_regex_values[field]) if value) + for field in _MATCH_FIELDS + } + return merged_values, merged_regex_values + + def has_white_tags(reader: Reader, feed: Feed) -> bool: """Return True if the feed has whitelist tags. @@ -31,26 +63,8 @@ def has_white_tags(reader: Reader, feed: Feed) -> bool: Returns: bool: If the feed has any of the tags. """ - whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() - - regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() - - return bool( - whitelist_title - or whitelist_author - or whitelist_content - or whitelist_summary - or regex_whitelist_author - or regex_whitelist_content - or regex_whitelist_summary - or regex_whitelist_title, - ) + merged_values, merged_regex_values = _get_effective_whitelist_values(reader, feed) + return any(merged_values.values()) or any(merged_regex_values.values()) def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 @@ -63,44 +77,40 @@ def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 Returns: bool: If the entry is in the whitelist. """ - feed: Feed = entry.feed - # Regular whitelist tags - whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() - - # Regex whitelist tags - regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() + merged_values, merged_regex_values = _get_effective_whitelist_values(reader, entry.feed) # Check regular whitelist - if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): + if entry.title and merged_values["title"] and is_word_in_text(merged_values["title"], entry.title): return True - if entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary): + if entry.summary and merged_values["summary"] and is_word_in_text(merged_values["summary"], entry.summary): return True - if entry.author and whitelist_author and is_word_in_text(whitelist_author, entry.author): + if entry.author and merged_values["author"] and is_word_in_text(merged_values["author"], entry.author): return True if ( entry.content and entry.content[0].value - and whitelist_content - and is_word_in_text(whitelist_content, entry.content[0].value) + and merged_values["content"] + and is_word_in_text(merged_values["content"], entry.content[0].value) ): return True # Check regex whitelist - if entry.title and regex_whitelist_title and is_regex_match(regex_whitelist_title, entry.title): + if entry.title and merged_regex_values["title"] and is_regex_match(merged_regex_values["title"], entry.title): return True - if entry.summary and regex_whitelist_summary and is_regex_match(regex_whitelist_summary, entry.summary): + if ( + entry.summary + and merged_regex_values["summary"] + and is_regex_match( + merged_regex_values["summary"], + entry.summary, + ) + ): return True - if entry.author and regex_whitelist_author and is_regex_match(regex_whitelist_author, entry.author): + if entry.author and merged_regex_values["author"] and is_regex_match(merged_regex_values["author"], entry.author): return True return bool( entry.content and entry.content[0].value - and regex_whitelist_content - and is_regex_match(regex_whitelist_content, entry.content[0].value), + and merged_regex_values["content"] + and is_regex_match(merged_regex_values["content"], entry.content[0].value), ) diff --git a/discord_rss_bot/git_backup.py b/discord_rss_bot/git_backup.py index febc34c..9b1c585 100644 --- a/discord_rss_bot/git_backup.py +++ b/discord_rss_bot/git_backup.py @@ -28,7 +28,7 @@ import shutil import subprocess # noqa: S404 from pathlib import Path from typing import TYPE_CHECKING -from typing import Any +from typing import cast if TYPE_CHECKING: from reader import Reader @@ -37,11 +37,8 @@ logger: logging.Logger = logging.getLogger(__name__) GIT_EXECUTABLE: str = shutil.which("git") or "git" -type TAG_VALUE = ( - dict[str, str | int | float | bool | dict[str, Any] | list[Any] | None] - | list[str | int | float | bool | dict[str, Any] | list[Any] | None] - | None -) +type JsonScalar = str | int | float | bool | None +type JsonLike = JsonScalar | dict[str, JsonLike] | list[JsonLike] # Tags that are exported per-feed (empty values are omitted). _FEED_TAGS: tuple[str, ...] = ( @@ -157,47 +154,68 @@ def setup_backup_repo(backup_path: Path) -> bool: return True -def export_state(reader: Reader, backup_path: Path) -> None: - """Serialise the current bot state to ``state.json`` inside *backup_path*. +def _build_feed_state(reader: Reader) -> list[JsonLike]: + """Collect feed and per-feed tag state. - Args: - reader: The :class:`reader.Reader` instance to read state from. - backup_path: Destination directory for the exported ``state.json``. + Returns: + A list of dictionaries containing feed URLs and their associated tag values. """ - feeds_state: list[dict] = [] + feeds_state: list[JsonLike] = [] for feed in reader.get_feeds(): - feed_data: dict = {"url": feed.url} + feed_data: dict[str, JsonLike] = {"url": feed.url} for tag in _FEED_TAGS: try: - value: TAG_VALUE = reader.get_tag(feed, tag, None) + value: JsonLike | None = cast("JsonLike | None", reader.get_tag(feed, tag, None)) if value is not None and value != "": # noqa: PLC1901 feed_data[tag] = value except Exception: logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url) feeds_state.append(feed_data) + return feeds_state - webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list( - reader.get_tag((), "webhooks", []), - ) - # Export global update interval if set - global_update_interval: dict[str, Any] | None = None - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict): - global_update_interval = global_update_config +def _get_global_dict_tag(reader: Reader, tag_name: str) -> dict[str, JsonLike] | None: + """Return a global tag value if it is a dictionary.""" + tag_value: JsonLike | None = cast("JsonLike | None", reader.get_tag((), tag_name, None)) + return tag_value if isinstance(tag_value, dict) else None - global_screenshot_layout: str | None = None - screenshot_layout = reader.get_tag((), "screenshot_layout", None) - if isinstance(screenshot_layout, str): - clean_layout = screenshot_layout.strip().lower() - if clean_layout in {"desktop", "mobile"}: - global_screenshot_layout = clean_layout - state: dict = {"feeds": feeds_state, "webhooks": webhooks} +def _get_global_screenshot_layout(reader: Reader) -> str | None: + """Return normalized global screenshot layout if valid.""" + screenshot_layout: JsonLike | None = cast("JsonLike | None", reader.get_tag((), "screenshot_layout", None)) + if not isinstance(screenshot_layout, str): + return None + + clean_layout: str = screenshot_layout.strip().lower() + return clean_layout if clean_layout in {"desktop", "mobile"} else None + + +def export_state(reader: Reader, backup_path: Path) -> None: + """Serialize the current bot state to ``state.json`` inside *backup_path*. + + Args: + reader: The :class:`reader.Reader` instance to read state from. + backup_path: Destination directory for the exported ``state.json``. + """ + feeds_state: list[JsonLike] = _build_feed_state(reader) + + webhooks_raw: JsonLike | None = cast("JsonLike | None", reader.get_tag((), "webhooks", [])) + webhooks: list[JsonLike] = webhooks_raw if isinstance(webhooks_raw, list) else [] + + global_update_interval: dict[str, JsonLike] | None = _get_global_dict_tag(reader, ".reader.update") + global_screenshot_layout: str | None = _get_global_screenshot_layout(reader) + domain_blacklist: dict[str, JsonLike] | None = _get_global_dict_tag(reader, "domain_blacklist") + domain_whitelist: dict[str, JsonLike] | None = _get_global_dict_tag(reader, "domain_whitelist") + + state: dict[str, JsonLike] = {"feeds": feeds_state, "webhooks": webhooks} if global_update_interval is not None: state["global_update_interval"] = global_update_interval if global_screenshot_layout is not None: state["global_screenshot_layout"] = global_screenshot_layout + if domain_blacklist is not None: + state["domain_blacklist"] = domain_blacklist + if domain_whitelist is not None: + state["domain_whitelist"] = domain_whitelist state_file: Path = backup_path / "state.json" state_file.write_text(json.dumps(state, indent=2, default=str), encoding="utf-8") diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 961c70e..8b02323 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -54,6 +54,7 @@ from discord_rss_bot.feeds import get_feed_delivery_mode from discord_rss_bot.feeds import get_screenshot_layout from discord_rss_bot.feeds import send_entry_to_discord from discord_rss_bot.feeds import send_to_discord +from discord_rss_bot.filter.utils import get_domain_key from discord_rss_bot.git_backup import commit_state_change from discord_rss_bot.git_backup import get_backup_path from discord_rss_bot.is_url_valid import is_url_valid @@ -399,6 +400,7 @@ async def post_unpause_feed( @app.post("/whitelist") async def post_set_whitelist( reader: Annotated[Reader, Depends(get_reader_dependency)], + *, whitelist_title: Annotated[str, Form()] = "", whitelist_summary: Annotated[str, Form()] = "", whitelist_content: Annotated[str, Form()] = "", @@ -407,6 +409,7 @@ async def post_set_whitelist( regex_whitelist_summary: Annotated[str, Form()] = "", regex_whitelist_content: Annotated[str, Form()] = "", regex_whitelist_author: Annotated[str, Form()] = "", + apply_to_domain: Annotated[bool, Form()] = False, feed_url: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent. @@ -420,6 +423,7 @@ async def post_set_whitelist( regex_whitelist_summary: Whitelisted regex for when checking the summary. regex_whitelist_content: Whitelisted regex for when checking the content. regex_whitelist_author: Whitelisted regex for when checking the author. + apply_to_domain: Also store these values as domain-wide whitelist rules. feed_url: The feed we should set the whitelist for. reader: The Reader instance. @@ -427,16 +431,43 @@ async def post_set_whitelist( RedirectResponse: Redirect to the feed page. """ clean_feed_url: str = feed_url.strip() if feed_url else "" - reader.set_tag(clean_feed_url, "whitelist_title", whitelist_title) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "whitelist_author", whitelist_author) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_whitelist_title", regex_whitelist_title) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_whitelist_summary", regex_whitelist_summary) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_whitelist_content", regex_whitelist_content) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_whitelist_author", regex_whitelist_author) # pyright: ignore[reportArgumentType][call-overload] + whitelist_values: dict[str, str] = { + "whitelist_title": whitelist_title.strip(), + "whitelist_summary": whitelist_summary.strip(), + "whitelist_content": whitelist_content.strip(), + "whitelist_author": whitelist_author.strip(), + "regex_whitelist_title": regex_whitelist_title.strip(), + "regex_whitelist_summary": regex_whitelist_summary.strip(), + "regex_whitelist_content": regex_whitelist_content.strip(), + "regex_whitelist_author": regex_whitelist_author.strip(), + } - commit_state_change(reader, f"Update whitelist for {clean_feed_url}") + for tag, value in whitelist_values.items(): + reader.set_tag(clean_feed_url, tag, value) # pyright: ignore[reportArgumentType][call-overload] + + message: str = f"Update whitelist for {clean_feed_url}" + if apply_to_domain: + domain_key: str = get_domain_key(clean_feed_url) + if domain_key: + domain_whitelists_raw = reader.get_tag((), "domain_whitelist", {}) + domain_whitelists: dict[str, dict[str, str]] = {} + if isinstance(domain_whitelists_raw, dict): + for existing_domain, existing_values in domain_whitelists_raw.items(): + if isinstance(existing_domain, str) and isinstance(existing_values, dict): + domain_whitelists[existing_domain] = { + str(key): str(value) for key, value in existing_values.items() if isinstance(key, str) + } + + domain_values: dict[str, str] = {k: v for k, v in whitelist_values.items() if v} + if domain_values: + domain_whitelists[domain_key] = domain_values + else: + domain_whitelists.pop(domain_key, None) + + reader.set_tag((), "domain_whitelist", domain_whitelists) # pyright: ignore[reportArgumentType] + message = f"Update whitelist for {clean_feed_url} and domain {domain_key}" + + commit_state_change(reader, message) return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @@ -468,6 +499,11 @@ async def get_whitelist( regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")) regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")) regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")) + domain_key: str = get_domain_key(feed.url) + domain_whitelist_raw = reader.get_tag((), "domain_whitelist", {}) + domain_whitelist_enabled: bool = bool( + isinstance(domain_whitelist_raw, dict) and domain_key and domain_key in domain_whitelist_raw, + ) context = { "request": request, @@ -480,6 +516,9 @@ async def get_whitelist( "regex_whitelist_summary": regex_whitelist_summary, "regex_whitelist_content": regex_whitelist_content, "regex_whitelist_author": regex_whitelist_author, + "domain_key": domain_key, + "domain_name": extract_domain(feed.url), + "domain_whitelist_enabled": domain_whitelist_enabled, } return templates.TemplateResponse(request=request, name="whitelist.html", context=context) @@ -487,6 +526,7 @@ async def get_whitelist( @app.post("/blacklist") async def post_set_blacklist( reader: Annotated[Reader, Depends(get_reader_dependency)], + *, blacklist_title: Annotated[str, Form()] = "", blacklist_summary: Annotated[str, Form()] = "", blacklist_content: Annotated[str, Form()] = "", @@ -495,6 +535,7 @@ async def post_set_blacklist( regex_blacklist_summary: Annotated[str, Form()] = "", regex_blacklist_content: Annotated[str, Form()] = "", regex_blacklist_author: Annotated[str, Form()] = "", + apply_to_domain: Annotated[bool, Form()] = False, feed_url: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set the blacklist. @@ -511,6 +552,7 @@ async def post_set_blacklist( regex_blacklist_summary: Blacklisted regex for when checking the summary. regex_blacklist_content: Blacklisted regex for when checking the content. regex_blacklist_author: Blacklisted regex for when checking the author. + apply_to_domain: Also store these values as domain-wide blacklist rules. feed_url: What feed we should set the blacklist for. reader: The Reader instance. @@ -518,15 +560,43 @@ async def post_set_blacklist( RedirectResponse: Redirect to the feed page. """ clean_feed_url: str = feed_url.strip() if feed_url else "" - reader.set_tag(clean_feed_url, "blacklist_title", blacklist_title) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_blacklist_title", regex_blacklist_title) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_blacklist_summary", regex_blacklist_summary) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_blacklist_content", regex_blacklist_content) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_blacklist_author", regex_blacklist_author) # pyright: ignore[reportArgumentType][call-overload] - commit_state_change(reader, f"Update blacklist for {clean_feed_url}") + blacklist_values: dict[str, str] = { + "blacklist_title": blacklist_title.strip(), + "blacklist_summary": blacklist_summary.strip(), + "blacklist_content": blacklist_content.strip(), + "blacklist_author": blacklist_author.strip(), + "regex_blacklist_title": regex_blacklist_title.strip(), + "regex_blacklist_summary": regex_blacklist_summary.strip(), + "regex_blacklist_content": regex_blacklist_content.strip(), + "regex_blacklist_author": regex_blacklist_author.strip(), + } + + for tag, value in blacklist_values.items(): + reader.set_tag(clean_feed_url, tag, value) # pyright: ignore[reportArgumentType][call-overload] + + message: str = f"Update blacklist for {clean_feed_url}" + if apply_to_domain: + domain_key: str = get_domain_key(clean_feed_url) + if domain_key: + domain_blacklists_raw = reader.get_tag((), "domain_blacklist", {}) + domain_blacklists: dict[str, dict[str, str]] = {} + if isinstance(domain_blacklists_raw, dict): + for existing_domain, existing_values in domain_blacklists_raw.items(): + if isinstance(existing_domain, str) and isinstance(existing_values, dict): + domain_blacklists[existing_domain] = { + str(key): str(value) for key, value in existing_values.items() if isinstance(key, str) + } + + domain_values: dict[str, str] = {k: v for k, v in blacklist_values.items() if v} + if domain_values: + domain_blacklists[domain_key] = domain_values + else: + domain_blacklists.pop(domain_key, None) + + reader.set_tag((), "domain_blacklist", domain_blacklists) # pyright: ignore[reportArgumentType] + message = f"Update blacklist for {clean_feed_url} and domain {domain_key}" + + commit_state_change(reader, message) return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @@ -556,6 +626,11 @@ async def get_blacklist( regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")) regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")) regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")) + domain_key: str = get_domain_key(feed.url) + domain_blacklist_raw = reader.get_tag((), "domain_blacklist", {}) + domain_blacklist_enabled: bool = bool( + isinstance(domain_blacklist_raw, dict) and domain_key and domain_key in domain_blacklist_raw, + ) context = { "request": request, @@ -568,6 +643,9 @@ async def get_blacklist( "regex_blacklist_summary": regex_blacklist_summary, "regex_blacklist_content": regex_blacklist_content, "regex_blacklist_author": regex_blacklist_author, + "domain_key": domain_key, + "domain_name": extract_domain(feed.url), + "domain_blacklist_enabled": domain_blacklist_enabled, } return templates.TemplateResponse(request=request, name="blacklist.html", context=context) diff --git a/discord_rss_bot/templates/blacklist.html b/discord_rss_bot/templates/blacklist.html index ec16bce..6904085 100644 --- a/discord_rss_bot/templates/blacklist.html +++ b/discord_rss_bot/templates/blacklist.html @@ -1,98 +1,127 @@ {% extends "base.html" %} {% block title %} -| Blacklist + | Blacklist {% endblock title %} {% block content %} -