diff --git a/.env.example b/.env.example deleted file mode 100644 index 2a098da..0000000 --- a/.env.example +++ /dev/null @@ -1,19 +0,0 @@ -# You can optionally store backups of your bot's configuration in a git repository. -# This allows you to track changes by subscribing to the repository or using a RSS feed. -# Local path for the backup git repository (e.g., /data/backup or /home/user/backups/discord-rss-bot) -# When set, the bot will initialize a git repo here and commit state.json after every configuration change -# GIT_BACKUP_PATH= - -# Remote URL for pushing backup commits (e.g., git@github.com:username/private-config.git) -# Optional - only set if you want automatic pushes to a remote repository -# Leave empty to keep git history local only -# GIT_BACKUP_REMOTE= - -# Sentry Configuration (Optional) -# Sentry DSN for error tracking and monitoring -# Leave empty to disable Sentry integration -# SENTRY_DSN= - -# Testing Configuration -# Discord webhook URL used for testing (optional, only needed when running tests) -# TEST_WEBHOOK_URL= diff --git a/.forgejo/workflows/build.yml b/.forgejo/workflows/build.yml deleted file mode 100644 index c2d854d..0000000 --- a/.forgejo/workflows/build.yml +++ /dev/null @@ -1,100 +0,0 @@ ---- -# Required setup for self-hosted runner: -# 1. Install dependencies: -# sudo pacman -S qemu-user-static qemu-user-static-binfmt docker docker-buildx -# 2. Add runner to docker group: -# sudo usermod -aG docker forgejo-runner -# 3. Restart runner service to apply group membership: -# sudo systemctl restart forgejo-runner -# 4. Install uv and ruff for the runner user -# 5. Login to GitHub Container Registry: -# echo "ghp_YOUR_TOKEN_HERE" | sudo -u forgejo-runner docker login ghcr.io -u TheLovinator1 --password-stdin -# 6. 
Configure sudoers for deployment (sudo EDITOR=nvim visudo): -# forgejo-runner ALL=(discord-rss) NOPASSWD: /usr/bin/git -C /home/discord-rss/discord-rss-bot pull -# forgejo-runner ALL=(discord-rss) NOPASSWD: /usr/bin/uv sync -U --directory /home/discord-rss/discord-rss-bot -# forgejo-runner ALL=(root) NOPASSWD: /bin/systemctl restart discord-rss-bot - -name: Test and build Docker image -on: - push: - branches: - - master - pull_request: - workflow_dispatch: - schedule: - - cron: "0 0 1 * *" - -jobs: - docker: - runs-on: self-hosted - steps: - # Download the latest commit from the master branch - - uses: actions/checkout@v6 - - # Verify local tools are available on the self-hosted runner - - name: Check local toolchain - run: | - python --version - uv --version - ruff --version - docker version - - # Bootstrap a local Buildx builder for multi-arch builds - # (requires qemu-user-static and qemu-user-static-binfmt installed via pacman) - - name: Configure local buildx for multi-arch - run: | - docker buildx inspect local-multiarch-builder >/dev/null 2>&1 || \ - docker buildx create --name local-multiarch-builder --driver docker-container - docker buildx use local-multiarch-builder - docker buildx inspect --bootstrap - - - name: Lint Python code - run: ruff check --exit-non-zero-on-fix --verbose - - - name: Check Python formatting - run: ruff format --check --verbose - - - name: Lint Dockerfile - run: docker build --check . 
- - - name: Install dependencies - run: uv sync --all-extras --all-groups - - - name: Run tests - run: uv run pytest - - - id: tags - name: Compute image tags - run: | - IMAGE="ghcr.io/thelovinator1/discord-rss-bot" - if [ "${FORGEJO_REF}" = "refs/heads/master" ]; then - echo "tags=${IMAGE}:latest,${IMAGE}:master" >> "$FORGEJO_OUTPUT" - else - SHORT_SHA="$(echo "$FORGEJO_SHA" | cut -c1-12)" - echo "tags=${IMAGE}:sha-${SHORT_SHA}" >> "$FORGEJO_OUTPUT" - fi - - # Build (and optionally push) Docker image - - name: Build and push Docker image - env: - TAGS: ${{ steps.tags.outputs.tags }} - run: | - IFS=',' read -r -a tag_array <<< "$TAGS" - tag_args=() - for tag in "${tag_array[@]}"; do - tag_args+=( -t "$tag" ) - done - - if [ "${{ forge.event_name }}" = "pull_request" ]; then - docker buildx build --platform linux/amd64,linux/arm64 "${tag_args[@]}" --load . - else - docker buildx build --platform linux/amd64,linux/arm64 "${tag_args[@]}" --push . - fi - - # Deploy to production server - - name: Deploy to Server - if: success() && forge.ref == 'refs/heads/master' - run: | - sudo -u discord-rss git -C /home/discord-rss/discord-rss-bot pull - sudo -u discord-rss uv sync -U --directory /home/discord-rss/discord-rss-bot - sudo systemctl restart discord-rss-bot diff --git a/.gitattributes b/.gitattributes deleted file mode 100644 index ccb351b..0000000 --- a/.gitattributes +++ /dev/null @@ -1 +0,0 @@ -*.html linguist-language=jinja diff --git a/.forgejo/renovate.json b/.github/renovate.json similarity index 82% rename from .forgejo/renovate.json rename to .github/renovate.json index 7884adb..734986c 100644 --- a/.forgejo/renovate.json +++ b/.github/renovate.json @@ -1,8 +1,6 @@ { "$schema": "https://docs.renovatebot.com/renovate-schema.json", - "extends": [ - "config:recommended" - ], + "extends": ["config:recommended"], "automerge": true, "configMigration": true, "dependencyDashboard": false, diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file 
mode 100644 index 0000000..7f0ea6d --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,64 @@ +--- +name: Test and build Docker image +on: + push: + pull_request: + workflow_dispatch: + schedule: + - cron: "0 6 * * *" + +env: + TEST_WEBHOOK_URL: ${{ secrets.TEST_WEBHOOK_URL }} + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.12 + - uses: astral-sh/setup-uv@v5 + with: + version: "latest" + - run: uv sync --all-extras --all-groups + - run: uv run pytest + ruff: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/ruff-action@v3 + with: + version: "latest" + - run: ruff check --exit-non-zero-on-fix --verbose + - run: ruff format --check --verbose + + build: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + if: github.event_name != 'pull_request' + concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + needs: [test, ruff] + steps: + - uses: actions/checkout@v4 + - uses: docker/setup-qemu-action@v3 + with: + platforms: all + - uses: docker/setup-buildx-action@v3 + - uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ secrets.GITHUB_TOKEN }} + - uses: docker/build-push-action@v6 + with: + context: . + platforms: linux/amd64, linux/arm64 + push: ${{ github.event_name != 'pull_request' }} + tags: | + ghcr.io/thelovinator1/discord-rss-bot:latest + ghcr.io/thelovinator1/discord-rss-bot:master diff --git a/.gitignore b/.gitignore index 6817461..1ac2c11 100644 --- a/.gitignore +++ b/.gitignore @@ -92,7 +92,7 @@ ipython_config.py # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. 
-# Pipfile.lock +Pipfile.lock # UV # Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. @@ -105,12 +105,11 @@ uv.lock # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -# poetry.lock -# poetry.toml +poetry.lock # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -# pdm.lock +#pdm.lock # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/latest/usage/project/#working-with-version-control @@ -166,20 +165,7 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -# .idea/ - -# Abstra -# Abstra is an AI-powered process automation framework. -# Ignore directories containing user credentials, local state, and settings. -# Learn more at https://abstra.io/docs -.abstra/ - -# Visual Studio Code -# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore -# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore -# and can be added to the global gitignore or merged into this file. However, if you prefer, -# you could uncomment the following to ignore the entire vscode folder -# .vscode/ +#.idea/ # Ruff stuff: .ruff_cache/ @@ -187,13 +173,6 @@ cython_debug/ # PyPI configuration file .pypirc -# Cursor -# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to -# exclude from AI features like autocomplete and code analysis. 
Recommended for sensitive data -# refer to https://docs.cursor.com/context/ignore-files -.cursorignore -.cursorindexingignore - # Database stuff *.sqlite *.sqlite-shm diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 16a9a4f..a3c42c0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,13 +1,13 @@ repos: # Automatically add trailing commas to calls and literals. - repo: https://github.com/asottile/add-trailing-comma - rev: v4.0.0 + rev: v3.1.0 hooks: - id: add-trailing-comma # Some out-of-the-box hooks for pre-commit. - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v6.0.0 + rev: v5.0.0 hooks: - id: check-added-large-files - id: check-ast @@ -31,14 +31,14 @@ repos: # Run Pyupgrade on all Python files. This will upgrade the code to Python 3.12. - repo: https://github.com/asottile/pyupgrade - rev: v3.21.2 + rev: v3.19.1 hooks: - id: pyupgrade args: ["--py312-plus"] # An extremely fast Python linter and formatter. - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.15.5 + rev: v0.9.5 hooks: - id: ruff-format - id: ruff @@ -46,6 +46,6 @@ repos: # Static checker for GitHub Actions workflow files. 
- repo: https://github.com/rhysd/actionlint - rev: v1.7.11 + rev: v1.7.7 hooks: - id: actionlint diff --git a/.vscode/launch.json b/.vscode/launch.json index 266d7f2..781b0bd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -8,11 +8,7 @@ "module": "uvicorn", "args": [ "discord_rss_bot.main:app", - "--reload", - "--host", - "0.0.0.0", - "--port", - "3000", + "--reload" ], "jinja": true, "justMyCode": true diff --git a/.vscode/settings.json b/.vscode/settings.json index 8bd0ea9..f929fff 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,19 +1,13 @@ { "cSpell.words": [ - "autoexport", "botuser", "Genshins", - "healthcheck", - "Hoyolab", "levelname", "Lovinator", "markdownified", "markdownify", "pipx", - "pyproject", - "thead", - "thelovinator", - "uvicorn" + "thead" ], "python.analysis.typeCheckingMode": "basic" } diff --git a/Dockerfile b/Dockerfile index f27eed9..72714a0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,14 @@ -FROM python:3.14-slim +FROM python:3.13-slim COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ RUN useradd --create-home botuser && \ mkdir -p /home/botuser/discord-rss-bot/ /home/botuser/.local/share/discord_rss_bot/ && \ chown -R botuser:botuser /home/botuser/ USER botuser WORKDIR /home/botuser/discord-rss-bot +COPY --chown=botuser:botuser requirements.txt /home/botuser/discord-rss-bot/ RUN --mount=type=cache,target=/root/.cache/uv \ --mount=type=bind,source=pyproject.toml,target=pyproject.toml \ uv sync --no-install-project -COPY --chown=botuser:botuser discord_rss_bot/ /home/botuser/discord-rss-bot/discord_rss_bot/ EXPOSE 5000 VOLUME ["/home/botuser/.local/share/discord_rss_bot/"] -HEALTHCHECK --interval=10m --timeout=5s CMD ["uv", "run", "./discord_rss_bot/healthcheck.py"] CMD ["uv", "run", "uvicorn", "discord_rss_bot.main:app", "--host=0.0.0.0", "--port=5000", "--proxy-headers", "--forwarded-allow-ips='*'", "--log-level", "debug"] diff --git a/README.md b/README.md index 09b6bbc..849fb98 100644 --- 
a/README.md +++ b/README.md @@ -2,25 +2,8 @@ Subscribe to RSS feeds and get updates to a Discord webhook. -Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com) - -Discord: TheLovinator#9276 - -## Features - -- Subscribe to RSS feeds and get updates to a Discord webhook. -- Web interface to manage subscriptions. -- Customizable message format for each feed. -- Choose between Discord embed or plain text. -- Regex filters for RSS feeds. -- Blacklist/whitelist words in the title/description/author/etc. -- Set different update frequencies for each feed or use a global default. -- Gets extra information from APIs if available, currently for: - - [https://feeds.c3kay.de/](https://feeds.c3kay.de/) - - Genshin Impact News - - Honkai Impact 3rd News - - Honkai Starrail News - - Zenless Zone Zero News +> [!NOTE] +> You should look at [MonitoRSS](https://github.com/synzen/monitorss) for a more feature-rich project. ## Installation @@ -30,7 +13,9 @@ or [install directly on your computer](#install-directly-on-your-computer). ### Docker - Open a terminal in the repository folder. - - Shift + right-click in the folder and `Open PowerShell window here` + - Windows 10: Shift + right-click in the folder and select `Open PowerShell window here` + - Windows 11: Shift + right-click in the folder and Show more options + and `Open PowerShell window here` - Run the Docker Compose file: - `docker-compose up` - You can stop the bot with Ctrl + c. @@ -44,68 +29,34 @@ or [install directly on your computer](#install-directly-on-your-computer). ### Install directly on your computer -- Install the latest of [uv](https://docs.astral.sh/uv/#installation): - - `powershell -ExecutionPolicy ByPass -c "irm | iex"` +This is not recommended if you don't have an init system (e.g., systemd) + +- Install the latest version of needed software: + - [Python](https://www.python.org/) + - You should use the latest version. + - You want to add Python to your PATH. 
+ - Windows: Find `App execution aliases` and disable python.exe and python3.exe + - [Poetry](https://python-poetry.org/docs/master/#installation) + - Windows: You have to add `%appdata%\Python\Scripts` to your PATH for Poetry to work. - Download the project from GitHub with Git or download the [ZIP](https://github.com/TheLovinator1/discord-rss-bot/archive/refs/heads/master.zip). - If you want to update the bot, you can run `git pull` in the project folder or download the ZIP again. - Open a terminal in the repository folder. - - Shift + right-click in the folder and `Open PowerShell window here` + - Windows 10: Shift + right-click in the folder and select `Open PowerShell window here` + - Windows 11: Shift + right-click in the folder and Show more options + and `Open PowerShell window here` +- Install requirements: + - Type `poetry install` into the PowerShell window. Make sure you are + in the repository folder where the [pyproject.toml](pyproject.toml) file is located. + - (You may have to restart your terminal if it can't find the `poetry` command. Also double check it is in + your PATH.) - Start the bot: - - Type `uv run discord_rss_bot/main.py` into the PowerShell window. + - Type `poetry run python discord_rss_bot/main.py` into the PowerShell window. - You can stop the bot with Ctrl + c. -- Bot is now running on port 3000. -- You should run this bot behind a reverse proxy like [Caddy](https://caddyserver.com/) - or [Nginx](https://www.nginx.com/) if you want to access it from the internet. Remember to add authentication. -- You can access the web interface at `http://localhost:3000/`. -- To run automatically on boot: - - Use [Windows Task Scheduler](https://en.wikipedia.org/wiki/Windows_Task_Scheduler). - - Or add a shortcut to `%userprofile%\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\Startup`. +Note: You will need to run `poetry install` again if [poetry.lock](poetry.lock) has been modified. 
-## Git Backup (State Version Control) +## Contact -The bot can commit every configuration change (adding/removing feeds, webhook -changes, blacklist/whitelist updates) to a separate private Git repository so -you get a full, auditable history of state changes — similar to `etckeeper`. - -### Configuration - -Set the following environment variables (e.g. in `docker-compose.yml` or a -`.env` file): - -| Variable | Required | Description | -| ------------------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------- | -| `GIT_BACKUP_PATH` | Yes | Local path where the backup git repository is stored. The bot will initialise it automatically if it does not yet exist. | -| `GIT_BACKUP_REMOTE` | No | Remote URL to push to after each commit (e.g. `git@github.com:you/private-config.git`). Leave unset to keep the history local only. | - -### What is backed up - -After every relevant change a `state.json` file is written and committed. -The file contains: - -- All feed URLs together with their webhook URL, custom message, embed - settings, and any blacklist/whitelist filters. -- The global list of Discord webhooks. 
- -### Docker example - -```yaml -services: - discord-rss-bot: - image: ghcr.io/thelovinator1/discord-rss-bot:latest - volumes: - - ./data:/data - environment: - - GIT_BACKUP_PATH=/data/backup - - GIT_BACKUP_REMOTE=git@github.com:you/private-config.git -``` - -For SSH-based remotes mount your SSH key into the container and make sure the -host key is trusted, e.g.: - -```yaml - volumes: - - ./data:/data - - ~/.ssh:/root/.ssh:ro -``` +Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com) +Discord: TheLovinator#9276 diff --git a/discord_rss_bot/custom_filters.py b/discord_rss_bot/custom_filters.py index fd9461c..99fe77d 100644 --- a/discord_rss_bot/custom_filters.py +++ b/discord_rss_bot/custom_filters.py @@ -4,14 +4,15 @@ import urllib.parse from functools import lru_cache from typing import TYPE_CHECKING -from discord_rss_bot.filter.blacklist import entry_should_be_skipped -from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags -from discord_rss_bot.filter.whitelist import has_white_tags -from discord_rss_bot.filter.whitelist import should_be_sent +from discord_rss_bot.filter.blacklist import entry_should_be_skipped, feed_has_blacklist_tags +from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent +from discord_rss_bot.settings import get_reader if TYPE_CHECKING: - from reader import Entry - from reader import Reader + from reader import Entry, Reader + +# Our reader +reader: Reader = get_reader() @lru_cache @@ -30,12 +31,11 @@ def encode_url(url_to_quote: str) -> str: return urllib.parse.quote(string=url_to_quote) if url_to_quote else "" -def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool: +def entry_is_whitelisted(entry_to_check: Entry) -> bool: """Check if the entry is whitelisted. Args: entry_to_check: The feed to check. - reader: Custom Reader instance. Returns: bool: True if the feed is whitelisted, False otherwise. 
@@ -44,12 +44,11 @@ def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool: return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check)) -def entry_is_blacklisted(entry_to_check: Entry, reader: Reader) -> bool: +def entry_is_blacklisted(entry_to_check: Entry) -> bool: """Check if the entry is blacklisted. Args: entry_to_check: The feed to check. - reader: Custom Reader instance. Returns: bool: True if the feed is blacklisted, False otherwise. diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index 1626e39..9cb03e5 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -1,27 +1,18 @@ from __future__ import annotations -import html import json import logging -import re from dataclasses import dataclass -from typing import TYPE_CHECKING -from bs4 import BeautifulSoup -from bs4 import Tag +from bs4 import BeautifulSoup, Tag from markdownify import markdownify +from reader import Entry, Feed, Reader, TagNotFoundError from discord_rss_bot.is_url_valid import is_url_valid - -if TYPE_CHECKING: - from reader import Entry - from reader import Feed - from reader import Reader +from discord_rss_bot.settings import get_reader logger: logging.Logger = logging.getLogger(__name__) -DISCORD_TIMESTAMP_TAG_RE: re.Pattern[str] = re.compile(r"") - @dataclass(slots=True) class CustomEmbed: @@ -55,80 +46,18 @@ def try_to_replace(custom_message: str, template: str, replace_with: str) -> str return custom_message -def _preserve_discord_timestamp_tags(text: str) -> tuple[str, dict[str, str]]: - """Replace Discord timestamp tags with placeholders before markdown conversion. - - Args: - text: The text to replace tags in. - - Returns: - The text with Discord timestamp tags replaced by placeholders and a mapping of placeholders to original tags. 
- """ - replacements: dict[str, str] = {} - - def replace_match(match: re.Match[str]) -> str: - placeholder: str = f"DISCORDTIMESTAMPPLACEHOLDER{len(replacements)}" - replacements[placeholder] = match.group(0) - return placeholder - - return DISCORD_TIMESTAMP_TAG_RE.sub(replace_match, text), replacements - - -def _restore_discord_timestamp_tags(text: str, replacements: dict[str, str]) -> str: - """Restore preserved Discord timestamp tags after markdown conversion. - - Args: - text: The text to restore tags in. - replacements: A mapping of placeholders to original Discord timestamp tags. - - Returns: - The text with placeholders replaced by the original Discord timestamp tags. - """ - for placeholder, original_value in replacements.items(): - text = text.replace(placeholder, original_value) - return text - - -def format_entry_html_for_discord(text: str) -> str: - """Convert entry HTML to Discord-friendly markdown while preserving Discord timestamp tags. - - Args: - text: The HTML text to format. - - Returns: - The formatted text with Discord timestamp tags preserved. - """ - if not text: - return "" - - unescaped_text: str = html.unescape(text) - protected_text, replacements = _preserve_discord_timestamp_tags(unescaped_text) - formatted_text: str = markdownify( - html=protected_text, - strip=["img", "table", "td", "tr", "tbody", "thead"], - escape_misc=False, - heading_style="ATX", - ) - - if "[https://" in formatted_text or "[https://www." in formatted_text: - formatted_text = formatted_text.replace("[https://", "[") - formatted_text = formatted_text.replace("[https://www.", "[") - - return _restore_discord_timestamp_tags(formatted_text, replacements) - - -def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str: +def replace_tags_in_text_message(entry: Entry) -> str: """Replace tags in custom_message. Args: entry: The entry to get the tags from. - reader: Custom Reader instance. Returns: Returns the custom_message with the tags replaced. 
""" feed: Feed = entry.feed - custom_message: str = get_custom_message(feed=feed, reader=reader) + custom_reader: Reader = get_reader() + custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader) content = "" if entry.content: @@ -139,8 +68,16 @@ def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str: first_image: str = get_first_image(summary, content) - summary = format_entry_html_for_discord(summary) - content = format_entry_html_for_discord(content) + summary = markdownify(html=summary, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) + content = markdownify(html=content, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) + + if "[https://" in content or "[https://www." in content: + content = content.replace("[https://", "[") + content = content.replace("[https://www.", "[") + + if "[https://" in summary or "[https://www." in summary: + summary = summary.replace("[https://", "[") + summary = summary.replace("[https://www.", "[") feed_added: str = feed.added.strftime("%Y-%m-%d %H:%M:%S") if feed.added else "Never" feed_last_exception: str = feed.last_exception.value_str if feed.last_exception else "" @@ -215,7 +152,14 @@ def get_first_image(summary: str | None, content: str | None) -> str: logger.warning("Invalid URL: %s", src) continue - return str(image.attrs["src"]) + # Genshins first image is a divider, so we ignore it. 
+ # https://hyl-static-res-prod.hoyolab.com/divider_config/PC/line_3.png + skip_images: list[str] = [ + "https://img-os-static.hoyolab.com/divider_config/", + "https://hyl-static-res-prod.hoyolab.com/divider_config/", + ] + if not str(image.attrs["src"]).startswith(tuple(skip_images)): + return str(image.attrs["src"]) if summary and (images := BeautifulSoup(summary, features="lxml").find_all("img")): for image in images: if not isinstance(image, Tag) or "src" not in image.attrs: @@ -226,22 +170,24 @@ def get_first_image(summary: str | None, content: str | None) -> str: logger.warning("Invalid URL: %s", image.attrs["src"]) continue - return str(image.attrs["src"]) + # Genshins first image is a divider, so we ignore it. + if not str(image.attrs["src"]).startswith("https://img-os-static.hoyolab.com/divider_config"): + return str(image.attrs["src"]) return "" -def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed: +def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed: """Replace tags in embed. Args: feed: The feed to get the tags from. entry: The entry to get the tags from. - reader: Custom Reader instance. Returns: Returns the embed with the tags replaced. """ - embed: CustomEmbed = get_embed(feed=feed, reader=reader) + custom_reader: Reader = get_reader() + embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader) content = "" if entry.content: @@ -252,8 +198,16 @@ def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmb first_image: str = get_first_image(summary, content) - summary = format_entry_html_for_discord(summary) - content = format_entry_html_for_discord(content) + summary = markdownify(html=summary, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) + content = markdownify(html=content, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) + + if "[https://" in content or "[https://www." 
in content: + content = content.replace("[https://", "[") + content = content.replace("[https://www.", "[") + + if "[https://" in summary or "[https://www." in summary: + summary = summary.replace("[https://", "[") + summary = summary.replace("[https://www.", "[") feed_added: str = feed.added.strftime("%Y-%m-%d %H:%M:%S") if feed.added else "Never" feed_last_updated: str = feed.last_updated.strftime("%Y-%m-%d %H:%M:%S") if feed.last_updated else "Never" @@ -332,29 +286,31 @@ def _replace_embed_tags(embed: CustomEmbed, template: str, replace_with: str) -> embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with) -def get_custom_message(reader: Reader, feed: Feed) -> str: +def get_custom_message(custom_reader: Reader, feed: Feed) -> str: """Get custom_message tag from feed. Args: - reader: What Reader to use. + custom_reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the custom_message tag. """ try: - custom_message: str = str(reader.get_tag(feed, "custom_message", "")) + custom_message: str = str(custom_reader.get_tag(feed, "custom_message")) + except TagNotFoundError: + custom_message = "" except ValueError: custom_message = "" return custom_message -def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None: +def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None: """Set embed tag in feed. Args: - reader: What Reader to use. + custom_reader: What Reader to use. feed: The feed to set the tag in. embed: The embed to set. 
""" @@ -370,20 +326,20 @@ def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None: "footer_text": embed.footer_text, "footer_icon_url": embed.footer_icon_url, } - reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] + custom_reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] -def get_embed(reader: Reader, feed: Feed) -> CustomEmbed: +def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed: """Get embed tag from feed. Args: - reader: What Reader to use. + custom_reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the embed tag. """ - embed = reader.get_tag(feed, "embed", "") + embed = custom_reader.get_tag(feed, "embed", "") if embed: if not isinstance(embed, str): diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 225e7ff..ccb0a14 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -1,45 +1,25 @@ from __future__ import annotations import datetime -import json import logging -import os import pprint -import re from typing import TYPE_CHECKING -from typing import Any -from urllib.parse import ParseResult -from urllib.parse import urlparse -import tldextract -from discord_webhook import DiscordEmbed -from discord_webhook import DiscordWebhook +from discord_webhook import DiscordEmbed, DiscordWebhook from fastapi import HTTPException -from markdownify import markdownify -from reader import Entry -from reader import EntryNotFoundError -from reader import Feed -from reader import FeedExistsError -from reader import FeedNotFoundError -from reader import Reader -from reader import ReaderError -from reader import StorageError +from reader import Entry, EntryNotFoundError, Feed, FeedExistsError, Reader, ReaderError, StorageError, TagNotFoundError -from discord_rss_bot.custom_message import CustomEmbed -from discord_rss_bot.custom_message import get_custom_message -from 
discord_rss_bot.custom_message import replace_tags_in_embed -from discord_rss_bot.custom_message import replace_tags_in_text_message +from discord_rss_bot.custom_message import ( + CustomEmbed, + get_custom_message, + replace_tags_in_embed, + replace_tags_in_text_message, +) from discord_rss_bot.filter.blacklist import entry_should_be_skipped -from discord_rss_bot.filter.whitelist import has_white_tags -from discord_rss_bot.filter.whitelist import should_be_sent -from discord_rss_bot.hoyolab_api import create_hoyolab_webhook -from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url -from discord_rss_bot.hoyolab_api import fetch_hoyolab_post -from discord_rss_bot.hoyolab_api import is_c3kay_feed +from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent from discord_rss_bot.is_url_valid import is_url_valid -from discord_rss_bot.settings import default_custom_embed -from discord_rss_bot.settings import default_custom_message -from discord_rss_bot.settings import get_reader +from discord_rss_bot.missing_tags import add_missing_tags +from discord_rss_bot.settings import default_custom_message, get_reader if TYPE_CHECKING: from collections.abc import Iterable @@ -49,159 +29,53 @@ if TYPE_CHECKING: logger: logging.Logger = logging.getLogger(__name__) -def extract_domain(url: str) -> str: # noqa: PLR0911 - """Extract the domain name from a URL. - - Args: - url: The URL to extract the domain from. - - Returns: - str: The domain name, formatted for display. 
- """ - # Check for empty URL first - if not url: - return "Other" - - try: - # Special handling for YouTube feeds - if "youtube.com/feeds/videos.xml" in url: - return "YouTube" - - # Special handling for Reddit feeds - if "reddit.com" in url and ".rss" in url: - return "Reddit" - - # Parse the URL and extract the domain - parsed_url: ParseResult = urlparse(url) - domain: str = parsed_url.netloc - - # If we couldn't extract a domain, return "Other" - if not domain: - return "Other" - - # Remove www. prefix if present - domain = re.sub(r"^www\.", "", domain) - - # Special handling for common domains - domain_mapping: dict[str, str] = {"github.com": "GitHub"} - - if domain in domain_mapping: - return domain_mapping[domain] - - # Use tldextract to get the domain (SLD) - ext = tldextract.extract(url) - if ext.domain: - return ext.domain.capitalize() - return domain.capitalize() - except (ValueError, AttributeError, TypeError) as e: - logger.warning("Error extracting domain from %s: %s", url, e) - return "Other" - - -def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: C901 +def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: """Send a single entry to Discord. Args: entry: The entry to send to Discord. - reader: The reader to use. + custom_reader: The reader to use. If None, the default reader will be used. Returns: str | None: The error message if there was an error, otherwise None. """ + # Get the default reader if we didn't get a custom one. + reader: Reader = get_reader() if custom_reader is None else custom_reader + # Get the webhook URL for the entry. webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) if not webhook_url: return "No webhook URL found." - # If https://discord.com/quests/ is in the URL, send a separate message with the URL. 
- send_discord_quest_notification(entry, webhook_url, reader=reader) - - # Check if this is a c3kay feed - if is_c3kay_feed(entry.feed.url): - entry_link: str | None = entry.link - if entry_link: - post_id: str | None = extract_post_id_from_hoyolab_url(entry_link) - if post_id: - post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) - if post_data: - webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry, reader=reader) - return None - logger.warning( - "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", - entry.feed.url, - ) - else: - logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) - webhook_message: str = "" # Try to get the custom message for the feed. If the user has none, we will use the default message. # This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()" if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 - webhook_message: str = replace_tags_in_text_message(entry=entry, reader=reader) + webhook_message: str = replace_tags_in_text_message(entry=entry) if not webhook_message: webhook_message = "No message found." # Create the webhook. 
try: - should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) + should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) + except TagNotFoundError: + logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url) + should_send_embed = True except StorageError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True - # YouTube feeds should never use embeds - if is_youtube_feed(entry.feed.url): - should_send_embed = False - if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry, reader=reader) + webhook = create_embed_webhook(webhook_url, entry) else: webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) - execute_webhook(webhook, entry, reader=reader) + execute_webhook(webhook, entry) return None -def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Reader) -> None: - """Send a separate message to Discord if the entry is a quest notification.""" - quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+") - - def send_notification(quest_url: str) -> None: - """Helper function to send quest notification to Discord.""" - logger.info("Sending quest notification to Discord: %s", quest_url) - webhook = DiscordWebhook( - url=webhook_url, - content=quest_url, - rate_limit_retry=True, - ) - execute_webhook(webhook, entry, reader=reader) - - # Iterate through the content of the entry - for content in entry.content: - if content.type == "text" and content.value: - match = quest_regex.search(content.value) - if match: - send_notification(match.group(0)) - return - - elif content.type == "text/html" and content.value: - # Convert HTML to text and check for quest links - text_value = markdownify( - html=content.value, - strip=["img", "table", "td", "tr", "tbody", "thead"], - escape_misc=False, - heading_style="ATX", - ) - match: re.Match[str] | None = 
quest_regex.search(text_value) - if match: - send_notification(match.group(0)) - return - - logger.info("No quest notification found in entry: %s", entry.id) - - def set_description(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None: """Set the description of the embed. @@ -234,17 +108,12 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None: discord_embed.set_title(embed_title) if embed_title else None -def create_embed_webhook( # noqa: C901 - webhook_url: str, - entry: Entry, - reader: Reader, -) -> DiscordWebhook: +def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: """Create a webhook with an embed. Args: webhook_url (str): The webhook URL. entry (Entry): The entry to send to Discord. - reader (Reader): The Reader instance to use for getting embed data. Returns: DiscordWebhook: The webhook with the embed. @@ -253,7 +122,7 @@ def create_embed_webhook( # noqa: C901 feed: Feed = entry.feed # Get the embed data from the database. - custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader) + custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry) discord_embed: DiscordEmbed = DiscordEmbed() @@ -315,14 +184,13 @@ def get_webhook_url(reader: Reader, entry: Entry) -> str: str: The webhook URL. 
""" try: - webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) + webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook")) + except TagNotFoundError: + logger.exception("No webhook URL found for feed: %s", entry.feed.url) + return "" except StorageError: logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url) return "" - - if not webhook_url: - logger.error("No webhook URL found for feed: %s", entry.feed.url) - return "" return webhook_url @@ -341,53 +209,44 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None: logger.exception("Error setting entry to read: %s", entry.id) -def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 +def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: """Send entries to Discord. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. Args: - reader: If we should use a custom reader instead of the default one. + custom_reader: If we should use a custom reader instead of the default one. feed: The feed to send to Discord. do_once: If we should only send one entry. This is used in the test. """ - logger.info("Starting to send entries to Discord.") # Get the default reader if we didn't get a custom one. - effective_reader: Reader = get_reader() if reader is None else reader + reader: Reader = get_reader() if custom_reader is None else custom_reader # Check for new entries for every feed. - effective_reader.update_feeds( - scheduled=True, - workers=os.cpu_count() or 1, - ) + reader.update_feeds() # Loop through the unread entries. 
- entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False) + entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False) for entry in entries: - set_entry_as_read(effective_reader, entry) + set_entry_as_read(reader, entry) if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1): logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url) continue - webhook_url: str = get_webhook_url(effective_reader, entry) + webhook_url: str = get_webhook_url(reader, entry) if not webhook_url: logger.info("No webhook URL found for feed: %s", entry.feed.url) continue - should_send_embed: bool = should_send_embed_check(effective_reader, entry) - - # Youtube feeds only need to send the link - if is_youtube_feed(entry.feed.url): - should_send_embed = False - + should_send_embed: bool = should_send_embed_check(reader, entry) if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader) + webhook = create_embed_webhook(webhook_url, entry) else: # If the user has set the custom message to an empty string, we will use the default message, otherwise we # will use the custom message. - if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901 - webhook_message = replace_tags_in_text_message(entry, reader=effective_reader) + if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 + webhook_message = replace_tags_in_text_message(entry) else: webhook_message: str = str(default_custom_message) @@ -397,35 +256,19 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) # Check if the entry is blacklisted, and if it is, we will skip it. 
- if entry_should_be_skipped(effective_reader, entry): + if entry_should_be_skipped(reader, entry): logger.info("Entry was blacklisted: %s", entry.id) continue # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. - if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry): - logger.info("Entry was not whitelisted: %s", entry.id) + if has_white_tags(reader, entry.feed): + if should_be_sent(reader, entry): + execute_webhook(webhook, entry) + return continue - # Use a custom webhook for Hoyolab feeds. - if is_c3kay_feed(entry.feed.url): - entry_link: str | None = entry.link - if entry_link: - post_id: str | None = extract_post_id_from_hoyolab_url(entry_link) - if post_id: - post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) - if post_data: - webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry, reader=effective_reader) - return - logger.warning( - "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", - entry.feed.url, - ) - else: - logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) - # Send the entry to Discord as it is not blacklisted or feed has a whitelist. - execute_webhook(webhook, entry, reader=effective_reader) + execute_webhook(webhook, entry) # If we only want to send one entry, we will break the loop. This is used when testing this function. if do_once: @@ -433,27 +276,14 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d break -def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None: +def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None: """Execute the webhook. Args: webhook (DiscordWebhook): The webhook to execute. entry (Entry): The entry to send to Discord. - reader (Reader): The Reader instance to use for checking feed status. 
""" - # If the feed has been paused or deleted, we will not send the entry to Discord. - entry_feed: Feed = entry.feed - if entry_feed.updates_enabled is False: - logger.warning("Feed is paused, not sending entry to Discord: %s", entry_feed.url) - return - - try: - reader.get_feed(entry_feed.url) - except FeedNotFoundError: - logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url) - return - response: Response = webhook.execute() if response.status_code not in {200, 204}: msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(webhook.json)}" @@ -465,18 +295,6 @@ def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> No logger.info("Sent entry to Discord: %s", entry.id) -def is_youtube_feed(feed_url: str) -> bool: - """Check if the feed is a YouTube feed. - - Args: - feed_url: The feed URL to check. - - Returns: - bool: True if the feed is a YouTube feed, False otherwise. - """ - return "youtube.com/feeds/videos.xml" in feed_url - - def should_send_embed_check(reader: Reader, entry: Entry) -> bool: """Check if we should send an embed to Discord. @@ -487,12 +305,11 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool: Returns: bool: True if we should send an embed, False otherwise. 
""" - # YouTube feeds should never use embeds - only links - if is_youtube_feed(entry.feed.url): - return False - try: - should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) + should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) + except TagNotFoundError: + logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url) + should_send_embed = True except ReaderError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True @@ -516,7 +333,7 @@ def truncate_webhook_message(webhook_message: str) -> str: return webhook_message -def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # noqa: C901 +def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: """Add a new feed, update it and mark every entry as read. Args: @@ -547,7 +364,9 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: reader.add_feed(clean_feed_url) except FeedExistsError: # Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new. - if not reader.get_tag(clean_feed_url, "webhook", ""): + try: + reader.get_tag(clean_feed_url, "webhook") + except TagNotFoundError: reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType] except ReaderError as e: raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e @@ -572,8 +391,7 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # This is the default message that will be sent to Discord. reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType] - # Set the default embed tag when creating the feed - reader.set_tag(clean_feed_url, "embed", json.dumps(default_custom_embed)) - # Update the full-text search index so our new feed is searchable. 
reader.update_search() + + add_missing_tags(reader) diff --git a/discord_rss_bot/filter/blacklist.py b/discord_rss_bot/filter/blacklist.py index 8260993..808d7c9 100644 --- a/discord_rss_bot/filter/blacklist.py +++ b/discord_rss_bot/filter/blacklist.py @@ -2,119 +2,59 @@ from __future__ import annotations from typing import TYPE_CHECKING -from discord_rss_bot.filter.utils import is_regex_match from discord_rss_bot.filter.utils import is_word_in_text if TYPE_CHECKING: - from reader import Entry - from reader import Feed - from reader import Reader + from reader import Entry, Feed, Reader -def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: +def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: """Return True if the feed has blacklist tags. The following tags are checked: - - blacklist_author - - blacklist_content - - blacklist_summary - blacklist_title - - regex_blacklist_author - - regex_blacklist_content - - regex_blacklist_summary - - regex_blacklist_title + - blacklist_summary + - blacklist_content. Args: - reader: The reader. + custom_reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. 
""" - blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() - blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")) + blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")) + blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")) - regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() - regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() - - return bool( - blacklist_title - or blacklist_author - or blacklist_content - or blacklist_summary - or regex_blacklist_author - or regex_blacklist_content - or regex_blacklist_summary - or regex_blacklist_title, - ) + return bool(blacklist_title or blacklist_summary or blacklist_content) -def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: """Return True if the entry is in the blacklist. Args: - reader: The reader. + custom_reader: The reader. entry: The entry to check. Returns: bool: If the entry is in the blacklist. 
""" - feed = entry.feed - - blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() - blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() - - regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() - regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() + blacklist_title: str = str(custom_reader.get_tag(entry.feed, "blacklist_title", "")) + blacklist_summary: str = str(custom_reader.get_tag(entry.feed, "blacklist_summary", "")) + blacklist_content: str = str(custom_reader.get_tag(entry.feed, "blacklist_content", "")) + blacklist_author: str = str(custom_reader.get_tag(entry.feed, "blacklist_author", "")) # TODO(TheLovinator): Also add support for entry_text and more. 
- # Check regular blacklist if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title): return True if entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary): return True - if ( - entry.content - and entry.content[0].value - and blacklist_content - and is_word_in_text(blacklist_content, entry.content[0].value) - ): - return True if entry.author and blacklist_author and is_word_in_text(blacklist_author, entry.author): return True - if ( - entry.content - and entry.content[0].value - and blacklist_content - and is_word_in_text(blacklist_content, entry.content[0].value) - ): - return True - - # Check regex blacklist - if entry.title and regex_blacklist_title and is_regex_match(regex_blacklist_title, entry.title): - return True - if entry.summary and regex_blacklist_summary and is_regex_match(regex_blacklist_summary, entry.summary): - return True - if ( - entry.content - and entry.content[0].value - and regex_blacklist_content - and is_regex_match(regex_blacklist_content, entry.content[0].value) - ): - return True - if entry.author and regex_blacklist_author and is_regex_match(regex_blacklist_author, entry.author): - return True return bool( entry.content and entry.content[0].value - and regex_blacklist_content - and is_regex_match(regex_blacklist_content, entry.content[0].value), + and blacklist_content + and is_word_in_text(blacklist_content, entry.content[0].value), ) diff --git a/discord_rss_bot/filter/utils.py b/discord_rss_bot/filter/utils.py index ff93e59..090518d 100644 --- a/discord_rss_bot/filter/utils.py +++ b/discord_rss_bot/filter/utils.py @@ -1,10 +1,7 @@ from __future__ import annotations -import logging import re -logger: logging.Logger = logging.getLogger(__name__) - def is_word_in_text(word_string: str, text: str) -> bool: """Check if any of the words are in the text. @@ -23,50 +20,3 @@ def is_word_in_text(word_string: str, text: str) -> bool: # Check if any pattern matches the text. 
return any(pattern.search(text) for pattern in patterns) - - -def is_regex_match(regex_string: str, text: str) -> bool: - """Check if any of the regex patterns match the text. - - Args: - regex_string: A string containing regex patterns, separated by newlines or commas. - text: The text to search in. - - Returns: - bool: True if any regex pattern matches the text, otherwise False. - """ - if not regex_string or not text: - return False - - # Split by newlines first, then by commas (for backward compatibility) - regex_list: list[str] = [] - - # First split by newlines - lines: list[str] = regex_string.split("\n") - for line in lines: - stripped_line: str = line.strip() - if stripped_line: - # For backward compatibility, also split by commas if there are any - if "," in stripped_line: - regex_list.extend([part.strip() for part in stripped_line.split(",") if part.strip()]) - else: - regex_list.append(stripped_line) - - # Attempt to compile and apply each regex pattern - for pattern_str in regex_list: - if not pattern_str: - logger.warning("Empty regex pattern found in the list.") - continue - - try: - pattern: re.Pattern[str] = re.compile(pattern_str, re.IGNORECASE) - if pattern.search(text): - logger.info("Regex pattern matched: %s", pattern_str) - return True - except re.error: - logger.warning("Invalid regex pattern: %s", pattern_str) - continue - - logger.info("No regex patterns matched.") - - return False diff --git a/discord_rss_bot/filter/whitelist.py b/discord_rss_bot/filter/whitelist.py index bb5303d..a55a514 100644 --- a/discord_rss_bot/filter/whitelist.py +++ b/discord_rss_bot/filter/whitelist.py @@ -2,105 +2,59 @@ from __future__ import annotations from typing import TYPE_CHECKING -from discord_rss_bot.filter.utils import is_regex_match from discord_rss_bot.filter.utils import is_word_in_text if TYPE_CHECKING: - from reader import Entry - from reader import Feed - from reader import Reader + from reader import Entry, Feed, Reader -def 
has_white_tags(reader: Reader, feed: Feed) -> bool: +def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: """Return True if the feed has whitelist tags. The following tags are checked: - - regex_whitelist_author - - regex_whitelist_content - - regex_whitelist_summary - - regex_whitelist_title - - whitelist_author - - whitelist_content - - whitelist_summary - whitelist_title + - whitelist_summary + - whitelist_content. Args: - reader: The reader. + custom_reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. """ - whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")) + whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")) + whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")) - regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() - - return bool( - whitelist_title - or whitelist_author - or whitelist_content - or whitelist_summary - or regex_whitelist_author - or regex_whitelist_content - or regex_whitelist_summary - or regex_whitelist_title, - ) + return bool(whitelist_title or whitelist_summary or whitelist_content) -def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: """Return True if the 
entry is in the whitelist. Args: - reader: The reader. + custom_reader: The reader. entry: The entry to check. Returns: bool: If the entry is in the whitelist. """ feed: Feed = entry.feed - # Regular whitelist tags - whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")) + whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")) + whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")) + whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")) - # Regex whitelist tags - regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() - - # Check regular whitelist if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): return True if entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary): return True if entry.author and whitelist_author and is_word_in_text(whitelist_author, entry.author): return True - if ( - entry.content - and entry.content[0].value - and whitelist_content - and is_word_in_text(whitelist_content, entry.content[0].value) - ): - return True - - # Check regex whitelist - if entry.title and regex_whitelist_title and is_regex_match(regex_whitelist_title, entry.title): - return True - if entry.summary and regex_whitelist_summary and 
is_regex_match(regex_whitelist_summary, entry.summary): - return True - if entry.author and regex_whitelist_author and is_regex_match(regex_whitelist_author, entry.author): - return True return bool( entry.content and entry.content[0].value - and regex_whitelist_content - and is_regex_match(regex_whitelist_content, entry.content[0].value), + and whitelist_content + and is_word_in_text(whitelist_content, entry.content[0].value), ) diff --git a/discord_rss_bot/git_backup.py b/discord_rss_bot/git_backup.py deleted file mode 100644 index 49528ec..0000000 --- a/discord_rss_bot/git_backup.py +++ /dev/null @@ -1,243 +0,0 @@ -"""Git backup module for committing bot state changes to a private repository. - -Configure the backup by setting these environment variables: -- ``GIT_BACKUP_PATH``: Local filesystem path for the backup git repository. - When set, the bot will initialise a git repo there (if one doesn't exist) - and commit an export of its state after every relevant change. -- ``GIT_BACKUP_REMOTE``: Optional remote URL (e.g. ``git@github.com:you/private-repo.git``). - When set, every commit is followed by a ``git push`` to this remote. - -The exported state is written as ``state.json`` inside the backup repo. It -contains the list of feeds together with their webhook URL, filter settings -(blacklist / whitelist, regex variants), custom messages and embed settings. -Global webhooks are also included. 
- -Example docker-compose snippet:: - - environment: - - GIT_BACKUP_PATH=/data/backup - - GIT_BACKUP_REMOTE=git@github.com:you/private-config.git -""" - -from __future__ import annotations - -import json -import logging -import os -import shutil -import subprocess # noqa: S404 -from pathlib import Path -from typing import TYPE_CHECKING -from typing import Any - -if TYPE_CHECKING: - from reader import Reader - -logger: logging.Logger = logging.getLogger(__name__) -GIT_EXECUTABLE: str = shutil.which("git") or "git" - - -type TAG_VALUE = ( - dict[str, str | int | float | bool | dict[str, Any] | list[Any] | None] - | list[str | int | float | bool | dict[str, Any] | list[Any] | None] - | None -) - -# Tags that are exported per-feed (empty values are omitted). -_FEED_TAGS: tuple[str, ...] = ( - "webhook", - "custom_message", - "should_send_embed", - "embed", - "blacklist_title", - "blacklist_summary", - "blacklist_content", - "blacklist_author", - "regex_blacklist_title", - "regex_blacklist_summary", - "regex_blacklist_content", - "regex_blacklist_author", - "whitelist_title", - "whitelist_summary", - "whitelist_content", - "whitelist_author", - "regex_whitelist_title", - "regex_whitelist_summary", - "regex_whitelist_content", - "regex_whitelist_author", - ".reader.update", -) - - -def get_backup_path() -> Path | None: - """Return the configured backup path, or *None* if not configured. - - Returns: - Path to the backup repository, or None if ``GIT_BACKUP_PATH`` is unset. - """ - raw: str = os.environ.get("GIT_BACKUP_PATH", "").strip() - return Path(raw) if raw else None - - -def get_backup_remote() -> str: - """Return the configured remote URL, or an empty string if not set. - - Returns: - The remote URL string from ``GIT_BACKUP_REMOTE``, or ``""`` if unset. - """ - return os.environ.get("GIT_BACKUP_REMOTE", "").strip() - - -def setup_backup_repo(backup_path: Path) -> bool: - """Ensure the backup directory exists and contains a git repository. 
- - If the directory does not yet contain a ``.git`` folder a new repository is - initialised. A basic git identity is configured locally so that commits - succeed even in environments where a global ``~/.gitconfig`` is absent. - - Args: - backup_path: Local path for the backup repository. - - Returns: - ``True`` if the repository is ready, ``False`` on any error. - """ - try: - backup_path.mkdir(parents=True, exist_ok=True) - git_dir: Path = backup_path / ".git" - if not git_dir.exists(): - subprocess.run([GIT_EXECUTABLE, "init", str(backup_path)], check=True, capture_output=True) # noqa: S603 - logger.info("Initialised git backup repository at %s", backup_path) - - # Ensure a local identity exists so that `git commit` always works. - for key, value in (("user.email", "discord-rss-bot@localhost"), ("user.name", "discord-rss-bot")): - result: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key], - check=False, - capture_output=True, - ) - if result.returncode != 0: - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key, value], - check=True, - capture_output=True, - ) - - # Configure the remote if GIT_BACKUP_REMOTE is set. - remote_url: str = get_backup_remote() - if remote_url: - # Check if remote "origin" already exists. - check_remote: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "get-url", "origin"], - check=False, - capture_output=True, - ) - if check_remote.returncode != 0: - # Remote doesn't exist, add it. - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "add", "origin", remote_url], - check=True, - capture_output=True, - ) - logger.info("Added remote 'origin' with URL: %s", remote_url) - else: - # Remote exists, update it if the URL has changed. 
- current_url: str = check_remote.stdout.decode().strip() - if current_url != remote_url: - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "set-url", "origin", remote_url], - check=True, - capture_output=True, - ) - logger.info("Updated remote 'origin' URL from %s to %s", current_url, remote_url) - except Exception: - logger.exception("Failed to set up git backup repository at %s", backup_path) - return False - return True - - -def export_state(reader: Reader, backup_path: Path) -> None: - """Serialise the current bot state to ``state.json`` inside *backup_path*. - - Args: - reader: The :class:`reader.Reader` instance to read state from. - backup_path: Destination directory for the exported ``state.json``. - """ - feeds_state: list[dict] = [] - for feed in reader.get_feeds(): - feed_data: dict = {"url": feed.url} - for tag in _FEED_TAGS: - try: - value: TAG_VALUE = reader.get_tag(feed, tag, None) - if value is not None and value != "": # noqa: PLC1901 - feed_data[tag] = value - except Exception: - logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url) - feeds_state.append(feed_data) - - webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list( - reader.get_tag((), "webhooks", []), - ) - - # Export global update interval if set - global_update_interval: dict[str, Any] | None = None - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict): - global_update_interval = global_update_config - - state: dict = {"feeds": feeds_state, "webhooks": webhooks} - if global_update_interval is not None: - state["global_update_interval"] = global_update_interval - state_file: Path = backup_path / "state.json" - state_file.write_text(json.dumps(state, indent=2, default=str), encoding="utf-8") - - -def commit_state_change(reader: Reader, message: str) -> None: - """Export current state and commit it to the backup repository. 
- - This is a no-op when ``GIT_BACKUP_PATH`` is not configured. Errors are - logged but never raised so that a backup failure never interrupts normal - bot operation. - - Args: - reader: The :class:`reader.Reader` instance to read state from. - message: Commit message describing the change (e.g. ``"Add feed example.com/rss.xml"``). - """ - backup_path: Path | None = get_backup_path() - if backup_path is None: - return - - if not setup_backup_repo(backup_path): - return - - try: - export_state(reader, backup_path) - - subprocess.run([GIT_EXECUTABLE, "-C", str(backup_path), "add", "-A"], check=True, capture_output=True) # noqa: S603 - - # Only create a commit if there are staged changes. - diff_result: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "diff", "--cached", "--exit-code"], - check=False, - capture_output=True, - ) - if diff_result.returncode == 0: - logger.debug("No state changes to commit for: %s", message) - return - - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "commit", "-m", message], - check=True, - capture_output=True, - ) - logger.info("Committed state change to backup repo: %s", message) - - # Push to remote if configured. 
- if get_backup_remote(): - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "push", "origin", "HEAD"], - check=True, - capture_output=True, - ) - logger.info("Pushed state change to remote 'origin': %s", message) - except Exception: - logger.exception("Failed to commit state change '%s' to backup repo", message) diff --git a/discord_rss_bot/hoyolab_api.py b/discord_rss_bot/hoyolab_api.py deleted file mode 100644 index 227a413..0000000 --- a/discord_rss_bot/hoyolab_api.py +++ /dev/null @@ -1,195 +0,0 @@ -from __future__ import annotations - -import contextlib -import json -import logging -import re -from typing import TYPE_CHECKING -from typing import Any - -import requests -from discord_webhook import DiscordEmbed -from discord_webhook import DiscordWebhook - -if TYPE_CHECKING: - from reader import Entry - - -logger: logging.Logger = logging.getLogger(__name__) - - -def is_c3kay_feed(feed_url: str) -> bool: - """Check if the feed is from c3kay.de. - - Args: - feed_url: The feed URL to check. - - Returns: - bool: True if the feed is from c3kay.de, False otherwise. - """ - return "feeds.c3kay.de" in feed_url - - -def extract_post_id_from_hoyolab_url(url: str) -> str | None: - """Extract the post ID from a Hoyolab URL. - - Args: - url: The Hoyolab URL to extract the post ID from. - For example: https://www.hoyolab.com/article/38588239 - - Returns: - str | None: The post ID if found, None otherwise. - """ - try: - match: re.Match[str] | None = re.search(r"/article/(\d+)", url) - if match: - return match.group(1) - except (ValueError, AttributeError, TypeError) as e: - logger.warning("Error extracting post ID from Hoyolab URL %s: %s", url, e) - - return None - - -def fetch_hoyolab_post(post_id: str) -> dict[str, Any] | None: - """Fetch post data from the Hoyolab API. - - Args: - post_id: The post ID to fetch. - - Returns: - dict[str, Any] | None: The post data if successful, None otherwise. 
- """ - if not post_id: - return None - - http_ok = 200 - try: - url: str = f"https://bbs-api-os.hoyolab.com/community/post/wapi/getPostFull?post_id={post_id}" - response: requests.Response = requests.get(url, timeout=10) - - if response.status_code == http_ok: - data: dict[str, Any] = response.json() - if data.get("retcode") == 0 and "data" in data and "post" in data["data"]: - return data["data"]["post"] - - logger.warning("Failed to fetch Hoyolab post %s: %s", post_id, response.text) - except (requests.RequestException, ValueError): - logger.exception("Error fetching Hoyolab post %s", post_id) - - return None - - -def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: dict[str, Any]) -> DiscordWebhook: # noqa: C901, PLR0912, PLR0914, PLR0915 - """Create a webhook with data from the Hoyolab API. - - Args: - webhook_url: The webhook URL. - entry: The entry to send to Discord. - post_data: The post data from the Hoyolab API. - - Returns: - DiscordWebhook: The webhook with the embed. 
- """ - entry_link: str = entry.link or entry.feed.url - webhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True) - - # Extract relevant data from the post - post: dict[str, Any] = post_data.get("post", {}) - subject: str = post.get("subject", "") - content: str = post.get("content", "{}") - - logger.debug("Post subject: %s", subject) - logger.debug("Post content: %s", content) - - content_data: dict[str, str] = {} - with contextlib.suppress(json.JSONDecodeError, ValueError): - content_data = json.loads(content) - - logger.debug("Content data: %s", content_data) - - description: str = content_data.get("describe", "") - if not description: - description = post.get("desc", "") - - # Create the embed - discord_embed = DiscordEmbed() - - # Set title and description - discord_embed.set_title(subject) - discord_embed.set_url(entry_link) - - # Get post.image_list - image_list: list[dict[str, Any]] = post_data.get("image_list", []) - if image_list: - image_url: str = str(image_list[0].get("url", "")) - image_height: int = int(image_list[0].get("height", 1080)) - image_width: int = int(image_list[0].get("width", 1920)) - - logger.debug("Image URL: %s, Height: %s, Width: %s", image_url, image_height, image_width) - discord_embed.set_image(url=image_url, height=image_height, width=image_width) - - video: dict[str, str | int | bool] = post_data.get("video", {}) - if video and video.get("url"): - video_url: str = str(video.get("url", "")) - logger.debug("Video URL: %s", video_url) - with contextlib.suppress(requests.RequestException): - video_response: requests.Response = requests.get(video_url, stream=True, timeout=10) - if video_response.ok: - webhook.add_file( - file=video_response.content, - filename=f"{entry.id}.mp4", - ) - - game = post_data.get("game", {}) - - if game and game.get("color"): - game_color = str(game.get("color", "")) - discord_embed.set_color(game_color.removeprefix("#")) - - user: dict[str, str | int | bool] = post_data.get("user", {}) - 
author_name: str = str(user.get("nickname", "")) - avatar_url: str = str(user.get("avatar_url", "")) - if author_name: - webhook.avatar_url = avatar_url - webhook.username = author_name - - classification = post_data.get("classification", {}) - - if classification and classification.get("name"): - footer = str(classification.get("name", "")) - discord_embed.set_footer(text=footer) - - webhook.add_embed(discord_embed) - - # Only show Youtube URL if available - structured_content: str = post.get("structured_content", "") - if structured_content: # noqa: PLR1702 - try: - structured_content_data: list[dict[str, Any]] = json.loads(structured_content) - for item in structured_content_data: - if item.get("insert") and isinstance(item["insert"], dict): - video_url: str = str(item["insert"].get("video", "")) - if video_url: - video_id_match: re.Match[str] | None = re.search(r"embed/([a-zA-Z0-9_-]+)", video_url) - if video_id_match: - video_id: str = video_id_match.group(1) - logger.debug("Video ID: %s", video_id) - webhook.content = f"https://www.youtube.com/watch?v={video_id}" - webhook.remove_embeds() - - except (json.JSONDecodeError, ValueError) as e: - logger.warning("Error parsing structured content: %s", e) - - event_start_date: str = post.get("event_start_date", "") - if event_start_date and event_start_date != "0": - discord_embed.add_embed_field(name="Start", value=f"") - - event_end_date: str = post.get("event_end_date", "") - if event_end_date and event_end_date != "0": - discord_embed.add_embed_field(name="End", value=f"") - - created_at: str = post.get("created_at", "") - if created_at and created_at != "0": - discord_embed.set_timestamp(timestamp=created_at) - - return webhook diff --git a/discord_rss_bot/is_url_valid.py b/discord_rss_bot/is_url_valid.py index c986b4a..cca1491 100644 --- a/discord_rss_bot/is_url_valid.py +++ b/discord_rss_bot/is_url_valid.py @@ -1,7 +1,6 @@ from __future__ import annotations -from urllib.parse import ParseResult -from 
urllib.parse import urlparse +from urllib.parse import ParseResult, urlparse def is_url_valid(url: str) -> bool: diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 1e5211b..3a1f0ca 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ -7,65 +7,48 @@ import typing import urllib.parse from contextlib import asynccontextmanager from dataclasses import dataclass -from datetime import UTC -from datetime import datetime +from datetime import UTC, datetime from functools import lru_cache -from typing import TYPE_CHECKING -from typing import Annotated -from typing import Any -from typing import cast +from typing import TYPE_CHECKING, Annotated, cast import httpx import sentry_sdk import uvicorn from apscheduler.schedulers.asyncio import AsyncIOScheduler -from fastapi import Depends -from fastapi import FastAPI -from fastapi import Form -from fastapi import HTTPException -from fastapi import Request +from fastapi import FastAPI, Form, HTTPException, Request from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from httpx import Response from markdownify import markdownify -from reader import Entry -from reader import EntryNotFoundError -from reader import Feed -from reader import FeedExistsError -from reader import FeedNotFoundError -from reader import Reader -from reader import ReaderError -from reader import TagNotFoundError +from reader import Entry, EntryNotFoundError, Feed, FeedNotFoundError, Reader, TagNotFoundError from starlette.responses import RedirectResponse from discord_rss_bot import settings -from discord_rss_bot.custom_filters import entry_is_blacklisted -from discord_rss_bot.custom_filters import entry_is_whitelisted -from discord_rss_bot.custom_message import CustomEmbed -from discord_rss_bot.custom_message import get_custom_message -from discord_rss_bot.custom_message import get_embed -from discord_rss_bot.custom_message import get_first_image 
-from discord_rss_bot.custom_message import replace_tags_in_text_message -from discord_rss_bot.custom_message import save_embed -from discord_rss_bot.feeds import create_feed -from discord_rss_bot.feeds import extract_domain -from discord_rss_bot.feeds import send_entry_to_discord -from discord_rss_bot.feeds import send_to_discord -from discord_rss_bot.git_backup import commit_state_change -from discord_rss_bot.git_backup import get_backup_path -from discord_rss_bot.is_url_valid import is_url_valid -from discord_rss_bot.search import create_search_context +from discord_rss_bot.custom_filters import ( + entry_is_blacklisted, + entry_is_whitelisted, +) +from discord_rss_bot.custom_message import ( + CustomEmbed, + get_custom_message, + get_embed, + get_first_image, + replace_tags_in_text_message, + save_embed, +) +from discord_rss_bot.feeds import create_feed, send_entry_to_discord, send_to_discord +from discord_rss_bot.missing_tags import add_missing_tags +from discord_rss_bot.search import create_html_for_search_results from discord_rss_bot.settings import get_reader if TYPE_CHECKING: - from collections.abc import AsyncGenerator from collections.abc import Iterable from reader.types import JSONType -LOGGING_CONFIG: dict[str, Any] = { +LOGGING_CONFIG = { "version": 1, "disable_existing_loggers": True, "formatters": { @@ -101,71 +84,18 @@ LOGGING_CONFIG: dict[str, Any] = { logging.config.dictConfig(LOGGING_CONFIG) logger: logging.Logger = logging.getLogger(__name__) - - -def get_reader_dependency() -> Reader: - """Provide the app Reader instance as a FastAPI dependency. - - Returns: - Reader: The shared Reader instance. - """ - return get_reader() - - -# Time constants for relative time formatting -SECONDS_PER_MINUTE = 60 -SECONDS_PER_HOUR = 3600 -SECONDS_PER_DAY = 86400 - - -def relative_time(dt: datetime | None) -> str: - """Convert a datetime to a relative time string (e.g., '2 hours ago', 'in 5 minutes'). 
- - Args: - dt: The datetime to convert (should be timezone-aware). - - Returns: - A human-readable relative time string. - """ - if dt is None: - return "Never" - - now = datetime.now(tz=UTC) - diff = dt - now - seconds = int(abs(diff.total_seconds())) - is_future = diff.total_seconds() > 0 - - # Determine the appropriate unit and value - if seconds < SECONDS_PER_MINUTE: - value = seconds - unit = "s" - elif seconds < SECONDS_PER_HOUR: - value = seconds // SECONDS_PER_MINUTE - unit = "m" - elif seconds < SECONDS_PER_DAY: - value = seconds // SECONDS_PER_HOUR - unit = "h" - else: - value = seconds // SECONDS_PER_DAY - unit = "d" - - # Format based on future or past - return f"in {value}{unit}" if is_future else f"{value}{unit} ago" +reader: Reader = get_reader() @asynccontextmanager -async def lifespan(app: FastAPI) -> AsyncGenerator[None]: - """Lifespan function for the FastAPI app.""" - reader: Reader = get_reader() - scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone=UTC) - scheduler.add_job( - func=send_to_discord, - trigger="interval", - minutes=1, - id="send_to_discord", - max_instances=1, - next_run_time=datetime.now(tz=UTC), - ) +async def lifespan(app: FastAPI) -> typing.AsyncGenerator[None]: + """This is needed for the ASGI server to run.""" + add_missing_tags(reader) + scheduler: AsyncIOScheduler = AsyncIOScheduler() + + # Update all feeds every 15 minutes. + # TODO(TheLovinator): Make this configurable. + scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now(tz=UTC)) scheduler.start() logger.info("Scheduler started.") yield @@ -180,29 +110,27 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template # Add the filters to the Jinja2 environment so they can be used in html templates. 
templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else "" +templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted +templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted templates.env.filters["discord_markdown"] = markdownify -templates.env.filters["relative_time"] = relative_time -templates.env.globals["get_backup_path"] = get_backup_path @app.post("/add_webhook") async def post_add_webhook( webhook_name: Annotated[str, Form()], webhook_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Add a feed to the database. Args: webhook_name: The name of the webhook. webhook_url: The url of the webhook. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the index page. Raises: HTTPException: If the webhook already exists. + + Returns: + RedirectResponse: Redirect to the index page. """ # Get current webhooks from the database if they exist otherwise use an empty list. webhooks = list(reader.get_tag((), "webhooks", [])) @@ -219,8 +147,6 @@ async def post_add_webhook( reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Add webhook {webhook_name.strip()}") - return RedirectResponse(url="/", status_code=303) # TODO(TheLovinator): Show this error on the page. @@ -229,22 +155,17 @@ async def post_add_webhook( @app.post("/delete_webhook") -async def post_delete_webhook( - webhook_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectResponse: """Delete a webhook from the database. Args: webhook_url: The url of the webhook. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the index page. Raises: HTTPException: If the webhook could not be deleted + Returns: + RedirectResponse: Redirect to the index page. 
""" # TODO(TheLovinator): Check if the webhook is in use by any feeds before deleting it. # TODO(TheLovinator): Replace HTTPException with a custom exception for both of these. @@ -271,8 +192,6 @@ async def post_delete_webhook( # Add our new list of webhooks to the database. reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Delete webhook {webhook_url.strip()}") - return RedirectResponse(url="/", status_code=303) @@ -280,34 +199,27 @@ async def post_delete_webhook( async def post_create_feed( feed_url: Annotated[str, Form()], webhook_dropdown: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Add a feed to the database. Args: feed_url: The feed to add. webhook_dropdown: The webhook to use. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. """ clean_feed_url: str = feed_url.strip() create_feed(reader, feed_url, webhook_dropdown) - commit_state_change(reader, f"Add feed {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/pause") -async def post_pause_feed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_pause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Pause a feed. Args: feed_url: The feed to pause. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -318,15 +230,11 @@ async def post_pause_feed( @app.post("/unpause") -async def post_unpause_feed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_unpause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Unpause a feed. Args: feed_url: The Feed to unpause. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. 
@@ -338,15 +246,10 @@ async def post_unpause_feed( @app.post("/whitelist") async def post_set_whitelist( - reader: Annotated[Reader, Depends(get_reader_dependency)], whitelist_title: Annotated[str, Form()] = "", whitelist_summary: Annotated[str, Form()] = "", whitelist_content: Annotated[str, Form()] = "", whitelist_author: Annotated[str, Form()] = "", - regex_whitelist_title: Annotated[str, Form()] = "", - regex_whitelist_summary: Annotated[str, Form()] = "", - regex_whitelist_content: Annotated[str, Form()] = "", - regex_whitelist_author: Annotated[str, Form()] = "", feed_url: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set what the whitelist should be sent, if you have this set only words in the whitelist will be sent. @@ -356,12 +259,7 @@ async def post_set_whitelist( whitelist_summary: Whitelisted words for when checking the summary. whitelist_content: Whitelisted words for when checking the content. whitelist_author: Whitelisted words for when checking the author. - regex_whitelist_title: Whitelisted regex for when checking the title. - regex_whitelist_summary: Whitelisted regex for when checking the summary. - regex_whitelist_content: Whitelisted regex for when checking the content. - regex_whitelist_author: Whitelisted regex for when checking the author. feed_url: The feed we should set the whitelist for. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. 
@@ -371,28 +269,17 @@ async def post_set_whitelist( reader.set_tag(clean_feed_url, "whitelist_summary", whitelist_summary) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "whitelist_content", whitelist_content) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "whitelist_author", whitelist_author) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_whitelist_title", regex_whitelist_title) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_whitelist_summary", regex_whitelist_summary) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_whitelist_content", regex_whitelist_content) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_whitelist_author", regex_whitelist_author) # pyright: ignore[reportArgumentType][call-overload] - - commit_state_change(reader, f"Update whitelist for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/whitelist", response_class=HTMLResponse) -async def get_whitelist( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_whitelist(feed_url: str, request: Request): """Get the whitelist. Args: feed_url: What feed we should get the whitelist for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The whitelist page. @@ -400,14 +287,11 @@ async def get_whitelist( clean_feed_url: str = feed_url.strip() feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url)) + # Get previous data, this is used when creating the form. 
whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")) whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")) whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")) whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")) - regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")) - regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")) - regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")) - regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")) context = { "request": request, @@ -416,25 +300,16 @@ async def get_whitelist( "whitelist_summary": whitelist_summary, "whitelist_content": whitelist_content, "whitelist_author": whitelist_author, - "regex_whitelist_title": regex_whitelist_title, - "regex_whitelist_summary": regex_whitelist_summary, - "regex_whitelist_content": regex_whitelist_content, - "regex_whitelist_author": regex_whitelist_author, } return templates.TemplateResponse(request=request, name="whitelist.html", context=context) @app.post("/blacklist") async def post_set_blacklist( - reader: Annotated[Reader, Depends(get_reader_dependency)], blacklist_title: Annotated[str, Form()] = "", blacklist_summary: Annotated[str, Form()] = "", blacklist_content: Annotated[str, Form()] = "", blacklist_author: Annotated[str, Form()] = "", - regex_blacklist_title: Annotated[str, Form()] = "", - regex_blacklist_summary: Annotated[str, Form()] = "", - regex_blacklist_content: Annotated[str, Form()] = "", - regex_blacklist_author: Annotated[str, Form()] = "", feed_url: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set the blacklist. @@ -447,12 +322,7 @@ async def post_set_blacklist( blacklist_summary: Blacklisted words for when checking the summary. blacklist_content: Blacklisted words for when checking the content. 
blacklist_author: Blacklisted words for when checking the author. - regex_blacklist_title: Blacklisted regex for when checking the title. - regex_blacklist_summary: Blacklisted regex for when checking the summary. - regex_blacklist_content: Blacklisted regex for when checking the content. - regex_blacklist_author: Blacklisted regex for when checking the author. feed_url: What feed we should set the blacklist for. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -462,40 +332,28 @@ async def post_set_blacklist( reader.set_tag(clean_feed_url, "blacklist_summary", blacklist_summary) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "blacklist_content", blacklist_content) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "blacklist_author", blacklist_author) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_blacklist_title", regex_blacklist_title) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_blacklist_summary", regex_blacklist_summary) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_blacklist_content", regex_blacklist_content) # pyright: ignore[reportArgumentType][call-overload] - reader.set_tag(clean_feed_url, "regex_blacklist_author", regex_blacklist_author) # pyright: ignore[reportArgumentType][call-overload] - commit_state_change(reader, f"Update blacklist for {clean_feed_url}") + return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/blacklist", response_class=HTMLResponse) -async def get_blacklist( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_blacklist(feed_url: str, request: Request): """Get the blacklist. Args: feed_url: What feed we should get the blacklist for. request: The request object. 
- reader: The Reader instance. Returns: HTMLResponse: The blacklist page. """ feed: Feed = reader.get_feed(urllib.parse.unquote(feed_url)) + # Get previous data, this is used when creating the form. blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")) blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")) blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")) blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")) - regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")) - regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")) - regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")) - regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")) context = { "request": request, @@ -504,10 +362,6 @@ async def get_blacklist( "blacklist_summary": blacklist_summary, "blacklist_content": blacklist_content, "blacklist_author": blacklist_author, - "regex_blacklist_title": regex_blacklist_title, - "regex_blacklist_summary": regex_blacklist_summary, - "regex_blacklist_content": regex_blacklist_content, - "regex_blacklist_author": regex_blacklist_author, } return templates.TemplateResponse(request=request, name="blacklist.html", context=context) @@ -515,7 +369,6 @@ async def get_blacklist( @app.post("/custom") async def post_set_custom( feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], custom_message: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set the custom message, this is used when sending the message. @@ -523,7 +376,6 @@ async def post_set_custom( Args: custom_message: The custom message. feed_url: The feed we should set the custom message for. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. 
@@ -540,22 +392,16 @@ async def post_set_custom( reader.set_tag(feed_url, "custom_message", default_custom_message) clean_feed_url: str = feed_url.strip() - commit_state_change(reader, f"Update custom message for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/custom", response_class=HTMLResponse) -async def get_custom( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_custom(feed_url: str, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The custom message page. @@ -576,17 +422,12 @@ async def get_custom( @app.get("/embed", response_class=HTMLResponse) -async def get_embed_page( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_embed_page(feed_url: str, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The embed page. @@ -620,9 +461,8 @@ async def get_embed_page( @app.post("/embed", response_class=HTMLResponse) -async def post_embed( # noqa: C901 +async def post_embed( # noqa: PLR0913, PLR0917 feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], title: Annotated[str, Form()] = "", description: Annotated[str, Form()] = "", color: Annotated[str, Form()] = "", @@ -648,7 +488,7 @@ async def post_embed( # noqa: C901 author_icon_url: The author icon url of the embed. footer_text: The footer text of the embed. footer_icon_url: The footer icon url of the embed. - reader: The Reader instance. 
+ Returns: RedirectResponse: Redirect to the embed page. @@ -657,245 +497,59 @@ async def post_embed( # noqa: C901 feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url)) custom_embed: CustomEmbed = get_embed(reader, feed) - # Only overwrite fields that the user provided. This prevents accidental - # clearing of previously saved embed data when the form submits empty - # values for fields the user did not change. - if title: - custom_embed.title = title - if description: - custom_embed.description = description - if color: - custom_embed.color = color - if image_url: - custom_embed.image_url = image_url - if thumbnail_url: - custom_embed.thumbnail_url = thumbnail_url - if author_name: - custom_embed.author_name = author_name - if author_url: - custom_embed.author_url = author_url - if author_icon_url: - custom_embed.author_icon_url = author_icon_url - if footer_text: - custom_embed.footer_text = footer_text - if footer_icon_url: - custom_embed.footer_icon_url = footer_icon_url + custom_embed.title = title + custom_embed.description = description + custom_embed.color = color + custom_embed.image_url = image_url + custom_embed.thumbnail_url = thumbnail_url + custom_embed.author_name = author_name + custom_embed.author_url = author_url + custom_embed.author_icon_url = author_icon_url + custom_embed.footer_text = footer_text + custom_embed.footer_icon_url = footer_icon_url # Save the data. save_embed(reader, feed, custom_embed) - commit_state_change(reader, f"Update embed settings for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/use_embed") -async def post_use_embed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Use embed instead of text. Args: feed_url: The feed to change. - reader: The Reader instance. 
Returns: RedirectResponse: Redirect to the feed page. """ clean_feed_url: str = feed_url.strip() reader.set_tag(clean_feed_url, "should_send_embed", True) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Enable embed mode for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/use_text") -async def post_use_text( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Use text instead of embed. Args: feed_url: The feed to change. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. """ clean_feed_url: str = feed_url.strip() reader.set_tag(clean_feed_url, "should_send_embed", False) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Disable embed mode for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) -@app.post("/set_update_interval") -async def post_set_update_interval( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - interval_minutes: Annotated[int | None, Form()] = None, - redirect_to: Annotated[str, Form()] = "", -) -> RedirectResponse: - """Set the update interval for a feed. - - Args: - feed_url: The feed to change. - interval_minutes: The update interval in minutes (None to reset to global default). - redirect_to: Optional redirect URL (defaults to feed page). - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the specified page or feed page. 
- """ - clean_feed_url: str = feed_url.strip() - - # If no interval specified, reset to global default - if interval_minutes is None: - try: - reader.delete_tag(clean_feed_url, ".reader.update") - commit_state_change(reader, f"Reset update interval to default for {clean_feed_url}") - except TagNotFoundError: - pass - else: - # Validate interval (minimum 1 minute, no maximum) - interval_minutes = max(interval_minutes, 1) - reader.set_tag(clean_feed_url, ".reader.update", {"interval": interval_minutes}) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Set update interval to {interval_minutes} minutes for {clean_feed_url}") - - # Update the feed immediately to recalculate update_after with the new interval - try: - reader.update_feed(clean_feed_url) - logger.info("Updated feed after interval change: %s", clean_feed_url) - except Exception: - logger.exception("Failed to update feed after interval change: %s", clean_feed_url) - - if redirect_to: - return RedirectResponse(url=redirect_to, status_code=303) - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) - - -@app.post("/change_feed_url") -async def post_change_feed_url( - old_feed_url: Annotated[str, Form()], - new_feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: - """Change the URL for an existing feed. - - Args: - old_feed_url: Current feed URL. - new_feed_url: New feed URL to change to. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the feed page for the resulting URL. - - Raises: - HTTPException: If the old feed is not found, the new URL already exists, or change fails. 
- """ - clean_old_feed_url: str = old_feed_url.strip() - clean_new_feed_url: str = new_feed_url.strip() - - if not clean_old_feed_url or not clean_new_feed_url: - raise HTTPException(status_code=400, detail="Feed URLs cannot be empty") - - if clean_old_feed_url == clean_new_feed_url: - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_old_feed_url)}", status_code=303) - - try: - reader.change_feed_url(clean_old_feed_url, clean_new_feed_url) - except FeedNotFoundError as e: - raise HTTPException(status_code=404, detail=f"Feed not found: {clean_old_feed_url}") from e - except FeedExistsError as e: - raise HTTPException(status_code=409, detail=f"Feed already exists: {clean_new_feed_url}") from e - except ReaderError as e: - raise HTTPException(status_code=400, detail=f"Failed to change feed URL: {e}") from e - - # Update the feed with the new URL so we can discover what entries it returns. - # Then mark all unread entries as read so the scheduler doesn't resend them. - try: - reader.update_feed(clean_new_feed_url) - except Exception: - logger.exception("Failed to update feed after URL change: %s", clean_new_feed_url) - - for entry in reader.get_entries(feed=clean_new_feed_url, read=False): - try: - reader.set_entry_read(entry, True) - except Exception: - logger.exception("Failed to mark entry as read after URL change: %s", entry.id) - - commit_state_change(reader, f"Change feed URL from {clean_old_feed_url} to {clean_new_feed_url}") - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_new_feed_url)}", status_code=303) - - -@app.post("/reset_update_interval") -async def post_reset_update_interval( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - redirect_to: Annotated[str, Form()] = "", -) -> RedirectResponse: - """Reset the update interval for a feed to use the global default. - - Args: - feed_url: The feed to change. - redirect_to: Optional redirect URL (defaults to feed page). 
- reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the specified page or feed page. - """ - clean_feed_url: str = feed_url.strip() - - try: - reader.delete_tag(clean_feed_url, ".reader.update") - commit_state_change(reader, f"Reset update interval to default for {clean_feed_url}") - except TagNotFoundError: - # Tag doesn't exist, which is fine - pass - - # Update the feed immediately to recalculate update_after with the new interval - try: - reader.update_feed(clean_feed_url) - logger.info("Updated feed after interval reset: %s", clean_feed_url) - except Exception: - logger.exception("Failed to update feed after interval reset: %s", clean_feed_url) - - if redirect_to: - return RedirectResponse(url=redirect_to, status_code=303) - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) - - -@app.post("/set_global_update_interval") -async def post_set_global_update_interval( - interval_minutes: Annotated[int, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: - """Set the global default update interval. - - Args: - interval_minutes: The update interval in minutes. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the settings page. - """ - # Validate interval (minimum 1 minute, no maximum) - interval_minutes = max(interval_minutes, 1) - - reader.set_tag((), ".reader.update", {"interval": interval_minutes}) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Set global update interval to {interval_minutes} minutes") - return RedirectResponse(url="/settings", status_code=303) - - @app.get("/add", response_class=HTMLResponse) -def get_add( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +def get_add(request: Request): """Page for adding a new feed. Args: request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The add feed page. 
@@ -908,25 +562,19 @@ def get_add( @app.get("/feed", response_class=HTMLResponse) -async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - starting_after: str = "", -): +async def get_feed(feed_url: str, request: Request, starting_after: str = ""): """Get a feed by URL. Args: feed_url: The feed to add. request: The request object. starting_after: The entry to start after. Used for pagination. - reader: The Reader instance. - - Returns: - HTMLResponse: The feed page. Raises: HTTPException: If the feed is not found. + + Returns: + HTMLResponse: The feed page. """ entries_per_page: int = 20 @@ -939,7 +587,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 # Only show button if more than 10 entries. total_entries: int = reader.get_entry_counts(feed=feed).total or 0 - is_show_more_entries_button_visible: bool = total_entries > entries_per_page + show_more_entires_button: bool = total_entries > entries_per_page # Get entries from the feed. 
if starting_after: @@ -950,22 +598,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 except EntryNotFoundError as e: current_entries = list(reader.get_entries(feed=clean_feed_url)) msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}" - html: str = create_html_for_feed(reader=reader, entries=current_entries, current_feed_url=clean_feed_url) - - # Get feed and global intervals for error case too - feed_interval: int | None = None - feed_update_config = reader.get_tag(feed, ".reader.update", None) - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value - - global_interval: int = 60 - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value + html: str = create_html_for_feed(current_entries) context = { "request": request, @@ -976,10 +609,8 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 "should_send_embed": False, "last_entry": None, "messages": msg, - "is_show_more_entries_button_visible": is_show_more_entries_button_visible, + "show_more_entires_button": show_more_entires_button, "total_entries": total_entries, - "feed_interval": feed_interval, - "global_interval": global_interval, } return templates.TemplateResponse(request=request, name="feed.html", context=context) @@ -1000,25 +631,13 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 last_entry = entries[-1] # Create the html for the entries. 
- html: str = create_html_for_feed(reader=reader, entries=entries, current_feed_url=clean_feed_url) + html: str = create_html_for_feed(entries) - should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True)) - - # Get the update interval for this feed - feed_interval: int | None = None - feed_update_config = reader.get_tag(feed, ".reader.update", None) - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value - - # Get the global default update interval - global_interval: int = 60 # Default to 60 minutes if not set - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value + try: + should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) + except TagNotFoundError: + add_missing_tags(reader) + should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) context = { "request": request, @@ -1028,25 +647,17 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 "html": html, "should_send_embed": should_send_embed, "last_entry": last_entry, - "is_show_more_entries_button_visible": is_show_more_entries_button_visible, + "show_more_entires_button": show_more_entires_button, "total_entries": total_entries, - "feed_interval": feed_interval, - "global_interval": global_interval, } return templates.TemplateResponse(request=request, name="feed.html", context=context) -def create_html_for_feed( # noqa: C901, PLR0914 - reader: Reader, - entries: Iterable[Entry], - current_feed_url: str = "", -) -> str: +def create_html_for_feed(entries: Iterable[Entry]) -> str: """Create HTML for the search results. Args: - reader: The Reader instance to use. 
entries: The entries to create HTML for. - current_feed_url: The feed URL currently being viewed in /feed. Returns: str: The HTML for the search results. @@ -1062,75 +673,31 @@ def create_html_for_feed( # noqa: C901, PLR0914 first_image = get_first_image(summary, content) - text: str = replace_tags_in_text_message(entry, reader=reader) or ( - "
No content available.
" - ) + text: str = replace_tags_in_text_message(entry) or "
No content available.
" published = "" if entry.published: published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S") blacklisted: str = "" - if entry_is_blacklisted(entry, reader=reader): + if entry_is_blacklisted(entry): blacklisted = "Blacklisted" whitelisted: str = "" - if entry_is_whitelisted(entry, reader=reader): + if entry_is_whitelisted(entry): whitelisted = "Whitelisted" - source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url - - from_another_feed: str = "" - if current_feed_url and source_feed_url != current_feed_url: - from_another_feed = f"From another feed: {source_feed_url}" - - # Add feed link when viewing from webhook_entries or aggregated views - feed_link: str = "" - if not current_feed_url or source_feed_url != current_feed_url: - encoded_feed_url: str = urllib.parse.quote(source_feed_url) - feed_title: str = entry.feed.title if hasattr(entry.feed, "title") and entry.feed.title else source_feed_url - feed_link = ( - f"{feed_title}
" - ) - entry_id: str = urllib.parse.quote(entry.id) - encoded_source_feed_url: str = urllib.parse.quote(source_feed_url) - to_discord_html: str = ( - f"" - "Send to Discord" - ) - - # Check if this is a YouTube feed entry and the entry has a link - is_youtube_feed = "youtube.com/feeds/videos.xml" in entry.feed.url - video_embed_html = "" - - if is_youtube_feed and entry.link: - # Extract the video ID and create an embed if possible - video_id: str | None = extract_youtube_video_id(entry.link) - if video_id: - video_embed_html: str = f""" -
- -
- """ - # Don't use the first image if we have a video embed - first_image = "" - + to_discord_html: str = f"Send to Discord" image_html: str = f"" if first_image else "" html += f"""
-{blacklisted}{whitelisted}{from_another_feed}

{entry.title}

-{feed_link}{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html} +{blacklisted}{whitelisted}

{entry.title}

+{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html} {text} -{video_embed_html} {image_html}
-""" # noqa: E501 +""" return html.strip() @@ -1169,7 +736,6 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: hook_name (str): The webhook name. hook_url (str): The webhook URL. - Returns: WebhookInfo: The webhook username, avatar, guild id, etc. """ @@ -1190,64 +756,12 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: return our_hook -@app.get("/settings", response_class=HTMLResponse) -async def get_settings( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): - """Settings page. - - Args: - request: The request object. - reader: The Reader instance. - - Returns: - HTMLResponse: The settings page. - """ - # Get the global default update interval - global_interval: int = 60 # Default to 60 minutes if not set - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value - - # Get all feeds with their intervals - feeds: Iterable[Feed] = reader.get_feeds() - feed_intervals = [] - for feed in feeds: - feed_interval: int | None = None - feed_update_config = reader.get_tag(feed, ".reader.update", None) - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value - - feed_intervals.append({ - "feed": feed, - "interval": feed_interval, - "effective_interval": feed_interval or global_interval, - "domain": extract_domain(feed.url), - }) - - context = { - "request": request, - "global_interval": global_interval, - "feed_intervals": feed_intervals, - } - return templates.TemplateResponse(request=request, name="settings.html", context=context) - - @app.get("/webhooks", response_class=HTMLResponse) -async def get_webhooks( - request: Request, - 
reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_webhooks(request: Request): """Page for adding a new webhook. Args: request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The add webhook page. @@ -1268,241 +782,136 @@ async def get_webhooks( @app.get("/", response_class=HTMLResponse) -def get_index( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - message: str = "", -): +def get_index(request: Request): """This is the root of the website. Args: request: The request object. - message: Optional message to display to the user. - reader: The Reader instance. Returns: HTMLResponse: The index page. """ - return templates.TemplateResponse( - request=request, - name="index.html", - context=make_context_index(request, message, reader), - ) + return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request)) -def make_context_index(request: Request, message: str = "", reader: Reader | None = None): +def make_context_index(request: Request): """Create the needed context for the index page. Args: request: The request object. - message: Optional message to display to the user. - reader: The Reader instance. Returns: dict: The context for the index page. 
""" - effective_reader: Reader = reader or get_reader_dependency() - hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(effective_reader.get_tag((), "webhooks", []))) + hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) - feed_list: list[dict[str, JSONType | Feed | str]] = [] - broken_feeds: list[Feed] = [] - feeds_without_attached_webhook: list[Feed] = [] + feed_list = [] + broken_feeds = [] + feeds_without_attached_webhook = [] - # Get all feeds and organize them - feeds: Iterable[Feed] = effective_reader.get_feeds() + feeds: Iterable[Feed] = reader.get_feeds() for feed in feeds: - webhook: str = str(effective_reader.get_tag(feed.url, "webhook", "")) - if not webhook: + try: + webhook = reader.get_tag(feed.url, "webhook") + feed_list.append({"feed": feed, "webhook": webhook}) + except TagNotFoundError: broken_feeds.append(feed) continue - feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)}) - - webhook_list: list[str] = [hook["url"] for hook in hooks] + webhook_list = [hook["url"] for hook in hooks] if webhook not in webhook_list: feeds_without_attached_webhook.append(feed) return { "request": request, "feeds": feed_list, - "feed_count": effective_reader.get_feed_counts(), - "entry_count": effective_reader.get_entry_counts(), + "feed_count": reader.get_feed_counts(), + "entry_count": reader.get_entry_counts(), "webhooks": hooks, "broken_feeds": broken_feeds, "feeds_without_attached_webhook": feeds_without_attached_webhook, - "messages": message or None, } @app.post("/remove", response_class=HTMLResponse) -async def remove_feed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def remove_feed(feed_url: Annotated[str, Form()]): """Get a feed by URL. Args: feed_url: The feed to add. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the index page. 
Raises: HTTPException: Feed not found + + Returns: + RedirectResponse: Redirect to the index page. """ try: reader.delete_feed(urllib.parse.unquote(feed_url)) except FeedNotFoundError as e: raise HTTPException(status_code=404, detail="Feed not found") from e - commit_state_change(reader, f"Remove feed {urllib.parse.unquote(feed_url)}") - return RedirectResponse(url="/", status_code=303) -@app.get("/update", response_class=HTMLResponse) -async def update_feed( - request: Request, - feed_url: str, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): - """Update a feed. - - Args: - request: The request object. - feed_url: The feed URL to update. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the feed page. - - Raises: - HTTPException: If the feed is not found. - """ - try: - reader.update_feed(urllib.parse.unquote(feed_url)) - except FeedNotFoundError as e: - raise HTTPException(status_code=404, detail="Feed not found") from e - - logger.info("Manually updated feed: %s", feed_url) - return RedirectResponse(url="/feed?feed_url=" + urllib.parse.quote(feed_url), status_code=303) - - -@app.post("/backup") -async def manual_backup( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: - """Manually trigger a git backup of the current state. - - Args: - request: The request object. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the index page with a success or error message. - """ - backup_path = get_backup_path() - if backup_path is None: - message = "Git backup is not configured. Set GIT_BACKUP_PATH environment variable to enable backups." - logger.warning("Manual git backup attempted but GIT_BACKUP_PATH is not configured") - return RedirectResponse(url=f"/?message={urllib.parse.quote(message)}", status_code=303) - - try: - commit_state_change(reader, "Manual backup triggered from web UI") - message = "Successfully created git backup!" 
- logger.info("Manual git backup completed successfully") - except Exception as e: - message = f"Failed to create git backup: {e}" - logger.exception("Manual git backup failed") - - return RedirectResponse(url=f"/?message={urllib.parse.quote(message)}", status_code=303) - - @app.get("/search", response_class=HTMLResponse) -async def search( - request: Request, - query: str, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def search(request: Request, query: str): """Get entries matching a full-text search query. Args: query: The query to search for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The search page. """ reader.update_search() - context = create_search_context(query, reader=reader) - return templates.TemplateResponse(request=request, name="search.html", context={"request": request, **context}) + + context = { + "request": request, + "search_html": create_html_for_search_results(query), + "query": query, + "search_amount": reader.search_entry_counts(query), + } + return templates.TemplateResponse(request=request, name="search.html", context=context) @app.get("/post_entry", response_class=HTMLResponse) -async def post_entry( - entry_id: str, - reader: Annotated[Reader, Depends(get_reader_dependency)], - feed_url: str = "", -): +async def post_entry(entry_id: str): """Send single entry to Discord. Args: entry_id: The entry to send. - feed_url: Optional feed URL used to disambiguate entries with identical IDs. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. """ unquoted_entry_id: str = urllib.parse.unquote(entry_id) - clean_feed_url: str = urllib.parse.unquote(feed_url.strip()) if feed_url else "" - - # Prefer feed-scoped lookup when feed_url is provided. This avoids ambiguity when - # multiple feeds contain entries with the same ID. 
- entry: Entry | None = None - if clean_feed_url: - entry = next( - (entry for entry in reader.get_entries(feed=clean_feed_url) if entry.id == unquoted_entry_id), - None, - ) - else: - entry = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None) - + entry: Entry | None = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None) if entry is None: return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.") - if result := send_entry_to_discord(entry=entry, reader=reader): + if result := send_entry_to_discord(entry=entry): return result # Redirect to the feed page. - redirect_feed_url: str = entry.feed.url.strip() - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(redirect_feed_url)}", status_code=303) + clean_feed_url: str = entry.feed.url.strip() + return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/modify_webhook", response_class=HTMLResponse) -def modify_webhook( - old_hook: Annotated[str, Form()], - new_hook: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - redirect_to: Annotated[str, Form()] = "", -): +def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Form()]): """Modify a webhook. Args: old_hook: The webhook to modify. new_hook: The new webhook. - redirect_to: Optional redirect URL after the update. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the webhook page. Raises: HTTPException: Webhook could not be modified. + Returns: + RedirectResponse: Redirect to the webhook page. """ # Get current webhooks from the database if they exist otherwise use an empty list. webhooks = list(reader.get_tag((), "webhooks", [])) @@ -1510,20 +919,15 @@ def modify_webhook( # Webhooks are stored as a list of dictionaries. 
# Example: [{"name": "webhook_name", "url": "webhook_url"}] webhooks = cast("list[dict[str, str]]", webhooks) - old_hook_clean: str = old_hook.strip() - new_hook_clean: str = new_hook.strip() - webhook_modified: bool = False for hook in webhooks: - if hook["url"] in old_hook_clean: - hook["url"] = new_hook_clean + if hook["url"] in old_hook.strip(): + hook["url"] = new_hook.strip() # Check if it has been modified. - if hook["url"] != new_hook_clean: + if hook["url"] != new_hook.strip(): raise HTTPException(status_code=500, detail="Webhook could not be modified") - webhook_modified = True - # Add our new list of webhooks to the database. reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] @@ -1531,506 +935,16 @@ def modify_webhook( # matches the old one. feeds: Iterable[Feed] = reader.get_feeds() for feed in feeds: - webhook: str = str(reader.get_tag(feed, "webhook", "")) + try: + webhook = reader.get_tag(feed, "webhook") + except TagNotFoundError: + continue - if webhook == old_hook_clean: - reader.set_tag(feed.url, "webhook", new_hook_clean) # pyright: ignore[reportArgumentType] + if webhook == old_hook.strip(): + reader.set_tag(feed.url, "webhook", new_hook.strip()) # pyright: ignore[reportArgumentType] - if webhook_modified and old_hook_clean != new_hook_clean: - commit_state_change(reader, f"Modify webhook URL from {old_hook_clean} to {new_hook_clean}") - - redirect_url: str = redirect_to.strip() or "/webhooks" - if redirect_to: - redirect_url = redirect_url.replace(urllib.parse.quote(old_hook_clean), urllib.parse.quote(new_hook_clean)) - redirect_url = redirect_url.replace(old_hook_clean, new_hook_clean) - - # Redirect to the requested page. - return RedirectResponse(url=redirect_url, status_code=303) - - -def extract_youtube_video_id(url: str) -> str | None: - """Extract YouTube video ID from a YouTube video URL. - - Args: - url: The YouTube video URL. - - Returns: - The video ID if found, None otherwise. 
- """ - if not url: - return None - - # Handle standard YouTube URLs (youtube.com/watch?v=VIDEO_ID) - if "youtube.com/watch" in url and "v=" in url: - return url.split("v=")[1].split("&", maxsplit=1)[0] - - # Handle shortened YouTube URLs (youtu.be/VIDEO_ID) - if "youtu.be/" in url: - return url.split("youtu.be/")[1].split("?", maxsplit=1)[0] - - return None - - -def resolve_final_feed_url(url: str) -> tuple[str, str | None]: - """Resolve a feed URL by following redirects. - - Args: - url: The feed URL to resolve. - - Returns: - tuple[str, str | None]: A tuple with (resolved_url, error_message). - error_message is None when resolution succeeded. - """ - clean_url: str = url.strip() - if not clean_url: - return "", "URL is empty" - - if not is_url_valid(clean_url): - return clean_url, "URL is invalid" - - try: - response: Response = httpx.get(clean_url, follow_redirects=True, timeout=10.0) - except httpx.HTTPError as e: - return clean_url, str(e) - - if not response.is_success: - return clean_url, f"HTTP {response.status_code}" - - return str(response.url), None - - -def create_webhook_feed_url_preview( - webhook_feeds: list[Feed], - replace_from: str, - replace_to: str, - resolve_urls: bool, # noqa: FBT001 - force_update: bool = False, # noqa: FBT001, FBT002 - existing_feed_urls: set[str] | None = None, -) -> list[dict[str, str | bool | None]]: - """Create preview rows for bulk feed URL replacement. - - Args: - webhook_feeds: Feeds attached to a webhook. - replace_from: Text to replace in each URL. - replace_to: Replacement text. - resolve_urls: Whether to resolve resulting URLs via HTTP redirects. - force_update: Whether conflicts should be marked as force-overwritable. - existing_feed_urls: Optional set of all tracked feed URLs used for conflict detection. - - Returns: - list[dict[str, str | bool | None]]: Rows used in the preview table. 
- """ - known_feed_urls: set[str] = existing_feed_urls or {feed.url for feed in webhook_feeds} - preview_rows: list[dict[str, str | bool | None]] = [] - for feed in webhook_feeds: - old_url: str = feed.url - has_match: bool = bool(replace_from and replace_from in old_url) - - candidate_url: str = old_url - if has_match: - candidate_url = old_url.replace(replace_from, replace_to) - - resolved_url: str = candidate_url - resolution_error: str | None = None - if has_match and candidate_url != old_url and resolve_urls: - resolved_url, resolution_error = resolve_final_feed_url(candidate_url) - - will_force_ignore_errors: bool = bool( - force_update and bool(resolution_error) and has_match and old_url != candidate_url, - ) - - target_exists: bool = bool( - has_match and not resolution_error and resolved_url != old_url and resolved_url in known_feed_urls, - ) - will_force_overwrite: bool = bool(target_exists and force_update) - will_change: bool = bool( - has_match - and old_url != (candidate_url if will_force_ignore_errors else resolved_url) - and (not target_exists or will_force_overwrite) - and (not resolution_error or will_force_ignore_errors), - ) - - preview_rows.append({ - "old_url": old_url, - "candidate_url": candidate_url, - "resolved_url": resolved_url, - "has_match": has_match, - "will_change": will_change, - "target_exists": target_exists, - "will_force_overwrite": will_force_overwrite, - "will_force_ignore_errors": will_force_ignore_errors, - "resolution_error": resolution_error, - }) - - return preview_rows - - -def build_webhook_mass_update_context( - webhook_feeds: list[Feed], - all_feeds: list[Feed], - replace_from: str, - replace_to: str, - resolve_urls: bool, # noqa: FBT001 - force_update: bool = False, # noqa: FBT001, FBT002 -) -> dict[str, str | bool | int | list[dict[str, str | bool | None]] | dict[str, int]]: - """Build context data used by the webhook mass URL update preview UI. - - Args: - webhook_feeds: Feeds attached to the selected webhook. 
- all_feeds: All tracked feeds. - replace_from: Text to replace in URLs. - replace_to: Replacement text. - resolve_urls: Whether to resolve resulting URLs. - force_update: Whether to allow overwriting existing target URLs. - - Returns: - dict[str, ...]: Context values for rendering preview controls and table. - """ - clean_replace_from: str = replace_from.strip() - clean_replace_to: str = replace_to.strip() - - preview_rows: list[dict[str, str | bool | None]] = [] - if clean_replace_from: - preview_rows = create_webhook_feed_url_preview( - webhook_feeds=webhook_feeds, - replace_from=clean_replace_from, - replace_to=clean_replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - existing_feed_urls={feed.url for feed in all_feeds}, - ) - - preview_summary: dict[str, int] = { - "total": len(preview_rows), - "matched": sum(1 for row in preview_rows if row["has_match"]), - "will_update": sum(1 for row in preview_rows if row["will_change"]), - "conflicts": sum(1 for row in preview_rows if row["target_exists"] and not row["will_force_overwrite"]), - "force_overwrite": sum(1 for row in preview_rows if row["will_force_overwrite"]), - "force_ignore_errors": sum(1 for row in preview_rows if row["will_force_ignore_errors"]), - "resolve_errors": sum(1 for row in preview_rows if row["resolution_error"]), - } - preview_summary["no_match"] = preview_summary["total"] - preview_summary["matched"] - preview_summary["no_change"] = sum( - 1 for row in preview_rows if row["has_match"] and not row["resolution_error"] and not row["will_change"] - ) - - return { - "replace_from": clean_replace_from, - "replace_to": clean_replace_to, - "resolve_urls": resolve_urls, - "force_update": force_update, - "preview_rows": preview_rows, - "preview_summary": preview_summary, - "preview_change_count": preview_summary["will_update"], - } - - -@app.get("/webhook_entries_mass_update_preview", response_class=HTMLResponse) -async def get_webhook_entries_mass_update_preview( - webhook_url: 
str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - replace_from: str = "", - replace_to: str = "", - resolve_urls: bool = True, # noqa: FBT001, FBT002 - force_update: bool = False, # noqa: FBT001, FBT002 -) -> HTMLResponse: - """Render the mass-update preview fragment for a webhook using HTMX. - - Args: - webhook_url: Webhook URL whose feeds are being updated. - request: The request object. - reader: The Reader instance. - replace_from: Text to find in URLs. - replace_to: Replacement text. - resolve_urls: Whether to resolve resulting URLs. - force_update: Whether to allow overwriting existing target URLs. - - Returns: - HTMLResponse: Rendered partial template containing summary + preview table. - """ - clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) - all_feeds: list[Feed] = list(reader.get_feeds()) - webhook_feeds: list[Feed] = [ - feed for feed in all_feeds if str(reader.get_tag(feed.url, "webhook", "")) == clean_webhook_url - ] - - context = { - "request": request, - "webhook_url": clean_webhook_url, - **build_webhook_mass_update_context( - webhook_feeds=webhook_feeds, - all_feeds=all_feeds, - replace_from=replace_from, - replace_to=replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - ), - } - return templates.TemplateResponse(request=request, name="_webhook_mass_update_preview.html", context=context) - - -@app.get("/webhook_entries", response_class=HTMLResponse) -async def get_webhook_entries( # noqa: C901, PLR0914 - webhook_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - starting_after: str = "", - replace_from: str = "", - replace_to: str = "", - resolve_urls: bool = True, # noqa: FBT001, FBT002 - force_update: bool = False, # noqa: FBT001, FBT002 - message: str = "", -) -> HTMLResponse: - """Get all latest entries from all feeds for a specific webhook. - - Args: - webhook_url: The webhook URL to get entries for. 
- request: The request object. - starting_after: The entry to start after. Used for pagination. - replace_from: Optional URL substring to find for bulk URL replacement preview. - replace_to: Optional replacement substring used in bulk URL replacement preview. - resolve_urls: Whether to resolve replaced URLs by following redirects. - force_update: Whether to allow overwriting existing target URLs during apply. - message: Optional status message shown in the UI. - reader: The Reader instance. - - Returns: - HTMLResponse: The webhook entries page. - - Raises: - HTTPException: If no feeds are found for this webhook or webhook doesn't exist. - """ - entries_per_page: int = 20 - clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) - - # Get the webhook name from the webhooks list - webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) - webhook_name: str = "" - for hook in webhooks: - if hook["url"] == clean_webhook_url: - webhook_name = hook["name"] - break - - if not webhook_name: - raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") - - hook_info: WebhookInfo = get_data_from_hook_url(hook_name=webhook_name, hook_url=clean_webhook_url) - - # Get all feeds associated with this webhook - all_feeds: list[Feed] = list(reader.get_feeds()) - webhook_feeds: list[Feed] = [] - - for feed in all_feeds: - feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) - if feed_webhook == clean_webhook_url: - webhook_feeds.append(feed) - - # Get all entries from all feeds for this webhook, sorted by published date - all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)] - - # Sort entries by published date (newest first), with undated entries last. 
- all_entries.sort( - key=lambda e: ( - e.published is not None, - e.published or datetime.min.replace(tzinfo=UTC), - ), - reverse=True, - ) - - # Handle pagination - if starting_after: - try: - start_after_entry: Entry | None = reader.get_entry(( - starting_after.split("|", maxsplit=1)[0], - starting_after.split("|")[1], - )) - except (FeedNotFoundError, EntryNotFoundError): - start_after_entry = None - else: - start_after_entry = None - - # Find the index of the starting entry - start_index: int = 0 - if start_after_entry: - for idx, entry in enumerate(all_entries): - if entry.id == start_after_entry.id and entry.feed.url == start_after_entry.feed.url: - start_index = idx + 1 - break - - # Get the page of entries - paginated_entries: list[Entry] = all_entries[start_index : start_index + entries_per_page] - - # Get the last entry for pagination - last_entry: Entry | None = None - if paginated_entries: - last_entry = paginated_entries[-1] - - # Create the html for the entries - html: str = create_html_for_feed(reader=reader, entries=paginated_entries) - - mass_update_context = build_webhook_mass_update_context( - webhook_feeds=webhook_feeds, - all_feeds=all_feeds, - replace_from=replace_from, - replace_to=replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - ) - - # Check if there are more entries available - total_entries: int = len(all_entries) - is_show_more_entries_button_visible: bool = (start_index + entries_per_page) < total_entries - - context = { - "request": request, - "hook_info": hook_info, - "webhook_name": webhook_name, - "webhook_url": clean_webhook_url, - "webhook_feeds": webhook_feeds, - "entries": paginated_entries, - "html": html, - "last_entry": last_entry, - "is_show_more_entries_button_visible": is_show_more_entries_button_visible, - "total_entries": total_entries, - "feeds_count": len(webhook_feeds), - "message": urllib.parse.unquote(message) if message else "", - **mass_update_context, - } - return 
templates.TemplateResponse(request=request, name="webhook_entries.html", context=context) - - -@app.post("/bulk_change_feed_urls", response_class=HTMLResponse) -async def post_bulk_change_feed_urls( # noqa: C901, PLR0914, PLR0912, PLR0915 - webhook_url: Annotated[str, Form()], - replace_from: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - replace_to: Annotated[str, Form()] = "", - resolve_urls: Annotated[bool, Form()] = True, # noqa: FBT002 - force_update: Annotated[bool, Form()] = False, # noqa: FBT002 -) -> RedirectResponse: - """Bulk-change feed URLs attached to a webhook. - - Args: - webhook_url: The webhook URL whose feeds should be updated. - replace_from: Text to find in each URL. - replace_to: Text to replace with. - resolve_urls: Whether to resolve resulting URLs via redirects. - force_update: Whether existing target feed URLs should be overwritten. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to webhook detail with status message. - - Raises: - HTTPException: If webhook is missing or replace_from is empty. 
- """ - clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) - clean_replace_from: str = replace_from.strip() - clean_replace_to: str = replace_to.strip() - - if not clean_replace_from: - raise HTTPException(status_code=400, detail="replace_from cannot be empty") - - webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) - if not any(hook["url"] == clean_webhook_url for hook in webhooks): - raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") - - all_feeds: list[Feed] = list(reader.get_feeds()) - webhook_feeds: list[Feed] = [] - for feed in all_feeds: - feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) - if feed_webhook == clean_webhook_url: - webhook_feeds.append(feed) - - preview_rows: list[dict[str, str | bool | None]] = create_webhook_feed_url_preview( - webhook_feeds=webhook_feeds, - replace_from=clean_replace_from, - replace_to=clean_replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - existing_feed_urls={feed.url for feed in all_feeds}, - ) - - changed_count: int = 0 - skipped_count: int = 0 - failed_count: int = 0 - conflict_count: int = 0 - force_overwrite_count: int = 0 - - for row in preview_rows: - if not row["has_match"]: - continue - - if row["resolution_error"] and not force_update: - skipped_count += 1 - continue - - if row["target_exists"] and not force_update: - conflict_count += 1 - skipped_count += 1 - continue - - old_url: str = str(row["old_url"]) - new_url: str = str(row["candidate_url"] if row["will_force_ignore_errors"] else row["resolved_url"]) - - if old_url == new_url: - skipped_count += 1 - continue - - if row["target_exists"] and force_update: - try: - reader.delete_feed(new_url) - force_overwrite_count += 1 - except FeedNotFoundError: - pass - except ReaderError: - failed_count += 1 - continue - - try: - reader.change_feed_url(old_url, new_url) - except FeedExistsError: - skipped_count += 1 - continue - 
except FeedNotFoundError: - skipped_count += 1 - continue - except ReaderError: - failed_count += 1 - continue - - try: - reader.update_feed(new_url) - except Exception: - logger.exception("Failed to update feed after URL change: %s", new_url) - - for entry in reader.get_entries(feed=new_url, read=False): - try: - reader.set_entry_read(entry, True) - except Exception: - logger.exception("Failed to mark entry as read after URL change: %s", entry.id) - - changed_count += 1 - - if changed_count > 0: - commit_state_change( - reader, - f"Bulk change {changed_count} feed URL(s) for webhook {clean_webhook_url}", - ) - - status_message: str = ( - f"Updated {changed_count} feed URL(s). " - f"Force overwrote {force_overwrite_count}. " - f"Conflicts {conflict_count}. " - f"Skipped {skipped_count}. " - f"Failed {failed_count}." - ) - redirect_url: str = ( - f"/webhook_entries?webhook_url={urllib.parse.quote(clean_webhook_url)}" - f"&message={urllib.parse.quote(status_message)}" - ) - return RedirectResponse(url=redirect_url, status_code=303) + # Redirect to the webhook page. + return RedirectResponse(url="/webhooks", status_code=303) if __name__ == "__main__": @@ -2043,9 +957,9 @@ if __name__ == "__main__": uvicorn.run( "main:app", - log_level="debug", + log_level="info", host="0.0.0.0", # noqa: S104 - port=3000, + port=5000, proxy_headers=True, forwarded_allow_ips="*", ) diff --git a/discord_rss_bot/missing_tags.py b/discord_rss_bot/missing_tags.py new file mode 100644 index 0000000..84f375e --- /dev/null +++ b/discord_rss_bot/missing_tags.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from reader import Feed, Reader, TagNotFoundError + +from discord_rss_bot.settings import default_custom_embed, default_custom_message + + +def add_custom_message(reader: Reader, feed: Feed) -> None: + """Add the custom message tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. 
+ """ + try: + reader.get_tag(feed, "custom_message") + except TagNotFoundError: + reader.set_tag(feed.url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType] + reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType] + + +def add_has_custom_message(reader: Reader, feed: Feed) -> None: + """Add the has_custom_message tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "has_custom_message") + except TagNotFoundError: + if reader.get_tag(feed, "custom_message") == default_custom_message: + reader.set_tag(feed.url, "has_custom_message", False) # pyright: ignore[reportArgumentType] + else: + reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType] + + +def add_if_embed(reader: Reader, feed: Feed) -> None: + """Add the if_embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "if_embed") + except TagNotFoundError: + reader.set_tag(feed.url, "if_embed", True) # pyright: ignore[reportArgumentType] + + +def add_custom_embed(reader: Reader, feed: Feed) -> None: + """Add the custom embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "embed") + except TagNotFoundError: + reader.set_tag(feed.url, "embed", default_custom_embed) # pyright: ignore[reportArgumentType] + reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType] + + +def add_has_custom_embed(reader: Reader, feed: Feed) -> None: + """Add the has_custom_embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. 
+ """ + try: + reader.get_tag(feed, "has_custom_embed") + except TagNotFoundError: + if reader.get_tag(feed, "embed") == default_custom_embed: + reader.set_tag(feed.url, "has_custom_embed", False) # pyright: ignore[reportArgumentType] + else: + reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType] + + +def add_should_send_embed(reader: Reader, feed: Feed) -> None: + """Add the should_send_embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "should_send_embed") + except TagNotFoundError: + reader.set_tag(feed.url, "should_send_embed", True) # pyright: ignore[reportArgumentType] + + +def add_missing_tags(reader: Reader) -> None: + """Add missing tags to feeds. + + Args: + reader: What Reader to use. + """ + for feed in reader.get_feeds(): + add_custom_message(reader, feed) + add_has_custom_message(reader, feed) + add_if_embed(reader, feed) + add_custom_embed(reader, feed) + add_has_custom_embed(reader, feed) + add_should_send_embed(reader, feed) diff --git a/discord_rss_bot/search.py b/discord_rss_bot/search.py index 85129ac..c81b398 100644 --- a/discord_rss_bot/search.py +++ b/discord_rss_bot/search.py @@ -3,78 +3,66 @@ from __future__ import annotations import urllib.parse from typing import TYPE_CHECKING +from discord_rss_bot.settings import get_reader + if TYPE_CHECKING: from collections.abc import Iterable - from reader import EntrySearchResult - from reader import Feed - from reader import HighlightedString - from reader import Reader + from reader import EntrySearchResult, Feed, HighlightedString, Reader -def create_search_context(query: str, reader: Reader) -> dict: - """Build context for search.html template. +def create_html_for_search_results(query: str, custom_reader: Reader | None = None) -> str: + """Create HTML for the search results. Args: - query (str): The search query. - reader (Reader): Custom Reader instance. 
+ query: Our search query + custom_reader: The reader. If None, we will get the reader from the settings. Returns: - dict: Context dictionary for rendering the search results. + str: The HTML. """ + # TODO(TheLovinator): There is a .content that also contains text, we should use that if .summary is not available. + # TODO(TheLovinator): We should also add tags to the title. + + # Get the default reader if we didn't get a custom one. + reader: Reader = get_reader() if custom_reader is None else custom_reader + search_results: Iterable[EntrySearchResult] = reader.search_entries(query) - results: list[dict] = [] + html: str = "" for result in search_results: - feed: Feed = reader.get_feed(result.feed_url) - feed_url: str = urllib.parse.quote(feed.url) - - # Prefer summary, fall back to content if ".summary" in result.content: - highlighted = result.content[".summary"] - else: - content_keys = [k for k in result.content if k.startswith(".content")] - highlighted = result.content[content_keys[0]] if content_keys else None + result_summary: str = add_span_with_slice(result.content[".summary"]) + feed: Feed = reader.get_feed(result.feed_url) + feed_url: str = urllib.parse.quote(feed.url) - summary: str = add_spans(highlighted) if highlighted else "(no preview available)" + html += f""" +
+ +

{result.metadata[".title"]}

+
+ {result_summary} +
+ """ - results.append({ - "title": add_spans(result.metadata.get(".title")), - "summary": summary, - "feed_url": feed_url, - }) - - return { - "query": query, - "search_amount": {"total": len(results)}, - "results": results, - } + return html -def add_spans(highlighted_string: HighlightedString | None) -> str: - """Wrap all highlighted parts with tags. +def add_span_with_slice(highlighted_string: HighlightedString) -> str: + """Add span tags to the string to highlight the search results. Args: - highlighted_string (HighlightedString | None): The highlighted string to process. + highlighted_string: The highlighted string. Returns: - str: The processed string with tags around highlighted parts. + str: The string with added tags. """ - if highlighted_string is None: - return "" - - value: str = highlighted_string.value - parts: list[str] = [] - last_index = 0 + # TODO(TheLovinator): We are looping through the highlights and only using the last one. We should use all of them. + before_span, span_part, after_span = "", "", "" for txt_slice in highlighted_string.highlights: - parts.extend(( - value[last_index : txt_slice.start], - f"{value[txt_slice.start : txt_slice.stop]}", - )) - last_index = txt_slice.stop + before_span: str = f"{highlighted_string.value[: txt_slice.start]}" + span_part: str = f"{highlighted_string.value[txt_slice.start : txt_slice.stop]}" + after_span: str = f"{highlighted_string.value[txt_slice.stop :]}" - # add any trailing text - parts.append(value[last_index:]) - - return "".join(parts) + return f"{before_span}{span_part}{after_span}" diff --git a/discord_rss_bot/settings.py b/discord_rss_bot/settings.py index 194bf08..a99733e 100644 --- a/discord_rss_bot/settings.py +++ b/discord_rss_bot/settings.py @@ -1,23 +1,16 @@ from __future__ import annotations -import os import typing from functools import lru_cache from pathlib import Path from platformdirs import user_data_dir -from reader import Reader -from reader import make_reader +from reader 
import Reader, make_reader if typing.TYPE_CHECKING: from reader.types import JSONType -data_dir: str = os.getenv("DISCORD_RSS_BOT_DATA_DIR", "").strip() or user_data_dir( - appname="discord_rss_bot", - appauthor="TheLovinator", - roaming=True, - ensure_exists=True, -) +data_dir: str = user_data_dir(appname="discord_rss_bot", appauthor="TheLovinator", roaming=True, ensure_exists=True) # TODO(TheLovinator): Add default things to the database and make the edible. @@ -31,7 +24,7 @@ default_custom_embed: dict[str, str] = { } -@lru_cache(maxsize=1) +@lru_cache def get_reader(custom_location: Path | None = None) -> Reader: """Get the reader. @@ -42,13 +35,5 @@ def get_reader(custom_location: Path | None = None) -> Reader: The reader. """ db_location: Path = custom_location or Path(data_dir) / "db.sqlite" - reader: Reader = make_reader(url=str(db_location)) - # https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig - # Set the default update interval to 15 minutes if not already configured - # Users can change this via the Settings page or per-feed in the feed page - if reader.get_tag((), ".reader.update", None) is None: - # Set default - reader.set_tag((), ".reader.update", {"interval": 15}) - - return reader + return make_reader(url=str(db_location)) diff --git a/discord_rss_bot/static/styles.css b/discord_rss_bot/static/styles.css index 266f951..db0cfba 100644 --- a/discord_rss_bot/static/styles.css +++ b/discord_rss_bot/static/styles.css @@ -13,7 +13,3 @@ body { .form-text { color: #acabab; } - -.interval-input { - max-width: 120px; -} diff --git a/discord_rss_bot/templates/_webhook_mass_update_preview.html b/discord_rss_bot/templates/_webhook_mass_update_preview.html deleted file mode 100644 index a59e97b..0000000 --- a/discord_rss_bot/templates/_webhook_mass_update_preview.html +++ /dev/null @@ -1,73 +0,0 @@ -{% if preview_rows %} -

- {{ preview_change_count }} feed URL{{ 's' if preview_change_count != 1 else '' }} ready to update. -

-
- Total: {{ preview_summary.total }} - Matched: {{ preview_summary.matched }} - Will update: {{ preview_summary.will_update }} - Conflicts: {{ preview_summary.conflicts }} - Force overwrite: {{ preview_summary.force_overwrite }} - Force ignore errors: {{ preview_summary.force_ignore_errors }} - Resolve errors: {{ preview_summary.resolve_errors }} - No change: {{ preview_summary.no_change }} - No match: {{ preview_summary.no_match }} -
-
- - - - - - -
-
- - - - - - - - - - {% for row in preview_rows %} - - - - - - {% endfor %} - -
Old URLNew URLStatus
- {{ row.old_url }} - - {{ row.resolved_url if resolve_urls else row.candidate_url }} - - {% if not row.has_match %} - No match - {% elif row.will_force_ignore_errors %} - Will force update (ignore resolve error) - {% elif row.resolution_error %} - {{ row.resolution_error }} - {% elif row.will_force_overwrite %} - Will force overwrite - {% elif row.target_exists %} - Conflict: target URL exists - {% elif row.will_change %} - Will update - {% else %} - No change - {% endif %} -
-
-{% elif replace_from %} -

No preview rows found for that replacement pattern.

-{% endif %} diff --git a/discord_rss_bot/templates/base.html b/discord_rss_bot/templates/base.html index 9146b35..a8640dd 100644 --- a/discord_rss_bot/templates/base.html +++ b/discord_rss_bot/templates/base.html @@ -1,12 +1,13 @@ + + content="Stay updated with the latest news and events with our easy-to-use RSS bot. Never miss a message or announcement again with real-time notifications directly to your Discord server." /> + content="discord, rss, bot, notifications, announcements, updates, real-time, server, messages, news, events, feed." /> @@ -17,20 +18,19 @@ {% block head %} {% endblock head %} + {% include "nav.html" %}
{% if messages %} - + {% endif %} + {% block content %} {% endblock content %}
@@ -41,20 +41,18 @@
- + diff --git a/discord_rss_bot/templates/blacklist.html b/discord_rss_bot/templates/blacklist.html index ec16bce..3632277 100644 --- a/discord_rss_bot/templates/blacklist.html +++ b/discord_rss_bot/templates/blacklist.html @@ -42,49 +42,6 @@ - -
-
-
    -
  • - Regular expression patterns for advanced filtering. Each pattern should be on a new - line. -
  • -
  • Patterns are case-insensitive.
  • -
  • - Examples: - -
    -^New Release:.*
    -\b(update|version|patch)\s+\d+\.\d+
    -.*\[(important|notice)\].*
    -
    -
    -
  • -
-
- - - - - - - - - - - -
diff --git a/discord_rss_bot/templates/custom.html b/discord_rss_bot/templates/custom.html index 48cb3bc..f018d3a 100644 --- a/discord_rss_bot/templates/custom.html +++ b/discord_rss_bot/templates/custom.html @@ -14,90 +14,90 @@
  • You can use \n to create a new line.
  • You can remove the embed from links by adding < and> around the link. (For example < - {% raw %} {{entry_link}} {% endraw %}>) + {% raw %} {{ entry_link }} {% endraw %}>)

  • {% raw %} - {{feed_author}} + {{ feed_author }} {% endraw %} {{ feed.author }}
  • {% raw %} - {{feed_added}} + {{ feed_added }} {% endraw %} {{ feed.added }}
  • {% raw %} - {{feed_last_exception}} + {{ feed_last_exception }} {% endraw %} {{ feed.last_exception }}
  • {% raw %} - {{feed_last_updated}} + {{ feed_last_updated }} {% endraw %} {{ feed.last_updated }}
  • {% raw %} - {{feed_link}} + {{ feed_link }} {% endraw %} {{ feed.link }}
  • {% raw %} - {{feed_subtitle}} + {{ feed_subtitle }} {% endraw %} {{ feed.subtitle }}
  • {% raw %} - {{feed_title}} + {{ feed_title }} {% endraw %} {{ feed.title }}
  • {% raw %} - {{feed_updated}} + {{ feed_updated }} {% endraw %} {{ feed.updated }}
  • {% raw %} - {{feed_updates_enabled}} + {{ feed_updates_enabled }} {% endraw %} {{ feed.updates_enabled }}
  • {% raw %} - {{feed_url}} + {{ feed_url }} {% endraw %} {{ feed.url }}
  • {% raw %} - {{feed_user_title}} + {{ feed_user_title }} {% endraw %} {{ feed.user_title }}
  • {% raw %} - {{feed_version}} + {{ feed_version }} {% endraw %} {{ feed.version }}
  • @@ -106,14 +106,14 @@
  • {% raw %} - {{entry_added}} + {{ entry_added }} {% endraw %} {{ entry.added }}
  • {% raw %} - {{entry_author}} + {{ entry_author }} {% endraw %} {{ entry.author }}
  • @@ -121,14 +121,14 @@
  • {% raw %} - {{entry_content}} + {{ entry_content }} {% endraw %} {{ entry.content[0].value|discord_markdown }}
  • {% raw %} - {{entry_content_raw}} + {{ entry_content_raw }} {% endraw %} {{ entry.content[0].value }}
  • @@ -136,42 +136,42 @@
  • {% raw %} - {{entry_id}} + {{ entry_id }} {% endraw %} {{ entry.id }}
  • {% raw %} - {{entry_important}} + {{ entry_important }} {% endraw %} {{ entry.important }}
  • {% raw %} - {{entry_link}} + {{ entry_link }} {% endraw %} {{ entry.link }}
  • {% raw %} - {{entry_published}} + {{ entry_published }} {% endraw %} {{ entry.published }}
  • {% raw %} - {{entry_read}} + {{ entry_read }} {% endraw %} {{ entry.read }}
  • {% raw %} - {{entry_read_modified}} + {{ entry_read_modified }} {% endraw %} {{ entry.read_modified }}
  • @@ -179,14 +179,14 @@
  • {% raw %} - {{entry_summary}} + {{ entry_summary }} {% endraw %} {{ entry.summary|discord_markdown }}
  • {% raw %} - {{entry_summary_raw}} + {{ entry_summary_raw }} {% endraw %} {{ entry.summary }}
  • @@ -194,21 +194,21 @@
  • {% raw %} - {{entry_title}} + {{ entry_title }} {% endraw %} {{ entry.title }}
  • {% raw %} - {{entry_text}} + {{ entry_text }} {% endraw %} Same as entry_content if it exists, otherwise entry_summary
  • {% raw %} - {{entry_updated}} + {{ entry_updated }} {% endraw %} {{ entry.updated }}
  • @@ -216,7 +216,7 @@
  • {% raw %} - {{image_1}} + {{ image_1 }} {% endraw %} First image in the entry if it exists
  • @@ -226,7 +226,7 @@
  • {% raw %} - {{feed_title}}\n{{entry_content}} + {{ feed_title }}\n{{ entry_content }} {% endraw %}
  • diff --git a/discord_rss_bot/templates/feed.html b/discord_rss_bot/templates/feed.html index eb3e601..5dd85c0 100644 --- a/discord_rss_bot/templates/feed.html +++ b/discord_rss_bot/templates/feed.html @@ -1,172 +1,84 @@ {% extends "base.html" %} {% block title %} - | {{ feed.title }} +| {{ feed.title }} {% endblock title %} {% block content %} -
    - -

    - {{ feed.title }} ({{ total_entries }} entries) -

    - {% if not feed.updates_enabled %}Disabled{% endif %} - {% if feed.last_exception %} -
    -
    {{ feed.last_exception.type_name }}:
    - {{ feed.last_exception.value_str }} - -
    -
    {{ feed.last_exception.traceback_str }}
    -
    -
    - {% endif %} - -
    - Update -
    - -
    - {% if not feed.updates_enabled %} -
    - -
    - {% else %} -
    - -
    - {% endif %} - {% if not "youtube.com/feeds/videos.xml" in feed.url %} - {% if should_send_embed %} -
    - -
    - {% else %} -
    - -
    - {% endif %} - {% endif %} -
    - - - -
    -
    Feed URL
    -
    - -
    - - -
    -
    -
    - -
    -
    Feed Information
    -
    -
    - Added: {{ feed.added | relative_time }} -
    -
    - Last Updated: {{ feed.last_updated | relative_time }} -
    -
    - Last Retrieved: {{ feed.last_retrieved | relative_time }} -
    -
    - Next Update: {{ feed.update_after | relative_time }} -
    -
    - Updates: {{ 'Enabled' if feed.updates_enabled else 'Disabled' }} -
    -
    -
    - -
    -
    - Update Interval - {% if feed_interval %} - Custom - {% else %} - Using global default - {% endif %} - -
    -
    - Current: - {% if feed_interval %} - {{ feed_interval }} - {% if feed_interval >= 60 %}({{ (feed_interval / 60) | round(1) }} hours){% endif %} - {% else %} - {{ global_interval }} - {% if global_interval >= 60 %}({{ (global_interval / 60) | round(1) }} hours){% endif %} - {% endif %} - minutes -
    - - - -
    - {% if feed_interval %} -
    - - -
    - {% endif %} +
    + +

    + {{ feed.title }} ({{ total_entries }} entries) +

    + {% if not feed.updates_enabled %} + Disabled + {% endif %} + + {% if feed.last_exception %} +
    +
    {{ feed.last_exception.type_name }}:
    + {{ feed.last_exception.value_str }} + +
    +
    {{ feed.last_exception.traceback_str }}
    + {% endif %} + + +
    +
    + +
    + + {% if not feed.updates_enabled %} +
    + +
    + {% else %} +
    + +
    + {% endif %} + + {% if should_send_embed %} +
    + +
    + {% else %} +
    + +
    + {% endif %} +
    + + +
    + {# Rendered HTML content #}
    {{ html|safe }}
    -{% if is_show_more_entries_button_visible %} - - Show more entries - + +{% if show_more_entires_button %} + + Show more entries + {% endif %} + {% endblock content %} diff --git a/discord_rss_bot/templates/index.html b/discord_rss_bot/templates/index.html index 341ec38..78f0729 100644 --- a/discord_rss_bot/templates/index.html +++ b/discord_rss_bot/templates/index.html @@ -1,155 +1,92 @@ {% extends "base.html" %} {% block content %} - -
    + {% endfor %} + {% else %} +

    + Hello there! +
    + You need to add a webhook here to get started. After that, you can + add feeds here. You can find both of these links in the navigation bar + above. +
    +
    + If you have any questions or suggestions, feel free to contact me on tlovinator@gmail.com or TheLovinator#9276 on Discord. +
    +
    + Thanks! +

    + {% endif %} + + {% if broken_feeds %} +
    +
      + Feeds without webhook: + {% for broken_feed in broken_feeds %} + {{ broken_feed.url }} + {% endfor %} +
    +
    + {% endif %} + + {% if feeds_without_attached_webhook %} +
    +
      + Feeds without attached webhook: + {% for feed in feeds_without_attached_webhook %} + {{ feed.url }} + {% endfor %} +
    +
    + {% endif %} + {% endblock content %} diff --git a/discord_rss_bot/templates/nav.html b/discord_rss_bot/templates/nav.html index 7442554..8b9ee37 100644 --- a/discord_rss_bot/templates/nav.html +++ b/discord_rss_bot/templates/nav.html @@ -1,9 +1,6 @@