diff --git a/.env.example b/.env.example deleted file mode 100644 index 2a098da..0000000 --- a/.env.example +++ /dev/null @@ -1,19 +0,0 @@ -# You can optionally store backups of your bot's configuration in a git repository. -# This allows you to track changes by subscribing to the repository or using a RSS feed. -# Local path for the backup git repository (e.g., /data/backup or /home/user/backups/discord-rss-bot) -# When set, the bot will initialize a git repo here and commit state.json after every configuration change -# GIT_BACKUP_PATH= - -# Remote URL for pushing backup commits (e.g., git@github.com:username/private-config.git) -# Optional - only set if you want automatic pushes to a remote repository -# Leave empty to keep git history local only -# GIT_BACKUP_REMOTE= - -# Sentry Configuration (Optional) -# Sentry DSN for error tracking and monitoring -# Leave empty to disable Sentry integration -# SENTRY_DSN= - -# Testing Configuration -# Discord webhook URL used for testing (optional, only needed when running tests) -# TEST_WEBHOOK_URL= diff --git a/.forgejo/workflows/build.yml b/.forgejo/workflows/build.yml deleted file mode 100644 index c2d854d..0000000 --- a/.forgejo/workflows/build.yml +++ /dev/null @@ -1,100 +0,0 @@ ---- -# Required setup for self-hosted runner: -# 1. Install dependencies: -# sudo pacman -S qemu-user-static qemu-user-static-binfmt docker docker-buildx -# 2. Add runner to docker group: -# sudo usermod -aG docker forgejo-runner -# 3. Restart runner service to apply group membership: -# sudo systemctl restart forgejo-runner -# 4. Install uv and ruff for the runner user -# 5. Login to GitHub Container Registry: -# echo "ghp_YOUR_TOKEN_HERE" | sudo -u forgejo-runner docker login ghcr.io -u TheLovinator1 --password-stdin -# 6. 
Configure sudoers for deployment (sudo EDITOR=nvim visudo): -# forgejo-runner ALL=(discord-rss) NOPASSWD: /usr/bin/git -C /home/discord-rss/discord-rss-bot pull -# forgejo-runner ALL=(discord-rss) NOPASSWD: /usr/bin/uv sync -U --directory /home/discord-rss/discord-rss-bot -# forgejo-runner ALL=(root) NOPASSWD: /bin/systemctl restart discord-rss-bot - -name: Test and build Docker image -on: - push: - branches: - - master - pull_request: - workflow_dispatch: - schedule: - - cron: "0 0 1 * *" - -jobs: - docker: - runs-on: self-hosted - steps: - # Download the latest commit from the master branch - - uses: actions/checkout@v6 - - # Verify local tools are available on the self-hosted runner - - name: Check local toolchain - run: | - python --version - uv --version - ruff --version - docker version - - # Bootstrap a local Buildx builder for multi-arch builds - # (requires qemu-user-static and qemu-user-static-binfmt installed via pacman) - - name: Configure local buildx for multi-arch - run: | - docker buildx inspect local-multiarch-builder >/dev/null 2>&1 || \ - docker buildx create --name local-multiarch-builder --driver docker-container - docker buildx use local-multiarch-builder - docker buildx inspect --bootstrap - - - name: Lint Python code - run: ruff check --exit-non-zero-on-fix --verbose - - - name: Check Python formatting - run: ruff format --check --verbose - - - name: Lint Dockerfile - run: docker build --check . 
- - - name: Install dependencies - run: uv sync --all-extras --all-groups - - - name: Run tests - run: uv run pytest - - - id: tags - name: Compute image tags - run: | - IMAGE="ghcr.io/thelovinator1/discord-rss-bot" - if [ "${FORGEJO_REF}" = "refs/heads/master" ]; then - echo "tags=${IMAGE}:latest,${IMAGE}:master" >> "$FORGEJO_OUTPUT" - else - SHORT_SHA="$(echo "$FORGEJO_SHA" | cut -c1-12)" - echo "tags=${IMAGE}:sha-${SHORT_SHA}" >> "$FORGEJO_OUTPUT" - fi - - # Build (and optionally push) Docker image - - name: Build and push Docker image - env: - TAGS: ${{ steps.tags.outputs.tags }} - run: | - IFS=',' read -r -a tag_array <<< "$TAGS" - tag_args=() - for tag in "${tag_array[@]}"; do - tag_args+=( -t "$tag" ) - done - - if [ "${{ forge.event_name }}" = "pull_request" ]; then - docker buildx build --platform linux/amd64,linux/arm64 "${tag_args[@]}" --load . - else - docker buildx build --platform linux/amd64,linux/arm64 "${tag_args[@]}" --push . - fi - - # Deploy to production server - - name: Deploy to Server - if: success() && forge.ref == 'refs/heads/master' - run: | - sudo -u discord-rss git -C /home/discord-rss/discord-rss-bot pull - sudo -u discord-rss uv sync -U --directory /home/discord-rss/discord-rss-bot - sudo systemctl restart discord-rss-bot diff --git a/.gitea/workflows/build.yml b/.gitea/workflows/build.yml new file mode 100644 index 0000000..f340331 --- /dev/null +++ b/.gitea/workflows/build.yml @@ -0,0 +1,98 @@ +--- +name: Test and build Docker image +on: + push: + branches: + - master + pull_request: + workflow_dispatch: + schedule: + - cron: "@daily" + +env: + TEST_WEBHOOK_URL: ${{ secrets.TEST_WEBHOOK_URL }} + +jobs: + docker: + runs-on: ubuntu-latest + steps: + # GitHub Container Registry + - uses: https://github.com/docker/login-action@v3 + if: github.event_name != 'pull_request' + with: + registry: ghcr.io + username: thelovinator1 + password: ${{ secrets.PACKAGES_WRITE_GITHUB_TOKEN }} + + # Gitea Container Registry + - uses: 
https://github.com/docker/login-action@v3 + if: github.event_name != 'pull_request' + with: + registry: git.lovinator.space + username: thelovinator + password: ${{ secrets.PACKAGES_WRITE_GITEA_TOKEN }} + + # Download the latest commit from the master branch + - uses: https://github.com/actions/checkout@v4 + + # Set up QEMU + - id: qemu + uses: https://github.com/docker/setup-qemu-action@v3 + with: + image: tonistiigi/binfmt:master + platforms: linux/amd64,linux/arm64 + cache-image: false + + # Set up Buildx so we can build multi-arch images + - uses: https://github.com/docker/setup-buildx-action@v3 + + # Install the latest version of ruff + - uses: https://github.com/astral-sh/ruff-action@v3 + with: + version: "latest" + + # Lint the Python code using ruff + - run: ruff check --exit-non-zero-on-fix --verbose + + # Check if the Python code needs formatting + - run: ruff format --check --verbose + + # Lint Dockerfile + - run: docker build --check . + + # Set up Python 3.13 + - uses: actions/setup-python@v5 + with: + python-version: 3.13 + + # Install dependencies + - uses: astral-sh/setup-uv@v5 + with: + version: "latest" + - run: uv sync --all-extras --all-groups + + # Run tests + - run: uv run pytest + + # Extract metadata (tags, labels) from Git reference and GitHub events for Docker + - id: meta + uses: https://github.com/docker/metadata-action@v5 + env: + DOCKER_METADATA_ANNOTATIONS_LEVELS: manifest,index + with: + images: | + ghcr.io/thelovinator1/discord-rss-bot + git.lovinator.space/thelovinator/discord-rss-bot + tags: | + type=raw,value=latest,enable=${{ gitea.ref == format('refs/heads/{0}', 'master') }} + type=raw,value=master,enable=${{ gitea.ref == format('refs/heads/{0}', 'master') }} + + # Build and push the Docker image + - uses: https://github.com/docker/build-push-action@v6 + with: + context: . 
+ platforms: linux/amd64,linux/arm64 + push: ${{ gitea.event_name != 'pull_request' }} + labels: ${{ steps.meta.outputs.labels }} + tags: ${{ steps.meta.outputs.tags }} + annotations: ${{ steps.meta.outputs.annotations }} diff --git a/.forgejo/renovate.json b/.github/renovate.json similarity index 82% rename from .forgejo/renovate.json rename to .github/renovate.json index 7884adb..734986c 100644 --- a/.forgejo/renovate.json +++ b/.github/renovate.json @@ -1,8 +1,6 @@ { "$schema": "https://docs.renovatebot.com/renovate-schema.json", - "extends": [ - "config:recommended" - ], + "extends": ["config:recommended"], "automerge": true, "configMigration": true, "dependencyDashboard": false, diff --git a/.gitignore b/.gitignore index 6817461..1ac2c11 100644 --- a/.gitignore +++ b/.gitignore @@ -92,7 +92,7 @@ ipython_config.py # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. -# Pipfile.lock +Pipfile.lock # UV # Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. @@ -105,12 +105,11 @@ uv.lock # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -# poetry.lock -# poetry.toml +poetry.lock # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -# pdm.lock +#pdm.lock # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/latest/usage/project/#working-with-version-control @@ -166,20 +165,7 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. 
For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -# .idea/ - -# Abstra -# Abstra is an AI-powered process automation framework. -# Ignore directories containing user credentials, local state, and settings. -# Learn more at https://abstra.io/docs -.abstra/ - -# Visual Studio Code -# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore -# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore -# and can be added to the global gitignore or merged into this file. However, if you prefer, -# you could uncomment the following to ignore the entire vscode folder -# .vscode/ +#.idea/ # Ruff stuff: .ruff_cache/ @@ -187,13 +173,6 @@ cython_debug/ # PyPI configuration file .pypirc -# Cursor -# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to -# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data -# refer to https://docs.cursor.com/context/ignore-files -.cursorignore -.cursorindexingignore - # Database stuff *.sqlite *.sqlite-shm diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 16a9a4f..aca9273 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,13 +1,13 @@ repos: # Automatically add trailing commas to calls and literals. - repo: https://github.com/asottile/add-trailing-comma - rev: v4.0.0 + rev: v3.1.0 hooks: - id: add-trailing-comma # Some out-of-the-box hooks for pre-commit. - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v6.0.0 + rev: v5.0.0 hooks: - id: check-added-large-files - id: check-ast @@ -31,14 +31,14 @@ repos: # Run Pyupgrade on all Python files. This will upgrade the code to Python 3.12. - repo: https://github.com/asottile/pyupgrade - rev: v3.21.2 + rev: v3.19.1 hooks: - id: pyupgrade args: ["--py312-plus"] # An extremely fast Python linter and formatter. 
- repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.15.5 + rev: v0.11.8 hooks: - id: ruff-format - id: ruff @@ -46,6 +46,6 @@ repos: # Static checker for GitHub Actions workflow files. - repo: https://github.com/rhysd/actionlint - rev: v1.7.11 + rev: v1.7.7 hooks: - id: actionlint diff --git a/.vscode/launch.json b/.vscode/launch.json index 266d7f2..bb222ab 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,7 +12,7 @@ "--host", "0.0.0.0", "--port", - "3000", + "5000", ], "jinja": true, "justMyCode": true diff --git a/.vscode/settings.json b/.vscode/settings.json index 8bd0ea9..85832f8 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,5 @@ { "cSpell.words": [ - "autoexport", "botuser", "Genshins", "healthcheck", @@ -10,10 +9,7 @@ "markdownified", "markdownify", "pipx", - "pyproject", - "thead", - "thelovinator", - "uvicorn" + "thead" ], "python.analysis.typeCheckingMode": "basic" } diff --git a/Dockerfile b/Dockerfile index f27eed9..0905265 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,10 +1,11 @@ -FROM python:3.14-slim +FROM python:3.13-slim COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ RUN useradd --create-home botuser && \ mkdir -p /home/botuser/discord-rss-bot/ /home/botuser/.local/share/discord_rss_bot/ && \ chown -R botuser:botuser /home/botuser/ USER botuser WORKDIR /home/botuser/discord-rss-bot +COPY --chown=botuser:botuser requirements.txt /home/botuser/discord-rss-bot/ RUN --mount=type=cache,target=/root/.cache/uv \ --mount=type=bind,source=pyproject.toml,target=pyproject.toml \ uv sync --no-install-project diff --git a/README.md b/README.md index 09b6bbc..8232dea 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,6 @@ Subscribe to RSS feeds and get updates to a Discord webhook. -Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com) - -Discord: TheLovinator#9276 - ## Features - Subscribe to RSS feeds and get updates to a Discord webhook. 
@@ -14,7 +10,6 @@ Discord: TheLovinator#9276 - Choose between Discord embed or plain text. - Regex filters for RSS feeds. - Blacklist/whitelist words in the title/description/author/etc. -- Set different update frequencies for each feed or use a global default. - Gets extra information from APIs if available, currently for: - [https://feeds.c3kay.de/](https://feeds.c3kay.de/) - Genshin Impact News @@ -30,7 +25,9 @@ or [install directly on your computer](#install-directly-on-your-computer). ### Docker - Open a terminal in the repository folder. - - Shift + right-click in the folder and `Open PowerShell window here` + - Windows 10: Shift + right-click in the folder and select `Open PowerShell window here` + - Windows 11: Shift + right-click in the folder and Show more options + and `Open PowerShell window here` - Run the Docker Compose file: - `docker-compose up` - You can stop the bot with Ctrl + c. @@ -44,68 +41,34 @@ or [install directly on your computer](#install-directly-on-your-computer). ### Install directly on your computer -- Install the latest of [uv](https://docs.astral.sh/uv/#installation): - - `powershell -ExecutionPolicy ByPass -c "irm | iex"` +This is not recommended if you don't have an init system (e.g., systemd) + +- Install the latest version of needed software: + - [Python](https://www.python.org/) + - You should use the latest version. + - You want to add Python to your PATH. + - Windows: Find `App execution aliases` and disable python.exe and python3.exe + - [Poetry](https://python-poetry.org/docs/master/#installation) + - Windows: You have to add `%appdata%\Python\Scripts` to your PATH for Poetry to work. - Download the project from GitHub with Git or download the [ZIP](https://github.com/TheLovinator1/discord-rss-bot/archive/refs/heads/master.zip). - If you want to update the bot, you can run `git pull` in the project folder or download the ZIP again. - Open a terminal in the repository folder. 
- - Shift + right-click in the folder and `Open PowerShell window here` + - Windows 10: Shift + right-click in the folder and select `Open PowerShell window here` + - Windows 11: Shift + right-click in the folder and Show more options + and `Open PowerShell window here` +- Install requirements: + - Type `poetry install` into the PowerShell window. Make sure you are + in the repository folder where the [pyproject.toml](pyproject.toml) file is located. + - (You may have to restart your terminal if it can't find the `poetry` command. Also double check it is in + your PATH.) - Start the bot: - - Type `uv run discord_rss_bot/main.py` into the PowerShell window. + - Type `poetry run python discord_rss_bot/main.py` into the PowerShell window. - You can stop the bot with Ctrl + c. -- Bot is now running on port 3000. -- You should run this bot behind a reverse proxy like [Caddy](https://caddyserver.com/) - or [Nginx](https://www.nginx.com/) if you want to access it from the internet. Remember to add authentication. -- You can access the web interface at `http://localhost:3000/`. -- To run automatically on boot: - - Use [Windows Task Scheduler](https://en.wikipedia.org/wiki/Windows_Task_Scheduler). - - Or add a shortcut to `%userprofile%\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\Startup`. +Note: You will need to run `poetry install` again if [poetry.lock](poetry.lock) has been modified. -## Git Backup (State Version Control) +## Contact -The bot can commit every configuration change (adding/removing feeds, webhook -changes, blacklist/whitelist updates) to a separate private Git repository so -you get a full, auditable history of state changes — similar to `etckeeper`. - -### Configuration - -Set the following environment variables (e.g. 
in `docker-compose.yml` or a -`.env` file): - -| Variable | Required | Description | -| ------------------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------- | -| `GIT_BACKUP_PATH` | Yes | Local path where the backup git repository is stored. The bot will initialise it automatically if it does not yet exist. | -| `GIT_BACKUP_REMOTE` | No | Remote URL to push to after each commit (e.g. `git@github.com:you/private-config.git`). Leave unset to keep the history local only. | - -### What is backed up - -After every relevant change a `state.json` file is written and committed. -The file contains: - -- All feed URLs together with their webhook URL, custom message, embed - settings, and any blacklist/whitelist filters. -- The global list of Discord webhooks. - -### Docker example - -```yaml -services: - discord-rss-bot: - image: ghcr.io/thelovinator1/discord-rss-bot:latest - volumes: - - ./data:/data - environment: - - GIT_BACKUP_PATH=/data/backup - - GIT_BACKUP_REMOTE=git@github.com:you/private-config.git -``` - -For SSH-based remotes mount your SSH key into the container and make sure the -host key is trusted, e.g.: - -```yaml - volumes: - - ./data:/data - - ~/.ssh:/root/.ssh:ro -``` +Email: [tlovinator@gmail.com](mailto:tlovinator@gmail.com) +Discord: TheLovinator#9276 diff --git a/discord_rss_bot/custom_filters.py b/discord_rss_bot/custom_filters.py index fd9461c..99fe77d 100644 --- a/discord_rss_bot/custom_filters.py +++ b/discord_rss_bot/custom_filters.py @@ -4,14 +4,15 @@ import urllib.parse from functools import lru_cache from typing import TYPE_CHECKING -from discord_rss_bot.filter.blacklist import entry_should_be_skipped -from discord_rss_bot.filter.blacklist import feed_has_blacklist_tags -from discord_rss_bot.filter.whitelist import has_white_tags -from discord_rss_bot.filter.whitelist import should_be_sent +from discord_rss_bot.filter.blacklist import 
entry_should_be_skipped, feed_has_blacklist_tags +from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent +from discord_rss_bot.settings import get_reader if TYPE_CHECKING: - from reader import Entry - from reader import Reader + from reader import Entry, Reader + +# Our reader +reader: Reader = get_reader() @lru_cache @@ -30,12 +31,11 @@ def encode_url(url_to_quote: str) -> str: return urllib.parse.quote(string=url_to_quote) if url_to_quote else "" -def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool: +def entry_is_whitelisted(entry_to_check: Entry) -> bool: """Check if the entry is whitelisted. Args: entry_to_check: The feed to check. - reader: Custom Reader instance. Returns: bool: True if the feed is whitelisted, False otherwise. @@ -44,12 +44,11 @@ def entry_is_whitelisted(entry_to_check: Entry, reader: Reader) -> bool: return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check)) -def entry_is_blacklisted(entry_to_check: Entry, reader: Reader) -> bool: +def entry_is_blacklisted(entry_to_check: Entry) -> bool: """Check if the entry is blacklisted. Args: entry_to_check: The feed to check. - reader: Custom Reader instance. Returns: bool: True if the feed is blacklisted, False otherwise. 
diff --git a/discord_rss_bot/custom_message.py b/discord_rss_bot/custom_message.py index 1626e39..d3ca74d 100644 --- a/discord_rss_bot/custom_message.py +++ b/discord_rss_bot/custom_message.py @@ -1,27 +1,18 @@ from __future__ import annotations -import html import json import logging -import re from dataclasses import dataclass -from typing import TYPE_CHECKING -from bs4 import BeautifulSoup -from bs4 import Tag +from bs4 import BeautifulSoup, Tag from markdownify import markdownify +from reader import Entry, Feed, Reader, TagNotFoundError from discord_rss_bot.is_url_valid import is_url_valid - -if TYPE_CHECKING: - from reader import Entry - from reader import Feed - from reader import Reader +from discord_rss_bot.settings import get_reader logger: logging.Logger = logging.getLogger(__name__) -DISCORD_TIMESTAMP_TAG_RE: re.Pattern[str] = re.compile(r"") - @dataclass(slots=True) class CustomEmbed: @@ -55,80 +46,18 @@ def try_to_replace(custom_message: str, template: str, replace_with: str) -> str return custom_message -def _preserve_discord_timestamp_tags(text: str) -> tuple[str, dict[str, str]]: - """Replace Discord timestamp tags with placeholders before markdown conversion. - - Args: - text: The text to replace tags in. - - Returns: - The text with Discord timestamp tags replaced by placeholders and a mapping of placeholders to original tags. - """ - replacements: dict[str, str] = {} - - def replace_match(match: re.Match[str]) -> str: - placeholder: str = f"DISCORDTIMESTAMPPLACEHOLDER{len(replacements)}" - replacements[placeholder] = match.group(0) - return placeholder - - return DISCORD_TIMESTAMP_TAG_RE.sub(replace_match, text), replacements - - -def _restore_discord_timestamp_tags(text: str, replacements: dict[str, str]) -> str: - """Restore preserved Discord timestamp tags after markdown conversion. - - Args: - text: The text to restore tags in. - replacements: A mapping of placeholders to original Discord timestamp tags. 
- - Returns: - The text with placeholders replaced by the original Discord timestamp tags. - """ - for placeholder, original_value in replacements.items(): - text = text.replace(placeholder, original_value) - return text - - -def format_entry_html_for_discord(text: str) -> str: - """Convert entry HTML to Discord-friendly markdown while preserving Discord timestamp tags. - - Args: - text: The HTML text to format. - - Returns: - The formatted text with Discord timestamp tags preserved. - """ - if not text: - return "" - - unescaped_text: str = html.unescape(text) - protected_text, replacements = _preserve_discord_timestamp_tags(unescaped_text) - formatted_text: str = markdownify( - html=protected_text, - strip=["img", "table", "td", "tr", "tbody", "thead"], - escape_misc=False, - heading_style="ATX", - ) - - if "[https://" in formatted_text or "[https://www." in formatted_text: - formatted_text = formatted_text.replace("[https://", "[") - formatted_text = formatted_text.replace("[https://www.", "[") - - return _restore_discord_timestamp_tags(formatted_text, replacements) - - -def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str: +def replace_tags_in_text_message(entry: Entry) -> str: """Replace tags in custom_message. Args: entry: The entry to get the tags from. - reader: Custom Reader instance. Returns: Returns the custom_message with the tags replaced. 
""" feed: Feed = entry.feed - custom_message: str = get_custom_message(feed=feed, reader=reader) + custom_reader: Reader = get_reader() + custom_message: str = get_custom_message(feed=feed, custom_reader=custom_reader) content = "" if entry.content: @@ -139,8 +68,16 @@ def replace_tags_in_text_message(entry: Entry, reader: Reader) -> str: first_image: str = get_first_image(summary, content) - summary = format_entry_html_for_discord(summary) - content = format_entry_html_for_discord(content) + summary = markdownify(html=summary, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) + content = markdownify(html=content, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) + + if "[https://" in content or "[https://www." in content: + content = content.replace("[https://", "[") + content = content.replace("[https://www.", "[") + + if "[https://" in summary or "[https://www." in summary: + summary = summary.replace("[https://", "[") + summary = summary.replace("[https://www.", "[") feed_added: str = feed.added.strftime("%Y-%m-%d %H:%M:%S") if feed.added else "Never" feed_last_exception: str = feed.last_exception.value_str if feed.last_exception else "" @@ -230,18 +167,18 @@ def get_first_image(summary: str | None, content: str | None) -> str: return "" -def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmbed: +def replace_tags_in_embed(feed: Feed, entry: Entry) -> CustomEmbed: """Replace tags in embed. Args: feed: The feed to get the tags from. entry: The entry to get the tags from. - reader: Custom Reader instance. Returns: Returns the embed with the tags replaced. 
""" - embed: CustomEmbed = get_embed(feed=feed, reader=reader) + custom_reader: Reader = get_reader() + embed: CustomEmbed = get_embed(feed=feed, custom_reader=custom_reader) content = "" if entry.content: @@ -252,8 +189,16 @@ def replace_tags_in_embed(feed: Feed, entry: Entry, reader: Reader) -> CustomEmb first_image: str = get_first_image(summary, content) - summary = format_entry_html_for_discord(summary) - content = format_entry_html_for_discord(content) + summary = markdownify(html=summary, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) + content = markdownify(html=content, strip=["img", "table", "td", "tr", "tbody", "thead"], escape_misc=False) + + if "[https://" in content or "[https://www." in content: + content = content.replace("[https://", "[") + content = content.replace("[https://www.", "[") + + if "[https://" in summary or "[https://www." in summary: + summary = summary.replace("[https://", "[") + summary = summary.replace("[https://www.", "[") feed_added: str = feed.added.strftime("%Y-%m-%d %H:%M:%S") if feed.added else "Never" feed_last_updated: str = feed.last_updated.strftime("%Y-%m-%d %H:%M:%S") if feed.last_updated else "Never" @@ -332,29 +277,31 @@ def _replace_embed_tags(embed: CustomEmbed, template: str, replace_with: str) -> embed.footer_icon_url = try_to_replace(embed.footer_icon_url, template, replace_with) -def get_custom_message(reader: Reader, feed: Feed) -> str: +def get_custom_message(custom_reader: Reader, feed: Feed) -> str: """Get custom_message tag from feed. Args: - reader: What Reader to use. + custom_reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the custom_message tag. 
""" try: - custom_message: str = str(reader.get_tag(feed, "custom_message", "")) + custom_message: str = str(custom_reader.get_tag(feed, "custom_message")) + except TagNotFoundError: + custom_message = "" except ValueError: custom_message = "" return custom_message -def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None: +def save_embed(custom_reader: Reader, feed: Feed, embed: CustomEmbed) -> None: """Set embed tag in feed. Args: - reader: What Reader to use. + custom_reader: What Reader to use. feed: The feed to set the tag in. embed: The embed to set. """ @@ -370,20 +317,20 @@ def save_embed(reader: Reader, feed: Feed, embed: CustomEmbed) -> None: "footer_text": embed.footer_text, "footer_icon_url": embed.footer_icon_url, } - reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] + custom_reader.set_tag(feed, "embed", json.dumps(embed_dict)) # pyright: ignore[reportArgumentType] -def get_embed(reader: Reader, feed: Feed) -> CustomEmbed: +def get_embed(custom_reader: Reader, feed: Feed) -> CustomEmbed: """Get embed tag from feed. Args: - reader: What Reader to use. + custom_reader: What Reader to use. feed: The feed to get the tag from. Returns: Returns the contents from the embed tag. 
""" - embed = reader.get_tag(feed, "embed", "") + embed = custom_reader.get_tag(feed, "embed", "") if embed: if not isinstance(embed, str): diff --git a/discord_rss_bot/feeds.py b/discord_rss_bot/feeds.py index 225e7ff..a8388a9 100644 --- a/discord_rss_bot/feeds.py +++ b/discord_rss_bot/feeds.py @@ -1,45 +1,35 @@ from __future__ import annotations import datetime -import json import logging import os import pprint import re -from typing import TYPE_CHECKING -from typing import Any -from urllib.parse import ParseResult -from urllib.parse import urlparse +from typing import TYPE_CHECKING, Any +from urllib.parse import ParseResult, urlparse import tldextract -from discord_webhook import DiscordEmbed -from discord_webhook import DiscordWebhook +from discord_webhook import DiscordEmbed, DiscordWebhook from fastapi import HTTPException -from markdownify import markdownify -from reader import Entry -from reader import EntryNotFoundError -from reader import Feed -from reader import FeedExistsError -from reader import FeedNotFoundError -from reader import Reader -from reader import ReaderError -from reader import StorageError +from reader import Entry, EntryNotFoundError, Feed, FeedExistsError, Reader, ReaderError, StorageError, TagNotFoundError -from discord_rss_bot.custom_message import CustomEmbed -from discord_rss_bot.custom_message import get_custom_message -from discord_rss_bot.custom_message import replace_tags_in_embed -from discord_rss_bot.custom_message import replace_tags_in_text_message +from discord_rss_bot.custom_message import ( + CustomEmbed, + get_custom_message, + replace_tags_in_embed, + replace_tags_in_text_message, +) from discord_rss_bot.filter.blacklist import entry_should_be_skipped -from discord_rss_bot.filter.whitelist import has_white_tags -from discord_rss_bot.filter.whitelist import should_be_sent -from discord_rss_bot.hoyolab_api import create_hoyolab_webhook -from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url -from 
discord_rss_bot.hoyolab_api import fetch_hoyolab_post -from discord_rss_bot.hoyolab_api import is_c3kay_feed +from discord_rss_bot.filter.whitelist import has_white_tags, should_be_sent +from discord_rss_bot.hoyolab_api import ( + create_hoyolab_webhook, + extract_post_id_from_hoyolab_url, + fetch_hoyolab_post, + is_c3kay_feed, +) from discord_rss_bot.is_url_valid import is_url_valid -from discord_rss_bot.settings import default_custom_embed -from discord_rss_bot.settings import default_custom_message -from discord_rss_bot.settings import get_reader +from discord_rss_bot.missing_tags import add_missing_tags +from discord_rss_bot.settings import default_custom_message, get_reader if TYPE_CHECKING: from collections.abc import Iterable @@ -68,7 +58,7 @@ def extract_domain(url: str) -> str: # noqa: PLR0911 return "YouTube" # Special handling for Reddit feeds - if "reddit.com" in url and ".rss" in url: + if "reddit.com" in url or (".rss" in url and "r/" in url): return "Reddit" # Parse the URL and extract the domain @@ -98,24 +88,24 @@ def extract_domain(url: str) -> str: # noqa: PLR0911 return "Other" -def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: C901 +def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: PLR0912 """Send a single entry to Discord. Args: entry: The entry to send to Discord. - reader: The reader to use. + custom_reader: The reader to use. If None, the default reader will be used. Returns: str | None: The error message if there was an error, otherwise None. """ + # Get the default reader if we didn't get a custom one. + reader: Reader = get_reader() if custom_reader is None else custom_reader + # Get the webhook URL for the entry. webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) if not webhook_url: return "No webhook URL found." - # If https://discord.com/quests/ is in the URL, send a separate message with the URL. 
- send_discord_quest_notification(entry, webhook_url, reader=reader) - # Check if this is a c3kay feed if is_c3kay_feed(entry.feed.url): entry_link: str | None = entry.link @@ -125,7 +115,7 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry, reader=reader) + execute_webhook(webhook, entry) return None logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", @@ -139,14 +129,17 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: # Try to get the custom message for the feed. If the user has none, we will use the default message. # This has to be a string for some reason so don't change it to "not custom_message.get_custom_message()" if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 - webhook_message: str = replace_tags_in_text_message(entry=entry, reader=reader) + webhook_message: str = replace_tags_in_text_message(entry=entry) if not webhook_message: webhook_message = "No message found." # Create the webhook. 
try: - should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) + should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) + except TagNotFoundError: + logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url) + should_send_embed = True except StorageError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True @@ -156,52 +149,14 @@ def send_entry_to_discord(entry: Entry, reader: Reader) -> str | None: # noqa: should_send_embed = False if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry, reader=reader) + webhook = create_embed_webhook(webhook_url, entry) else: webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) - execute_webhook(webhook, entry, reader=reader) + execute_webhook(webhook, entry) return None -def send_discord_quest_notification(entry: Entry, webhook_url: str, reader: Reader) -> None: - """Send a separate message to Discord if the entry is a quest notification.""" - quest_regex: re.Pattern[str] = re.compile(r"https://discord\.com/quests/\d+") - - def send_notification(quest_url: str) -> None: - """Helper function to send quest notification to Discord.""" - logger.info("Sending quest notification to Discord: %s", quest_url) - webhook = DiscordWebhook( - url=webhook_url, - content=quest_url, - rate_limit_retry=True, - ) - execute_webhook(webhook, entry, reader=reader) - - # Iterate through the content of the entry - for content in entry.content: - if content.type == "text" and content.value: - match = quest_regex.search(content.value) - if match: - send_notification(match.group(0)) - return - - elif content.type == "text/html" and content.value: - # Convert HTML to text and check for quest links - text_value = markdownify( - html=content.value, - strip=["img", "table", "td", "tr", "tbody", "thead"], - escape_misc=False, - heading_style="ATX", - ) - match: 
re.Match[str] | None = quest_regex.search(text_value) - if match: - send_notification(match.group(0)) - return - - logger.info("No quest notification found in entry: %s", entry.id) - - def set_description(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None: """Set the description of the embed. @@ -234,17 +189,12 @@ def set_title(custom_embed: CustomEmbed, discord_embed: DiscordEmbed) -> None: discord_embed.set_title(embed_title) if embed_title else None -def create_embed_webhook( # noqa: C901 - webhook_url: str, - entry: Entry, - reader: Reader, -) -> DiscordWebhook: +def create_embed_webhook(webhook_url: str, entry: Entry) -> DiscordWebhook: """Create a webhook with an embed. Args: webhook_url (str): The webhook URL. entry (Entry): The entry to send to Discord. - reader (Reader): The Reader instance to use for getting embed data. Returns: DiscordWebhook: The webhook with the embed. @@ -253,7 +203,7 @@ def create_embed_webhook( # noqa: C901 feed: Feed = entry.feed # Get the embed data from the database. - custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry, reader=reader) + custom_embed: CustomEmbed = replace_tags_in_embed(feed=feed, entry=entry) discord_embed: DiscordEmbed = DiscordEmbed() @@ -315,14 +265,13 @@ def get_webhook_url(reader: Reader, entry: Entry) -> str: str: The webhook URL. 
""" try: - webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", "")) + webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook")) + except TagNotFoundError: + logger.exception("No webhook URL found for feed: %s", entry.feed.url) + return "" except StorageError: logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url) return "" - - if not webhook_url: - logger.error("No webhook URL found for feed: %s", entry.feed.url) - return "" return webhook_url @@ -341,53 +290,52 @@ def set_entry_as_read(reader: Reader, entry: Entry) -> None: logger.exception("Error setting entry to read: %s", entry.id) -def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: C901, PLR0912 +def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, *, do_once: bool = False) -> None: # noqa: PLR0912 """Send entries to Discord. If response was not ok, we will log the error and mark the entry as unread, so it will be sent again next time. Args: - reader: If we should use a custom reader instead of the default one. + custom_reader: If we should use a custom reader instead of the default one. feed: The feed to send to Discord. do_once: If we should only send one entry. This is used in the test. """ - logger.info("Starting to send entries to Discord.") # Get the default reader if we didn't get a custom one. - effective_reader: Reader = get_reader() if reader is None else reader + reader: Reader = get_reader() if custom_reader is None else custom_reader # Check for new entries for every feed. - effective_reader.update_feeds( + reader.update_feeds( scheduled=True, workers=os.cpu_count() or 1, ) # Loop through the unread entries. 
- entries: Iterable[Entry] = effective_reader.get_entries(feed=feed, read=False) + entries: Iterable[Entry] = reader.get_entries(feed=feed, read=False) for entry in entries: - set_entry_as_read(effective_reader, entry) + set_entry_as_read(reader, entry) if entry.added < datetime.datetime.now(tz=entry.added.tzinfo) - datetime.timedelta(days=1): logger.info("Entry is older than 24 hours: %s from %s", entry.id, entry.feed.url) continue - webhook_url: str = get_webhook_url(effective_reader, entry) + webhook_url: str = get_webhook_url(reader, entry) if not webhook_url: logger.info("No webhook URL found for feed: %s", entry.feed.url) continue - should_send_embed: bool = should_send_embed_check(effective_reader, entry) + should_send_embed: bool = should_send_embed_check(reader, entry) # Youtube feeds only need to send the link if is_youtube_feed(entry.feed.url): should_send_embed = False if should_send_embed: - webhook = create_embed_webhook(webhook_url, entry, reader=effective_reader) + webhook = create_embed_webhook(webhook_url, entry) else: # If the user has set the custom message to an empty string, we will use the default message, otherwise we # will use the custom message. - if get_custom_message(effective_reader, entry.feed) != "": # noqa: PLC1901 - webhook_message = replace_tags_in_text_message(entry, reader=effective_reader) + if get_custom_message(reader, entry.feed) != "": # noqa: PLC1901 + webhook_message = replace_tags_in_text_message(entry) else: webhook_message: str = str(default_custom_message) @@ -397,12 +345,12 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True) # Check if the entry is blacklisted, and if it is, we will skip it. 
- if entry_should_be_skipped(effective_reader, entry): + if entry_should_be_skipped(reader, entry): logger.info("Entry was blacklisted: %s", entry.id) continue # Check if the feed has a whitelist, and if it does, check if the entry is whitelisted. - if has_white_tags(effective_reader, entry.feed) and not should_be_sent(effective_reader, entry): + if has_white_tags(reader, entry.feed) and not should_be_sent(reader, entry): logger.info("Entry was not whitelisted: %s", entry.id) continue @@ -415,7 +363,7 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d post_data: dict[str, Any] | None = fetch_hoyolab_post(post_id) if post_data: webhook = create_hoyolab_webhook(webhook_url, entry, post_data) - execute_webhook(webhook, entry, reader=effective_reader) + execute_webhook(webhook, entry) return logger.warning( "Failed to create Hoyolab webhook for feed %s, falling back to regular processing", @@ -425,7 +373,7 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d logger.warning("No entry link found for feed %s, falling back to regular processing", entry.feed.url) # Send the entry to Discord as it is not blacklisted or feed has a whitelist. - execute_webhook(webhook, entry, reader=effective_reader) + execute_webhook(webhook, entry) # If we only want to send one entry, we will break the loop. This is used when testing this function. if do_once: @@ -433,27 +381,14 @@ def send_to_discord(reader: Reader | None = None, feed: Feed | None = None, *, d break -def execute_webhook(webhook: DiscordWebhook, entry: Entry, reader: Reader) -> None: +def execute_webhook(webhook: DiscordWebhook, entry: Entry) -> None: """Execute the webhook. Args: webhook (DiscordWebhook): The webhook to execute. entry (Entry): The entry to send to Discord. - reader (Reader): The Reader instance to use for checking feed status. """ - # If the feed has been paused or deleted, we will not send the entry to Discord. 
- entry_feed: Feed = entry.feed - if entry_feed.updates_enabled is False: - logger.warning("Feed is paused, not sending entry to Discord: %s", entry_feed.url) - return - - try: - reader.get_feed(entry_feed.url) - except FeedNotFoundError: - logger.warning("Feed not found in reader, not sending entry to Discord: %s", entry_feed.url) - return - response: Response = webhook.execute() if response.status_code not in {200, 204}: msg: str = f"Error sending entry to Discord: {response.text}\n{pprint.pformat(webhook.json)}" @@ -492,7 +427,10 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool: return False try: - should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True)) + should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed")) + except TagNotFoundError: + logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url) + should_send_embed = True except ReaderError: logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url) should_send_embed = True @@ -516,7 +454,7 @@ def truncate_webhook_message(webhook_message: str) -> str: return webhook_message -def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # noqa: C901 +def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: """Add a new feed, update it and mark every entry as read. Args: @@ -547,7 +485,9 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: reader.add_feed(clean_feed_url) except FeedExistsError: # Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new. 
- if not reader.get_tag(clean_feed_url, "webhook", ""): + try: + reader.get_tag(clean_feed_url, "webhook") + except TagNotFoundError: reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType] except ReaderError as e: raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e @@ -572,8 +512,7 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None: # This is the default message that will be sent to Discord. reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType] - # Set the default embed tag when creating the feed - reader.set_tag(clean_feed_url, "embed", json.dumps(default_custom_embed)) - # Update the full-text search index so our new feed is searchable. reader.update_search() + + add_missing_tags(reader) diff --git a/discord_rss_bot/filter/blacklist.py b/discord_rss_bot/filter/blacklist.py index 8260993..87b4913 100644 --- a/discord_rss_bot/filter/blacklist.py +++ b/discord_rss_bot/filter/blacklist.py @@ -2,16 +2,13 @@ from __future__ import annotations from typing import TYPE_CHECKING -from discord_rss_bot.filter.utils import is_regex_match -from discord_rss_bot.filter.utils import is_word_in_text +from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text if TYPE_CHECKING: - from reader import Entry - from reader import Feed - from reader import Reader + from reader import Entry, Feed, Reader -def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: +def feed_has_blacklist_tags(custom_reader: Reader, feed: Feed) -> bool: """Return True if the feed has blacklist tags. The following tags are checked: @@ -25,21 +22,21 @@ def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: - regex_blacklist_title Args: - reader: The reader. + custom_reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. 
""" - blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() - blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() + blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() - regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() - regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() + regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() + regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() return bool( blacklist_title @@ -53,11 +50,11 @@ def feed_has_blacklist_tags(reader: Reader, feed: Feed) -> bool: ) -def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def entry_should_be_skipped(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the blacklist. Args: - reader: The reader. + custom_reader: The reader. entry: The entry to check. 
Returns: @@ -65,15 +62,15 @@ def entry_should_be_skipped(reader: Reader, entry: Entry) -> bool: # noqa: PLR0 """ feed = entry.feed - blacklist_title: str = str(reader.get_tag(feed, "blacklist_title", "")).strip() - blacklist_summary: str = str(reader.get_tag(feed, "blacklist_summary", "")).strip() - blacklist_content: str = str(reader.get_tag(feed, "blacklist_content", "")).strip() - blacklist_author: str = str(reader.get_tag(feed, "blacklist_author", "")).strip() + blacklist_title: str = str(custom_reader.get_tag(feed, "blacklist_title", "")).strip() + blacklist_summary: str = str(custom_reader.get_tag(feed, "blacklist_summary", "")).strip() + blacklist_content: str = str(custom_reader.get_tag(feed, "blacklist_content", "")).strip() + blacklist_author: str = str(custom_reader.get_tag(feed, "blacklist_author", "")).strip() - regex_blacklist_title: str = str(reader.get_tag(feed, "regex_blacklist_title", "")).strip() - regex_blacklist_summary: str = str(reader.get_tag(feed, "regex_blacklist_summary", "")).strip() - regex_blacklist_content: str = str(reader.get_tag(feed, "regex_blacklist_content", "")).strip() - regex_blacklist_author: str = str(reader.get_tag(feed, "regex_blacklist_author", "")).strip() + regex_blacklist_title: str = str(custom_reader.get_tag(feed, "regex_blacklist_title", "")).strip() + regex_blacklist_summary: str = str(custom_reader.get_tag(feed, "regex_blacklist_summary", "")).strip() + regex_blacklist_content: str = str(custom_reader.get_tag(feed, "regex_blacklist_content", "")).strip() + regex_blacklist_author: str = str(custom_reader.get_tag(feed, "regex_blacklist_author", "")).strip() # TODO(TheLovinator): Also add support for entry_text and more. 
# Check regular blacklist diff --git a/discord_rss_bot/filter/whitelist.py b/discord_rss_bot/filter/whitelist.py index bb5303d..b4b5c23 100644 --- a/discord_rss_bot/filter/whitelist.py +++ b/discord_rss_bot/filter/whitelist.py @@ -2,16 +2,13 @@ from __future__ import annotations from typing import TYPE_CHECKING -from discord_rss_bot.filter.utils import is_regex_match -from discord_rss_bot.filter.utils import is_word_in_text +from discord_rss_bot.filter.utils import is_regex_match, is_word_in_text if TYPE_CHECKING: - from reader import Entry - from reader import Feed - from reader import Reader + from reader import Entry, Feed, Reader -def has_white_tags(reader: Reader, feed: Feed) -> bool: +def has_white_tags(custom_reader: Reader, feed: Feed) -> bool: """Return True if the feed has whitelist tags. The following tags are checked: @@ -25,21 +22,21 @@ def has_white_tags(reader: Reader, feed: Feed) -> bool: - whitelist_title Args: - reader: The reader. + custom_reader: The reader. feed: The feed to check. Returns: bool: If the feed has any of the tags. 
""" - whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() - regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() + regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() return bool( whitelist_title @@ -53,11 +50,11 @@ def has_white_tags(reader: Reader, feed: Feed) -> bool: ) -def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 +def should_be_sent(custom_reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """Return True if the entry is in the whitelist. Args: - reader: The reader. + custom_reader: The reader. entry: The entry to check. 
Returns: @@ -65,16 +62,16 @@ def should_be_sent(reader: Reader, entry: Entry) -> bool: # noqa: PLR0911 """ feed: Feed = entry.feed # Regular whitelist tags - whitelist_title: str = str(reader.get_tag(feed, "whitelist_title", "")).strip() - whitelist_summary: str = str(reader.get_tag(feed, "whitelist_summary", "")).strip() - whitelist_content: str = str(reader.get_tag(feed, "whitelist_content", "")).strip() - whitelist_author: str = str(reader.get_tag(feed, "whitelist_author", "")).strip() + whitelist_title: str = str(custom_reader.get_tag(feed, "whitelist_title", "")).strip() + whitelist_summary: str = str(custom_reader.get_tag(feed, "whitelist_summary", "")).strip() + whitelist_content: str = str(custom_reader.get_tag(feed, "whitelist_content", "")).strip() + whitelist_author: str = str(custom_reader.get_tag(feed, "whitelist_author", "")).strip() # Regex whitelist tags - regex_whitelist_title: str = str(reader.get_tag(feed, "regex_whitelist_title", "")).strip() - regex_whitelist_summary: str = str(reader.get_tag(feed, "regex_whitelist_summary", "")).strip() - regex_whitelist_content: str = str(reader.get_tag(feed, "regex_whitelist_content", "")).strip() - regex_whitelist_author: str = str(reader.get_tag(feed, "regex_whitelist_author", "")).strip() + regex_whitelist_title: str = str(custom_reader.get_tag(feed, "regex_whitelist_title", "")).strip() + regex_whitelist_summary: str = str(custom_reader.get_tag(feed, "regex_whitelist_summary", "")).strip() + regex_whitelist_content: str = str(custom_reader.get_tag(feed, "regex_whitelist_content", "")).strip() + regex_whitelist_author: str = str(custom_reader.get_tag(feed, "regex_whitelist_author", "")).strip() # Check regular whitelist if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title): diff --git a/discord_rss_bot/git_backup.py b/discord_rss_bot/git_backup.py deleted file mode 100644 index 49528ec..0000000 --- a/discord_rss_bot/git_backup.py +++ /dev/null @@ -1,243 +0,0 @@ -"""Git 
backup module for committing bot state changes to a private repository. - -Configure the backup by setting these environment variables: -- ``GIT_BACKUP_PATH``: Local filesystem path for the backup git repository. - When set, the bot will initialise a git repo there (if one doesn't exist) - and commit an export of its state after every relevant change. -- ``GIT_BACKUP_REMOTE``: Optional remote URL (e.g. ``git@github.com:you/private-repo.git``). - When set, every commit is followed by a ``git push`` to this remote. - -The exported state is written as ``state.json`` inside the backup repo. It -contains the list of feeds together with their webhook URL, filter settings -(blacklist / whitelist, regex variants), custom messages and embed settings. -Global webhooks are also included. - -Example docker-compose snippet:: - - environment: - - GIT_BACKUP_PATH=/data/backup - - GIT_BACKUP_REMOTE=git@github.com:you/private-config.git -""" - -from __future__ import annotations - -import json -import logging -import os -import shutil -import subprocess # noqa: S404 -from pathlib import Path -from typing import TYPE_CHECKING -from typing import Any - -if TYPE_CHECKING: - from reader import Reader - -logger: logging.Logger = logging.getLogger(__name__) -GIT_EXECUTABLE: str = shutil.which("git") or "git" - - -type TAG_VALUE = ( - dict[str, str | int | float | bool | dict[str, Any] | list[Any] | None] - | list[str | int | float | bool | dict[str, Any] | list[Any] | None] - | None -) - -# Tags that are exported per-feed (empty values are omitted). -_FEED_TAGS: tuple[str, ...] 
= ( - "webhook", - "custom_message", - "should_send_embed", - "embed", - "blacklist_title", - "blacklist_summary", - "blacklist_content", - "blacklist_author", - "regex_blacklist_title", - "regex_blacklist_summary", - "regex_blacklist_content", - "regex_blacklist_author", - "whitelist_title", - "whitelist_summary", - "whitelist_content", - "whitelist_author", - "regex_whitelist_title", - "regex_whitelist_summary", - "regex_whitelist_content", - "regex_whitelist_author", - ".reader.update", -) - - -def get_backup_path() -> Path | None: - """Return the configured backup path, or *None* if not configured. - - Returns: - Path to the backup repository, or None if ``GIT_BACKUP_PATH`` is unset. - """ - raw: str = os.environ.get("GIT_BACKUP_PATH", "").strip() - return Path(raw) if raw else None - - -def get_backup_remote() -> str: - """Return the configured remote URL, or an empty string if not set. - - Returns: - The remote URL string from ``GIT_BACKUP_REMOTE``, or ``""`` if unset. - """ - return os.environ.get("GIT_BACKUP_REMOTE", "").strip() - - -def setup_backup_repo(backup_path: Path) -> bool: - """Ensure the backup directory exists and contains a git repository. - - If the directory does not yet contain a ``.git`` folder a new repository is - initialised. A basic git identity is configured locally so that commits - succeed even in environments where a global ``~/.gitconfig`` is absent. - - Args: - backup_path: Local path for the backup repository. - - Returns: - ``True`` if the repository is ready, ``False`` on any error. - """ - try: - backup_path.mkdir(parents=True, exist_ok=True) - git_dir: Path = backup_path / ".git" - if not git_dir.exists(): - subprocess.run([GIT_EXECUTABLE, "init", str(backup_path)], check=True, capture_output=True) # noqa: S603 - logger.info("Initialised git backup repository at %s", backup_path) - - # Ensure a local identity exists so that `git commit` always works. 
- for key, value in (("user.email", "discord-rss-bot@localhost"), ("user.name", "discord-rss-bot")): - result: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key], - check=False, - capture_output=True, - ) - if result.returncode != 0: - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "config", "--local", key, value], - check=True, - capture_output=True, - ) - - # Configure the remote if GIT_BACKUP_REMOTE is set. - remote_url: str = get_backup_remote() - if remote_url: - # Check if remote "origin" already exists. - check_remote: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "get-url", "origin"], - check=False, - capture_output=True, - ) - if check_remote.returncode != 0: - # Remote doesn't exist, add it. - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "add", "origin", remote_url], - check=True, - capture_output=True, - ) - logger.info("Added remote 'origin' with URL: %s", remote_url) - else: - # Remote exists, update it if the URL has changed. - current_url: str = check_remote.stdout.decode().strip() - if current_url != remote_url: - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "remote", "set-url", "origin", remote_url], - check=True, - capture_output=True, - ) - logger.info("Updated remote 'origin' URL from %s to %s", current_url, remote_url) - except Exception: - logger.exception("Failed to set up git backup repository at %s", backup_path) - return False - return True - - -def export_state(reader: Reader, backup_path: Path) -> None: - """Serialise the current bot state to ``state.json`` inside *backup_path*. - - Args: - reader: The :class:`reader.Reader` instance to read state from. - backup_path: Destination directory for the exported ``state.json``. 
- """ - feeds_state: list[dict] = [] - for feed in reader.get_feeds(): - feed_data: dict = {"url": feed.url} - for tag in _FEED_TAGS: - try: - value: TAG_VALUE = reader.get_tag(feed, tag, None) - if value is not None and value != "": # noqa: PLC1901 - feed_data[tag] = value - except Exception: - logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url) - feeds_state.append(feed_data) - - webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list( - reader.get_tag((), "webhooks", []), - ) - - # Export global update interval if set - global_update_interval: dict[str, Any] | None = None - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict): - global_update_interval = global_update_config - - state: dict = {"feeds": feeds_state, "webhooks": webhooks} - if global_update_interval is not None: - state["global_update_interval"] = global_update_interval - state_file: Path = backup_path / "state.json" - state_file.write_text(json.dumps(state, indent=2, default=str), encoding="utf-8") - - -def commit_state_change(reader: Reader, message: str) -> None: - """Export current state and commit it to the backup repository. - - This is a no-op when ``GIT_BACKUP_PATH`` is not configured. Errors are - logged but never raised so that a backup failure never interrupts normal - bot operation. - - Args: - reader: The :class:`reader.Reader` instance to read state from. - message: Commit message describing the change (e.g. ``"Add feed example.com/rss.xml"``). - """ - backup_path: Path | None = get_backup_path() - if backup_path is None: - return - - if not setup_backup_repo(backup_path): - return - - try: - export_state(reader, backup_path) - - subprocess.run([GIT_EXECUTABLE, "-C", str(backup_path), "add", "-A"], check=True, capture_output=True) # noqa: S603 - - # Only create a commit if there are staged changes. 
- diff_result: subprocess.CompletedProcess[bytes] = subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "diff", "--cached", "--exit-code"], - check=False, - capture_output=True, - ) - if diff_result.returncode == 0: - logger.debug("No state changes to commit for: %s", message) - return - - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "commit", "-m", message], - check=True, - capture_output=True, - ) - logger.info("Committed state change to backup repo: %s", message) - - # Push to remote if configured. - if get_backup_remote(): - subprocess.run( # noqa: S603 - [GIT_EXECUTABLE, "-C", str(backup_path), "push", "origin", "HEAD"], - check=True, - capture_output=True, - ) - logger.info("Pushed state change to remote 'origin': %s", message) - except Exception: - logger.exception("Failed to commit state change '%s' to backup repo", message) diff --git a/discord_rss_bot/hoyolab_api.py b/discord_rss_bot/hoyolab_api.py index 227a413..cb1ed71 100644 --- a/discord_rss_bot/hoyolab_api.py +++ b/discord_rss_bot/hoyolab_api.py @@ -4,12 +4,10 @@ import contextlib import json import logging import re -from typing import TYPE_CHECKING -from typing import Any +from typing import TYPE_CHECKING, Any import requests -from discord_webhook import DiscordEmbed -from discord_webhook import DiscordWebhook +from discord_webhook import DiscordEmbed, DiscordWebhook if TYPE_CHECKING: from reader import Entry diff --git a/discord_rss_bot/is_url_valid.py b/discord_rss_bot/is_url_valid.py index c986b4a..cca1491 100644 --- a/discord_rss_bot/is_url_valid.py +++ b/discord_rss_bot/is_url_valid.py @@ -1,7 +1,6 @@ from __future__ import annotations -from urllib.parse import ParseResult -from urllib.parse import urlparse +from urllib.parse import ParseResult, urlparse def is_url_valid(url: str) -> bool: diff --git a/discord_rss_bot/main.py b/discord_rss_bot/main.py index 1e5211b..3103fe7 100644 --- a/discord_rss_bot/main.py +++ b/discord_rss_bot/main.py @@ 
-7,65 +7,48 @@ import typing import urllib.parse from contextlib import asynccontextmanager from dataclasses import dataclass -from datetime import UTC -from datetime import datetime +from datetime import UTC, datetime from functools import lru_cache -from typing import TYPE_CHECKING -from typing import Annotated -from typing import Any -from typing import cast +from typing import TYPE_CHECKING, Annotated, cast import httpx import sentry_sdk import uvicorn from apscheduler.schedulers.asyncio import AsyncIOScheduler -from fastapi import Depends -from fastapi import FastAPI -from fastapi import Form -from fastapi import HTTPException -from fastapi import Request +from fastapi import FastAPI, Form, HTTPException, Request from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from httpx import Response from markdownify import markdownify -from reader import Entry -from reader import EntryNotFoundError -from reader import Feed -from reader import FeedExistsError -from reader import FeedNotFoundError -from reader import Reader -from reader import ReaderError -from reader import TagNotFoundError +from reader import Entry, EntryNotFoundError, Feed, FeedNotFoundError, Reader, TagNotFoundError from starlette.responses import RedirectResponse from discord_rss_bot import settings -from discord_rss_bot.custom_filters import entry_is_blacklisted -from discord_rss_bot.custom_filters import entry_is_whitelisted -from discord_rss_bot.custom_message import CustomEmbed -from discord_rss_bot.custom_message import get_custom_message -from discord_rss_bot.custom_message import get_embed -from discord_rss_bot.custom_message import get_first_image -from discord_rss_bot.custom_message import replace_tags_in_text_message -from discord_rss_bot.custom_message import save_embed -from discord_rss_bot.feeds import create_feed -from discord_rss_bot.feeds import extract_domain -from discord_rss_bot.feeds import 
send_entry_to_discord -from discord_rss_bot.feeds import send_to_discord -from discord_rss_bot.git_backup import commit_state_change -from discord_rss_bot.git_backup import get_backup_path -from discord_rss_bot.is_url_valid import is_url_valid -from discord_rss_bot.search import create_search_context +from discord_rss_bot.custom_filters import ( + entry_is_blacklisted, + entry_is_whitelisted, +) +from discord_rss_bot.custom_message import ( + CustomEmbed, + get_custom_message, + get_embed, + get_first_image, + replace_tags_in_text_message, + save_embed, +) +from discord_rss_bot.feeds import create_feed, extract_domain, send_entry_to_discord, send_to_discord +from discord_rss_bot.missing_tags import add_missing_tags +from discord_rss_bot.search import create_html_for_search_results from discord_rss_bot.settings import get_reader if TYPE_CHECKING: - from collections.abc import AsyncGenerator - from collections.abc import Iterable + from collections.abc import AsyncGenerator, Iterable from reader.types import JSONType -LOGGING_CONFIG: dict[str, Any] = { +LOGGING_CONFIG = { "version": 1, "disable_existing_loggers": True, "formatters": { @@ -101,71 +84,25 @@ LOGGING_CONFIG: dict[str, Any] = { logging.config.dictConfig(LOGGING_CONFIG) logger: logging.Logger = logging.getLogger(__name__) - - -def get_reader_dependency() -> Reader: - """Provide the app Reader instance as a FastAPI dependency. - - Returns: - Reader: The shared Reader instance. - """ - return get_reader() - - -# Time constants for relative time formatting -SECONDS_PER_MINUTE = 60 -SECONDS_PER_HOUR = 3600 -SECONDS_PER_DAY = 86400 - - -def relative_time(dt: datetime | None) -> str: - """Convert a datetime to a relative time string (e.g., '2 hours ago', 'in 5 minutes'). - - Args: - dt: The datetime to convert (should be timezone-aware). - - Returns: - A human-readable relative time string. 
- """ - if dt is None: - return "Never" - - now = datetime.now(tz=UTC) - diff = dt - now - seconds = int(abs(diff.total_seconds())) - is_future = diff.total_seconds() > 0 - - # Determine the appropriate unit and value - if seconds < SECONDS_PER_MINUTE: - value = seconds - unit = "s" - elif seconds < SECONDS_PER_HOUR: - value = seconds // SECONDS_PER_MINUTE - unit = "m" - elif seconds < SECONDS_PER_DAY: - value = seconds // SECONDS_PER_HOUR - unit = "h" - else: - value = seconds // SECONDS_PER_DAY - unit = "d" - - # Format based on future or past - return f"in {value}{unit}" if is_future else f"{value}{unit} ago" +reader: Reader = get_reader() @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None]: - """Lifespan function for the FastAPI app.""" - reader: Reader = get_reader() - scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone=UTC) - scheduler.add_job( - func=send_to_discord, - trigger="interval", - minutes=1, - id="send_to_discord", - max_instances=1, - next_run_time=datetime.now(tz=UTC), - ) + """Lifespan for the FastAPI app. + + Args: + app: The FastAPI app. + + Yields: + None: Nothing. + """ + add_missing_tags(reader) + scheduler: AsyncIOScheduler = AsyncIOScheduler() + + # Run job every minute to check for new entries. Feeds will be checked every 15 minutes. + # TODO(TheLovinator): Make this configurable. + scheduler.add_job(send_to_discord, "interval", minutes=1, next_run_time=datetime.now(tz=UTC)) scheduler.start() logger.info("Scheduler started.") yield @@ -180,29 +117,27 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template # Add the filters to the Jinja2 environment so they can be used in html templates. 
templates.env.filters["encode_url"] = lambda url: urllib.parse.quote(url) if url else "" +templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted +templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted templates.env.filters["discord_markdown"] = markdownify -templates.env.filters["relative_time"] = relative_time -templates.env.globals["get_backup_path"] = get_backup_path @app.post("/add_webhook") async def post_add_webhook( webhook_name: Annotated[str, Form()], webhook_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Add a feed to the database. Args: webhook_name: The name of the webhook. webhook_url: The url of the webhook. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the index page. Raises: HTTPException: If the webhook already exists. + + Returns: + RedirectResponse: Redirect to the index page. """ # Get current webhooks from the database if they exist otherwise use an empty list. webhooks = list(reader.get_tag((), "webhooks", [])) @@ -219,8 +154,6 @@ async def post_add_webhook( reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Add webhook {webhook_name.strip()}") - return RedirectResponse(url="/", status_code=303) # TODO(TheLovinator): Show this error on the page. @@ -229,22 +162,17 @@ async def post_add_webhook( @app.post("/delete_webhook") -async def post_delete_webhook( - webhook_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_delete_webhook(webhook_url: Annotated[str, Form()]) -> RedirectResponse: """Delete a webhook from the database. Args: webhook_url: The url of the webhook. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the index page. Raises: HTTPException: If the webhook could not be deleted + Returns: + RedirectResponse: Redirect to the index page. 
""" # TODO(TheLovinator): Check if the webhook is in use by any feeds before deleting it. # TODO(TheLovinator): Replace HTTPException with a custom exception for both of these. @@ -271,8 +199,6 @@ async def post_delete_webhook( # Add our new list of webhooks to the database. reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Delete webhook {webhook_url.strip()}") - return RedirectResponse(url="/", status_code=303) @@ -280,34 +206,27 @@ async def post_delete_webhook( async def post_create_feed( feed_url: Annotated[str, Form()], webhook_dropdown: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], ) -> RedirectResponse: """Add a feed to the database. Args: feed_url: The feed to add. webhook_dropdown: The webhook to use. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. """ clean_feed_url: str = feed_url.strip() create_feed(reader, feed_url, webhook_dropdown) - commit_state_change(reader, f"Add feed {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/pause") -async def post_pause_feed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_pause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Pause a feed. Args: feed_url: The feed to pause. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -318,15 +237,11 @@ async def post_pause_feed( @app.post("/unpause") -async def post_unpause_feed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_unpause_feed(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Unpause a feed. Args: feed_url: The Feed to unpause. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. 
@@ -338,7 +253,6 @@ async def post_unpause_feed( @app.post("/whitelist") async def post_set_whitelist( - reader: Annotated[Reader, Depends(get_reader_dependency)], whitelist_title: Annotated[str, Form()] = "", whitelist_summary: Annotated[str, Form()] = "", whitelist_content: Annotated[str, Form()] = "", @@ -361,7 +275,6 @@ async def post_set_whitelist( regex_whitelist_content: Whitelisted regex for when checking the content. regex_whitelist_author: Whitelisted regex for when checking the author. feed_url: The feed we should set the whitelist for. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -376,23 +289,16 @@ async def post_set_whitelist( reader.set_tag(clean_feed_url, "regex_whitelist_content", regex_whitelist_content) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "regex_whitelist_author", regex_whitelist_author) # pyright: ignore[reportArgumentType][call-overload] - commit_state_change(reader, f"Update whitelist for {clean_feed_url}") - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/whitelist", response_class=HTMLResponse) -async def get_whitelist( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_whitelist(feed_url: str, request: Request): """Get the whitelist. Args: feed_url: What feed we should get the whitelist for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The whitelist page. @@ -426,7 +332,6 @@ async def get_whitelist( @app.post("/blacklist") async def post_set_blacklist( - reader: Annotated[Reader, Depends(get_reader_dependency)], blacklist_title: Annotated[str, Form()] = "", blacklist_summary: Annotated[str, Form()] = "", blacklist_content: Annotated[str, Form()] = "", @@ -452,7 +357,6 @@ async def post_set_blacklist( regex_blacklist_content: Blacklisted regex for when checking the content. 
regex_blacklist_author: Blacklisted regex for when checking the author. feed_url: What feed we should set the blacklist for. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. @@ -466,22 +370,16 @@ async def post_set_blacklist( reader.set_tag(clean_feed_url, "regex_blacklist_summary", regex_blacklist_summary) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "regex_blacklist_content", regex_blacklist_content) # pyright: ignore[reportArgumentType][call-overload] reader.set_tag(clean_feed_url, "regex_blacklist_author", regex_blacklist_author) # pyright: ignore[reportArgumentType][call-overload] - commit_state_change(reader, f"Update blacklist for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/blacklist", response_class=HTMLResponse) -async def get_blacklist( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_blacklist(feed_url: str, request: Request): """Get the blacklist. Args: feed_url: What feed we should get the blacklist for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The blacklist page. @@ -515,7 +413,6 @@ async def get_blacklist( @app.post("/custom") async def post_set_custom( feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], custom_message: Annotated[str, Form()] = "", ) -> RedirectResponse: """Set the custom message, this is used when sending the message. @@ -523,7 +420,6 @@ async def post_set_custom( Args: custom_message: The custom message. feed_url: The feed we should set the custom message for. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. 
@@ -540,22 +436,16 @@ async def post_set_custom( reader.set_tag(feed_url, "custom_message", default_custom_message) clean_feed_url: str = feed_url.strip() - commit_state_change(reader, f"Update custom message for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.get("/custom", response_class=HTMLResponse) -async def get_custom( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_custom(feed_url: str, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The custom message page. @@ -576,17 +466,12 @@ async def get_custom( @app.get("/embed", response_class=HTMLResponse) -async def get_embed_page( - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_embed_page(feed_url: str, request: Request): """Get the custom message. This is used when sending the message to Discord. Args: feed_url: What feed we should get the custom message for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The embed page. @@ -620,9 +505,8 @@ async def get_embed_page( @app.post("/embed", response_class=HTMLResponse) -async def post_embed( # noqa: C901 +async def post_embed( feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], title: Annotated[str, Form()] = "", description: Annotated[str, Form()] = "", color: Annotated[str, Form()] = "", @@ -648,7 +532,7 @@ async def post_embed( # noqa: C901 author_icon_url: The author icon url of the embed. footer_text: The footer text of the embed. footer_icon_url: The footer icon url of the embed. - reader: The Reader instance. + Returns: RedirectResponse: Redirect to the embed page. 
@@ -657,245 +541,59 @@ async def post_embed( # noqa: C901 feed: Feed = reader.get_feed(urllib.parse.unquote(clean_feed_url)) custom_embed: CustomEmbed = get_embed(reader, feed) - # Only overwrite fields that the user provided. This prevents accidental - # clearing of previously saved embed data when the form submits empty - # values for fields the user did not change. - if title: - custom_embed.title = title - if description: - custom_embed.description = description - if color: - custom_embed.color = color - if image_url: - custom_embed.image_url = image_url - if thumbnail_url: - custom_embed.thumbnail_url = thumbnail_url - if author_name: - custom_embed.author_name = author_name - if author_url: - custom_embed.author_url = author_url - if author_icon_url: - custom_embed.author_icon_url = author_icon_url - if footer_text: - custom_embed.footer_text = footer_text - if footer_icon_url: - custom_embed.footer_icon_url = footer_icon_url + custom_embed.title = title + custom_embed.description = description + custom_embed.color = color + custom_embed.image_url = image_url + custom_embed.thumbnail_url = thumbnail_url + custom_embed.author_name = author_name + custom_embed.author_url = author_url + custom_embed.author_icon_url = author_icon_url + custom_embed.footer_text = footer_text + custom_embed.footer_icon_url = footer_icon_url # Save the data. save_embed(reader, feed, custom_embed) - commit_state_change(reader, f"Update embed settings for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/use_embed") -async def post_use_embed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_use_embed(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Use embed instead of text. Args: feed_url: The feed to change. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. 
""" clean_feed_url: str = feed_url.strip() reader.set_tag(clean_feed_url, "should_send_embed", True) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Enable embed mode for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/use_text") -async def post_use_text( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: +async def post_use_text(feed_url: Annotated[str, Form()]) -> RedirectResponse: """Use text instead of embed. Args: feed_url: The feed to change. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. """ clean_feed_url: str = feed_url.strip() reader.set_tag(clean_feed_url, "should_send_embed", False) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Disable embed mode for {clean_feed_url}") return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) -@app.post("/set_update_interval") -async def post_set_update_interval( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - interval_minutes: Annotated[int | None, Form()] = None, - redirect_to: Annotated[str, Form()] = "", -) -> RedirectResponse: - """Set the update interval for a feed. - - Args: - feed_url: The feed to change. - interval_minutes: The update interval in minutes (None to reset to global default). - redirect_to: Optional redirect URL (defaults to feed page). - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the specified page or feed page. 
- """ - clean_feed_url: str = feed_url.strip() - - # If no interval specified, reset to global default - if interval_minutes is None: - try: - reader.delete_tag(clean_feed_url, ".reader.update") - commit_state_change(reader, f"Reset update interval to default for {clean_feed_url}") - except TagNotFoundError: - pass - else: - # Validate interval (minimum 1 minute, no maximum) - interval_minutes = max(interval_minutes, 1) - reader.set_tag(clean_feed_url, ".reader.update", {"interval": interval_minutes}) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Set update interval to {interval_minutes} minutes for {clean_feed_url}") - - # Update the feed immediately to recalculate update_after with the new interval - try: - reader.update_feed(clean_feed_url) - logger.info("Updated feed after interval change: %s", clean_feed_url) - except Exception: - logger.exception("Failed to update feed after interval change: %s", clean_feed_url) - - if redirect_to: - return RedirectResponse(url=redirect_to, status_code=303) - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) - - -@app.post("/change_feed_url") -async def post_change_feed_url( - old_feed_url: Annotated[str, Form()], - new_feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: - """Change the URL for an existing feed. - - Args: - old_feed_url: Current feed URL. - new_feed_url: New feed URL to change to. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the feed page for the resulting URL. - - Raises: - HTTPException: If the old feed is not found, the new URL already exists, or change fails. 
- """ - clean_old_feed_url: str = old_feed_url.strip() - clean_new_feed_url: str = new_feed_url.strip() - - if not clean_old_feed_url or not clean_new_feed_url: - raise HTTPException(status_code=400, detail="Feed URLs cannot be empty") - - if clean_old_feed_url == clean_new_feed_url: - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_old_feed_url)}", status_code=303) - - try: - reader.change_feed_url(clean_old_feed_url, clean_new_feed_url) - except FeedNotFoundError as e: - raise HTTPException(status_code=404, detail=f"Feed not found: {clean_old_feed_url}") from e - except FeedExistsError as e: - raise HTTPException(status_code=409, detail=f"Feed already exists: {clean_new_feed_url}") from e - except ReaderError as e: - raise HTTPException(status_code=400, detail=f"Failed to change feed URL: {e}") from e - - # Update the feed with the new URL so we can discover what entries it returns. - # Then mark all unread entries as read so the scheduler doesn't resend them. - try: - reader.update_feed(clean_new_feed_url) - except Exception: - logger.exception("Failed to update feed after URL change: %s", clean_new_feed_url) - - for entry in reader.get_entries(feed=clean_new_feed_url, read=False): - try: - reader.set_entry_read(entry, True) - except Exception: - logger.exception("Failed to mark entry as read after URL change: %s", entry.id) - - commit_state_change(reader, f"Change feed URL from {clean_old_feed_url} to {clean_new_feed_url}") - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_new_feed_url)}", status_code=303) - - -@app.post("/reset_update_interval") -async def post_reset_update_interval( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - redirect_to: Annotated[str, Form()] = "", -) -> RedirectResponse: - """Reset the update interval for a feed to use the global default. - - Args: - feed_url: The feed to change. - redirect_to: Optional redirect URL (defaults to feed page). 
- reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the specified page or feed page. - """ - clean_feed_url: str = feed_url.strip() - - try: - reader.delete_tag(clean_feed_url, ".reader.update") - commit_state_change(reader, f"Reset update interval to default for {clean_feed_url}") - except TagNotFoundError: - # Tag doesn't exist, which is fine - pass - - # Update the feed immediately to recalculate update_after with the new interval - try: - reader.update_feed(clean_feed_url) - logger.info("Updated feed after interval reset: %s", clean_feed_url) - except Exception: - logger.exception("Failed to update feed after interval reset: %s", clean_feed_url) - - if redirect_to: - return RedirectResponse(url=redirect_to, status_code=303) - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) - - -@app.post("/set_global_update_interval") -async def post_set_global_update_interval( - interval_minutes: Annotated[int, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: - """Set the global default update interval. - - Args: - interval_minutes: The update interval in minutes. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the settings page. - """ - # Validate interval (minimum 1 minute, no maximum) - interval_minutes = max(interval_minutes, 1) - - reader.set_tag((), ".reader.update", {"interval": interval_minutes}) # pyright: ignore[reportArgumentType] - commit_state_change(reader, f"Set global update interval to {interval_minutes} minutes") - return RedirectResponse(url="/settings", status_code=303) - - @app.get("/add", response_class=HTMLResponse) -def get_add( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +def get_add(request: Request): """Page for adding a new feed. Args: request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The add feed page. 
@@ -908,25 +606,19 @@ def get_add( @app.get("/feed", response_class=HTMLResponse) -async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 - feed_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - starting_after: str = "", -): +async def get_feed(feed_url: str, request: Request, starting_after: str = ""): """Get a feed by URL. Args: feed_url: The feed to add. request: The request object. starting_after: The entry to start after. Used for pagination. - reader: The Reader instance. - - Returns: - HTMLResponse: The feed page. Raises: HTTPException: If the feed is not found. + + Returns: + HTMLResponse: The feed page. """ entries_per_page: int = 20 @@ -939,7 +631,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 # Only show button if more than 10 entries. total_entries: int = reader.get_entry_counts(feed=feed).total or 0 - is_show_more_entries_button_visible: bool = total_entries > entries_per_page + show_more_entires_button: bool = total_entries > entries_per_page # Get entries from the feed. 
if starting_after: @@ -950,22 +642,7 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 except EntryNotFoundError as e: current_entries = list(reader.get_entries(feed=clean_feed_url)) msg: str = f"{e}\n\n{[entry.id for entry in current_entries]}" - html: str = create_html_for_feed(reader=reader, entries=current_entries, current_feed_url=clean_feed_url) - - # Get feed and global intervals for error case too - feed_interval: int | None = None - feed_update_config = reader.get_tag(feed, ".reader.update", None) - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value - - global_interval: int = 60 - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value + html: str = create_html_for_feed(current_entries) context = { "request": request, @@ -976,10 +653,8 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 "should_send_embed": False, "last_entry": None, "messages": msg, - "is_show_more_entries_button_visible": is_show_more_entries_button_visible, + "show_more_entires_button": show_more_entires_button, "total_entries": total_entries, - "feed_interval": feed_interval, - "global_interval": global_interval, } return templates.TemplateResponse(request=request, name="feed.html", context=context) @@ -1000,25 +675,13 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 last_entry = entries[-1] # Create the html for the entries. 
- html: str = create_html_for_feed(reader=reader, entries=entries, current_feed_url=clean_feed_url) + html: str = create_html_for_feed(entries) - should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True)) - - # Get the update interval for this feed - feed_interval: int | None = None - feed_update_config = reader.get_tag(feed, ".reader.update", None) - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value - - # Get the global default update interval - global_interval: int = 60 # Default to 60 minutes if not set - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value + try: + should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) + except TagNotFoundError: + add_missing_tags(reader) + should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed")) context = { "request": request, @@ -1028,25 +691,17 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915 "html": html, "should_send_embed": should_send_embed, "last_entry": last_entry, - "is_show_more_entries_button_visible": is_show_more_entries_button_visible, + "show_more_entires_button": show_more_entires_button, "total_entries": total_entries, - "feed_interval": feed_interval, - "global_interval": global_interval, } return templates.TemplateResponse(request=request, name="feed.html", context=context) -def create_html_for_feed( # noqa: C901, PLR0914 - reader: Reader, - entries: Iterable[Entry], - current_feed_url: str = "", -) -> str: +def create_html_for_feed(entries: Iterable[Entry]) -> str: """Create HTML for the search results. Args: - reader: The Reader instance to use. 
entries: The entries to create HTML for. - current_feed_url: The feed URL currently being viewed in /feed. Returns: str: The HTML for the search results. @@ -1062,43 +717,21 @@ def create_html_for_feed( # noqa: C901, PLR0914 first_image = get_first_image(summary, content) - text: str = replace_tags_in_text_message(entry, reader=reader) or ( - "
No content available.
" - ) + text: str = replace_tags_in_text_message(entry) or "
No content available.
" published = "" if entry.published: published: str = entry.published.strftime("%Y-%m-%d %H:%M:%S") blacklisted: str = "" - if entry_is_blacklisted(entry, reader=reader): + if entry_is_blacklisted(entry): blacklisted = "Blacklisted" whitelisted: str = "" - if entry_is_whitelisted(entry, reader=reader): + if entry_is_whitelisted(entry): whitelisted = "Whitelisted" - source_feed_url: str = getattr(entry, "original_feed_url", None) or entry.feed.url - - from_another_feed: str = "" - if current_feed_url and source_feed_url != current_feed_url: - from_another_feed = f"From another feed: {source_feed_url}" - - # Add feed link when viewing from webhook_entries or aggregated views - feed_link: str = "" - if not current_feed_url or source_feed_url != current_feed_url: - encoded_feed_url: str = urllib.parse.quote(source_feed_url) - feed_title: str = entry.feed.title if hasattr(entry.feed, "title") and entry.feed.title else source_feed_url - feed_link = ( - f"{feed_title}
" - ) - entry_id: str = urllib.parse.quote(entry.id) - encoded_source_feed_url: str = urllib.parse.quote(source_feed_url) - to_discord_html: str = ( - f"" - "Send to Discord" - ) + to_discord_html: str = f"Send to Discord" # Check if this is a YouTube feed entry and the entry has a link is_youtube_feed = "youtube.com/feeds/videos.xml" in entry.feed.url @@ -1123,14 +756,14 @@ def create_html_for_feed( # noqa: C901, PLR0914 image_html: str = f"" if first_image else "" html += f"""
-{blacklisted}{whitelisted}{from_another_feed}

{entry.title}

-{feed_link}{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html} +{blacklisted}{whitelisted}

{entry.title}

+{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html} {text} {video_embed_html} {image_html}
-""" # noqa: E501 +""" return html.strip() @@ -1169,7 +802,6 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: hook_name (str): The webhook name. hook_url (str): The webhook URL. - Returns: WebhookInfo: The webhook username, avatar, guild id, etc. """ @@ -1190,64 +822,12 @@ def get_data_from_hook_url(hook_name: str, hook_url: str) -> WebhookInfo: return our_hook -@app.get("/settings", response_class=HTMLResponse) -async def get_settings( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): - """Settings page. - - Args: - request: The request object. - reader: The Reader instance. - - Returns: - HTMLResponse: The settings page. - """ - # Get the global default update interval - global_interval: int = 60 # Default to 60 minutes if not set - global_update_config = reader.get_tag((), ".reader.update", None) - if isinstance(global_update_config, dict) and "interval" in global_update_config: - interval_value = global_update_config["interval"] - if isinstance(interval_value, int): - global_interval = interval_value - - # Get all feeds with their intervals - feeds: Iterable[Feed] = reader.get_feeds() - feed_intervals = [] - for feed in feeds: - feed_interval: int | None = None - feed_update_config = reader.get_tag(feed, ".reader.update", None) - if isinstance(feed_update_config, dict) and "interval" in feed_update_config: - interval_value = feed_update_config["interval"] - if isinstance(interval_value, int): - feed_interval = interval_value - - feed_intervals.append({ - "feed": feed, - "interval": feed_interval, - "effective_interval": feed_interval or global_interval, - "domain": extract_domain(feed.url), - }) - - context = { - "request": request, - "global_interval": global_interval, - "feed_intervals": feed_intervals, - } - return templates.TemplateResponse(request=request, name="settings.html", context=context) - - @app.get("/webhooks", response_class=HTMLResponse) -async def get_webhooks( - request: Request, - 
reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def get_webhooks(request: Request): """Page for adding a new webhook. Args: request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The add webhook page. @@ -1268,241 +848,137 @@ async def get_webhooks( @app.get("/", response_class=HTMLResponse) -def get_index( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - message: str = "", -): +def get_index(request: Request): """This is the root of the website. Args: request: The request object. - message: Optional message to display to the user. - reader: The Reader instance. Returns: HTMLResponse: The index page. """ - return templates.TemplateResponse( - request=request, - name="index.html", - context=make_context_index(request, message, reader), - ) + return templates.TemplateResponse(request=request, name="index.html", context=make_context_index(request)) -def make_context_index(request: Request, message: str = "", reader: Reader | None = None): +def make_context_index(request: Request): """Create the needed context for the index page. Args: request: The request object. - message: Optional message to display to the user. - reader: The Reader instance. Returns: dict: The context for the index page. 
""" - effective_reader: Reader = reader or get_reader_dependency() - hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(effective_reader.get_tag((), "webhooks", []))) + hooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) - feed_list: list[dict[str, JSONType | Feed | str]] = [] - broken_feeds: list[Feed] = [] - feeds_without_attached_webhook: list[Feed] = [] + feed_list = [] + broken_feeds = [] + feeds_without_attached_webhook = [] # Get all feeds and organize them - feeds: Iterable[Feed] = effective_reader.get_feeds() + feeds: Iterable[Feed] = reader.get_feeds() for feed in feeds: - webhook: str = str(effective_reader.get_tag(feed.url, "webhook", "")) - if not webhook: + try: + webhook = reader.get_tag(feed.url, "webhook") + feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)}) + except TagNotFoundError: broken_feeds.append(feed) continue - feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)}) - - webhook_list: list[str] = [hook["url"] for hook in hooks] + webhook_list = [hook["url"] for hook in hooks] if webhook not in webhook_list: feeds_without_attached_webhook.append(feed) return { "request": request, "feeds": feed_list, - "feed_count": effective_reader.get_feed_counts(), - "entry_count": effective_reader.get_entry_counts(), + "feed_count": reader.get_feed_counts(), + "entry_count": reader.get_entry_counts(), "webhooks": hooks, "broken_feeds": broken_feeds, "feeds_without_attached_webhook": feeds_without_attached_webhook, - "messages": message or None, } @app.post("/remove", response_class=HTMLResponse) -async def remove_feed( - feed_url: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def remove_feed(feed_url: Annotated[str, Form()]): """Get a feed by URL. Args: feed_url: The feed to add. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the index page. 
Raises: HTTPException: Feed not found + + Returns: + RedirectResponse: Redirect to the index page. """ try: reader.delete_feed(urllib.parse.unquote(feed_url)) except FeedNotFoundError as e: raise HTTPException(status_code=404, detail="Feed not found") from e - commit_state_change(reader, f"Remove feed {urllib.parse.unquote(feed_url)}") - return RedirectResponse(url="/", status_code=303) -@app.get("/update", response_class=HTMLResponse) -async def update_feed( - request: Request, - feed_url: str, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): - """Update a feed. - - Args: - request: The request object. - feed_url: The feed URL to update. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the feed page. - - Raises: - HTTPException: If the feed is not found. - """ - try: - reader.update_feed(urllib.parse.unquote(feed_url)) - except FeedNotFoundError as e: - raise HTTPException(status_code=404, detail="Feed not found") from e - - logger.info("Manually updated feed: %s", feed_url) - return RedirectResponse(url="/feed?feed_url=" + urllib.parse.quote(feed_url), status_code=303) - - -@app.post("/backup") -async def manual_backup( - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], -) -> RedirectResponse: - """Manually trigger a git backup of the current state. - - Args: - request: The request object. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the index page with a success or error message. - """ - backup_path = get_backup_path() - if backup_path is None: - message = "Git backup is not configured. Set GIT_BACKUP_PATH environment variable to enable backups." - logger.warning("Manual git backup attempted but GIT_BACKUP_PATH is not configured") - return RedirectResponse(url=f"/?message={urllib.parse.quote(message)}", status_code=303) - - try: - commit_state_change(reader, "Manual backup triggered from web UI") - message = "Successfully created git backup!" 
- logger.info("Manual git backup completed successfully") - except Exception as e: - message = f"Failed to create git backup: {e}" - logger.exception("Manual git backup failed") - - return RedirectResponse(url=f"/?message={urllib.parse.quote(message)}", status_code=303) - - @app.get("/search", response_class=HTMLResponse) -async def search( - request: Request, - query: str, - reader: Annotated[Reader, Depends(get_reader_dependency)], -): +async def search(request: Request, query: str): """Get entries matching a full-text search query. Args: query: The query to search for. request: The request object. - reader: The Reader instance. Returns: HTMLResponse: The search page. """ reader.update_search() - context = create_search_context(query, reader=reader) - return templates.TemplateResponse(request=request, name="search.html", context={"request": request, **context}) + + context = { + "request": request, + "search_html": create_html_for_search_results(query), + "query": query, + "search_amount": reader.search_entry_counts(query), + } + return templates.TemplateResponse(request=request, name="search.html", context=context) @app.get("/post_entry", response_class=HTMLResponse) -async def post_entry( - entry_id: str, - reader: Annotated[Reader, Depends(get_reader_dependency)], - feed_url: str = "", -): +async def post_entry(entry_id: str): """Send single entry to Discord. Args: entry_id: The entry to send. - feed_url: Optional feed URL used to disambiguate entries with identical IDs. - reader: The Reader instance. Returns: RedirectResponse: Redirect to the feed page. """ unquoted_entry_id: str = urllib.parse.unquote(entry_id) - clean_feed_url: str = urllib.parse.unquote(feed_url.strip()) if feed_url else "" - - # Prefer feed-scoped lookup when feed_url is provided. This avoids ambiguity when - # multiple feeds contain entries with the same ID. 
- entry: Entry | None = None - if clean_feed_url: - entry = next( - (entry for entry in reader.get_entries(feed=clean_feed_url) if entry.id == unquoted_entry_id), - None, - ) - else: - entry = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None) - + entry: Entry | None = next((entry for entry in reader.get_entries() if entry.id == unquoted_entry_id), None) if entry is None: return HTMLResponse(status_code=404, content=f"Entry '{entry_id}' not found.") - if result := send_entry_to_discord(entry=entry, reader=reader): + if result := send_entry_to_discord(entry=entry): return result # Redirect to the feed page. - redirect_feed_url: str = entry.feed.url.strip() - return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(redirect_feed_url)}", status_code=303) + clean_feed_url: str = entry.feed.url.strip() + return RedirectResponse(url=f"/feed?feed_url={urllib.parse.quote(clean_feed_url)}", status_code=303) @app.post("/modify_webhook", response_class=HTMLResponse) -def modify_webhook( - old_hook: Annotated[str, Form()], - new_hook: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - redirect_to: Annotated[str, Form()] = "", -): +def modify_webhook(old_hook: Annotated[str, Form()], new_hook: Annotated[str, Form()]): """Modify a webhook. Args: old_hook: The webhook to modify. new_hook: The new webhook. - redirect_to: Optional redirect URL after the update. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to the webhook page. Raises: HTTPException: Webhook could not be modified. + Returns: + RedirectResponse: Redirect to the webhook page. """ # Get current webhooks from the database if they exist otherwise use an empty list. webhooks = list(reader.get_tag((), "webhooks", [])) @@ -1510,20 +986,15 @@ def modify_webhook( # Webhooks are stored as a list of dictionaries. 
# Example: [{"name": "webhook_name", "url": "webhook_url"}] webhooks = cast("list[dict[str, str]]", webhooks) - old_hook_clean: str = old_hook.strip() - new_hook_clean: str = new_hook.strip() - webhook_modified: bool = False for hook in webhooks: - if hook["url"] in old_hook_clean: - hook["url"] = new_hook_clean + if hook["url"] in old_hook.strip(): + hook["url"] = new_hook.strip() # Check if it has been modified. - if hook["url"] != new_hook_clean: + if hook["url"] != new_hook.strip(): raise HTTPException(status_code=500, detail="Webhook could not be modified") - webhook_modified = True - # Add our new list of webhooks to the database. reader.set_tag((), "webhooks", webhooks) # pyright: ignore[reportArgumentType] @@ -1531,21 +1002,16 @@ def modify_webhook( # matches the old one. feeds: Iterable[Feed] = reader.get_feeds() for feed in feeds: - webhook: str = str(reader.get_tag(feed, "webhook", "")) + try: + webhook = reader.get_tag(feed, "webhook") + except TagNotFoundError: + continue - if webhook == old_hook_clean: - reader.set_tag(feed.url, "webhook", new_hook_clean) # pyright: ignore[reportArgumentType] + if webhook == old_hook.strip(): + reader.set_tag(feed.url, "webhook", new_hook.strip()) # pyright: ignore[reportArgumentType] - if webhook_modified and old_hook_clean != new_hook_clean: - commit_state_change(reader, f"Modify webhook URL from {old_hook_clean} to {new_hook_clean}") - - redirect_url: str = redirect_to.strip() or "/webhooks" - if redirect_to: - redirect_url = redirect_url.replace(urllib.parse.quote(old_hook_clean), urllib.parse.quote(new_hook_clean)) - redirect_url = redirect_url.replace(old_hook_clean, new_hook_clean) - - # Redirect to the requested page. - return RedirectResponse(url=redirect_url, status_code=303) + # Redirect to the webhook page. 
+ return RedirectResponse(url="/webhooks", status_code=303) def extract_youtube_video_id(url: str) -> str | None: @@ -1562,477 +1028,15 @@ def extract_youtube_video_id(url: str) -> str | None: # Handle standard YouTube URLs (youtube.com/watch?v=VIDEO_ID) if "youtube.com/watch" in url and "v=" in url: - return url.split("v=")[1].split("&", maxsplit=1)[0] + return url.split("v=")[1].split("&")[0] # Handle shortened YouTube URLs (youtu.be/VIDEO_ID) if "youtu.be/" in url: - return url.split("youtu.be/")[1].split("?", maxsplit=1)[0] + return url.split("youtu.be/")[1].split("?")[0] return None -def resolve_final_feed_url(url: str) -> tuple[str, str | None]: - """Resolve a feed URL by following redirects. - - Args: - url: The feed URL to resolve. - - Returns: - tuple[str, str | None]: A tuple with (resolved_url, error_message). - error_message is None when resolution succeeded. - """ - clean_url: str = url.strip() - if not clean_url: - return "", "URL is empty" - - if not is_url_valid(clean_url): - return clean_url, "URL is invalid" - - try: - response: Response = httpx.get(clean_url, follow_redirects=True, timeout=10.0) - except httpx.HTTPError as e: - return clean_url, str(e) - - if not response.is_success: - return clean_url, f"HTTP {response.status_code}" - - return str(response.url), None - - -def create_webhook_feed_url_preview( - webhook_feeds: list[Feed], - replace_from: str, - replace_to: str, - resolve_urls: bool, # noqa: FBT001 - force_update: bool = False, # noqa: FBT001, FBT002 - existing_feed_urls: set[str] | None = None, -) -> list[dict[str, str | bool | None]]: - """Create preview rows for bulk feed URL replacement. - - Args: - webhook_feeds: Feeds attached to a webhook. - replace_from: Text to replace in each URL. - replace_to: Replacement text. - resolve_urls: Whether to resolve resulting URLs via HTTP redirects. - force_update: Whether conflicts should be marked as force-overwritable. 
- existing_feed_urls: Optional set of all tracked feed URLs used for conflict detection. - - Returns: - list[dict[str, str | bool | None]]: Rows used in the preview table. - """ - known_feed_urls: set[str] = existing_feed_urls or {feed.url for feed in webhook_feeds} - preview_rows: list[dict[str, str | bool | None]] = [] - for feed in webhook_feeds: - old_url: str = feed.url - has_match: bool = bool(replace_from and replace_from in old_url) - - candidate_url: str = old_url - if has_match: - candidate_url = old_url.replace(replace_from, replace_to) - - resolved_url: str = candidate_url - resolution_error: str | None = None - if has_match and candidate_url != old_url and resolve_urls: - resolved_url, resolution_error = resolve_final_feed_url(candidate_url) - - will_force_ignore_errors: bool = bool( - force_update and bool(resolution_error) and has_match and old_url != candidate_url, - ) - - target_exists: bool = bool( - has_match and not resolution_error and resolved_url != old_url and resolved_url in known_feed_urls, - ) - will_force_overwrite: bool = bool(target_exists and force_update) - will_change: bool = bool( - has_match - and old_url != (candidate_url if will_force_ignore_errors else resolved_url) - and (not target_exists or will_force_overwrite) - and (not resolution_error or will_force_ignore_errors), - ) - - preview_rows.append({ - "old_url": old_url, - "candidate_url": candidate_url, - "resolved_url": resolved_url, - "has_match": has_match, - "will_change": will_change, - "target_exists": target_exists, - "will_force_overwrite": will_force_overwrite, - "will_force_ignore_errors": will_force_ignore_errors, - "resolution_error": resolution_error, - }) - - return preview_rows - - -def build_webhook_mass_update_context( - webhook_feeds: list[Feed], - all_feeds: list[Feed], - replace_from: str, - replace_to: str, - resolve_urls: bool, # noqa: FBT001 - force_update: bool = False, # noqa: FBT001, FBT002 -) -> dict[str, str | bool | int | list[dict[str, str | 
bool | None]] | dict[str, int]]: - """Build context data used by the webhook mass URL update preview UI. - - Args: - webhook_feeds: Feeds attached to the selected webhook. - all_feeds: All tracked feeds. - replace_from: Text to replace in URLs. - replace_to: Replacement text. - resolve_urls: Whether to resolve resulting URLs. - force_update: Whether to allow overwriting existing target URLs. - - Returns: - dict[str, ...]: Context values for rendering preview controls and table. - """ - clean_replace_from: str = replace_from.strip() - clean_replace_to: str = replace_to.strip() - - preview_rows: list[dict[str, str | bool | None]] = [] - if clean_replace_from: - preview_rows = create_webhook_feed_url_preview( - webhook_feeds=webhook_feeds, - replace_from=clean_replace_from, - replace_to=clean_replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - existing_feed_urls={feed.url for feed in all_feeds}, - ) - - preview_summary: dict[str, int] = { - "total": len(preview_rows), - "matched": sum(1 for row in preview_rows if row["has_match"]), - "will_update": sum(1 for row in preview_rows if row["will_change"]), - "conflicts": sum(1 for row in preview_rows if row["target_exists"] and not row["will_force_overwrite"]), - "force_overwrite": sum(1 for row in preview_rows if row["will_force_overwrite"]), - "force_ignore_errors": sum(1 for row in preview_rows if row["will_force_ignore_errors"]), - "resolve_errors": sum(1 for row in preview_rows if row["resolution_error"]), - } - preview_summary["no_match"] = preview_summary["total"] - preview_summary["matched"] - preview_summary["no_change"] = sum( - 1 for row in preview_rows if row["has_match"] and not row["resolution_error"] and not row["will_change"] - ) - - return { - "replace_from": clean_replace_from, - "replace_to": clean_replace_to, - "resolve_urls": resolve_urls, - "force_update": force_update, - "preview_rows": preview_rows, - "preview_summary": preview_summary, - "preview_change_count": 
preview_summary["will_update"], - } - - -@app.get("/webhook_entries_mass_update_preview", response_class=HTMLResponse) -async def get_webhook_entries_mass_update_preview( - webhook_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - replace_from: str = "", - replace_to: str = "", - resolve_urls: bool = True, # noqa: FBT001, FBT002 - force_update: bool = False, # noqa: FBT001, FBT002 -) -> HTMLResponse: - """Render the mass-update preview fragment for a webhook using HTMX. - - Args: - webhook_url: Webhook URL whose feeds are being updated. - request: The request object. - reader: The Reader instance. - replace_from: Text to find in URLs. - replace_to: Replacement text. - resolve_urls: Whether to resolve resulting URLs. - force_update: Whether to allow overwriting existing target URLs. - - Returns: - HTMLResponse: Rendered partial template containing summary + preview table. - """ - clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) - all_feeds: list[Feed] = list(reader.get_feeds()) - webhook_feeds: list[Feed] = [ - feed for feed in all_feeds if str(reader.get_tag(feed.url, "webhook", "")) == clean_webhook_url - ] - - context = { - "request": request, - "webhook_url": clean_webhook_url, - **build_webhook_mass_update_context( - webhook_feeds=webhook_feeds, - all_feeds=all_feeds, - replace_from=replace_from, - replace_to=replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - ), - } - return templates.TemplateResponse(request=request, name="_webhook_mass_update_preview.html", context=context) - - -@app.get("/webhook_entries", response_class=HTMLResponse) -async def get_webhook_entries( # noqa: C901, PLR0914 - webhook_url: str, - request: Request, - reader: Annotated[Reader, Depends(get_reader_dependency)], - starting_after: str = "", - replace_from: str = "", - replace_to: str = "", - resolve_urls: bool = True, # noqa: FBT001, FBT002 - force_update: bool = False, # noqa: FBT001, FBT002 - message: 
str = "", -) -> HTMLResponse: - """Get all latest entries from all feeds for a specific webhook. - - Args: - webhook_url: The webhook URL to get entries for. - request: The request object. - starting_after: The entry to start after. Used for pagination. - replace_from: Optional URL substring to find for bulk URL replacement preview. - replace_to: Optional replacement substring used in bulk URL replacement preview. - resolve_urls: Whether to resolve replaced URLs by following redirects. - force_update: Whether to allow overwriting existing target URLs during apply. - message: Optional status message shown in the UI. - reader: The Reader instance. - - Returns: - HTMLResponse: The webhook entries page. - - Raises: - HTTPException: If no feeds are found for this webhook or webhook doesn't exist. - """ - entries_per_page: int = 20 - clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) - - # Get the webhook name from the webhooks list - webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) - webhook_name: str = "" - for hook in webhooks: - if hook["url"] == clean_webhook_url: - webhook_name = hook["name"] - break - - if not webhook_name: - raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") - - hook_info: WebhookInfo = get_data_from_hook_url(hook_name=webhook_name, hook_url=clean_webhook_url) - - # Get all feeds associated with this webhook - all_feeds: list[Feed] = list(reader.get_feeds()) - webhook_feeds: list[Feed] = [] - - for feed in all_feeds: - feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) - if feed_webhook == clean_webhook_url: - webhook_feeds.append(feed) - - # Get all entries from all feeds for this webhook, sorted by published date - all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)] - - # Sort entries by published date (newest first), with undated entries last. 
- all_entries.sort( - key=lambda e: ( - e.published is not None, - e.published or datetime.min.replace(tzinfo=UTC), - ), - reverse=True, - ) - - # Handle pagination - if starting_after: - try: - start_after_entry: Entry | None = reader.get_entry(( - starting_after.split("|", maxsplit=1)[0], - starting_after.split("|")[1], - )) - except (FeedNotFoundError, EntryNotFoundError): - start_after_entry = None - else: - start_after_entry = None - - # Find the index of the starting entry - start_index: int = 0 - if start_after_entry: - for idx, entry in enumerate(all_entries): - if entry.id == start_after_entry.id and entry.feed.url == start_after_entry.feed.url: - start_index = idx + 1 - break - - # Get the page of entries - paginated_entries: list[Entry] = all_entries[start_index : start_index + entries_per_page] - - # Get the last entry for pagination - last_entry: Entry | None = None - if paginated_entries: - last_entry = paginated_entries[-1] - - # Create the html for the entries - html: str = create_html_for_feed(reader=reader, entries=paginated_entries) - - mass_update_context = build_webhook_mass_update_context( - webhook_feeds=webhook_feeds, - all_feeds=all_feeds, - replace_from=replace_from, - replace_to=replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - ) - - # Check if there are more entries available - total_entries: int = len(all_entries) - is_show_more_entries_button_visible: bool = (start_index + entries_per_page) < total_entries - - context = { - "request": request, - "hook_info": hook_info, - "webhook_name": webhook_name, - "webhook_url": clean_webhook_url, - "webhook_feeds": webhook_feeds, - "entries": paginated_entries, - "html": html, - "last_entry": last_entry, - "is_show_more_entries_button_visible": is_show_more_entries_button_visible, - "total_entries": total_entries, - "feeds_count": len(webhook_feeds), - "message": urllib.parse.unquote(message) if message else "", - **mass_update_context, - } - return 
templates.TemplateResponse(request=request, name="webhook_entries.html", context=context) - - -@app.post("/bulk_change_feed_urls", response_class=HTMLResponse) -async def post_bulk_change_feed_urls( # noqa: C901, PLR0914, PLR0912, PLR0915 - webhook_url: Annotated[str, Form()], - replace_from: Annotated[str, Form()], - reader: Annotated[Reader, Depends(get_reader_dependency)], - replace_to: Annotated[str, Form()] = "", - resolve_urls: Annotated[bool, Form()] = True, # noqa: FBT002 - force_update: Annotated[bool, Form()] = False, # noqa: FBT002 -) -> RedirectResponse: - """Bulk-change feed URLs attached to a webhook. - - Args: - webhook_url: The webhook URL whose feeds should be updated. - replace_from: Text to find in each URL. - replace_to: Text to replace with. - resolve_urls: Whether to resolve resulting URLs via redirects. - force_update: Whether existing target feed URLs should be overwritten. - reader: The Reader instance. - - Returns: - RedirectResponse: Redirect to webhook detail with status message. - - Raises: - HTTPException: If webhook is missing or replace_from is empty. 
- """ - clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip()) - clean_replace_from: str = replace_from.strip() - clean_replace_to: str = replace_to.strip() - - if not clean_replace_from: - raise HTTPException(status_code=400, detail="replace_from cannot be empty") - - webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", []))) - if not any(hook["url"] == clean_webhook_url for hook in webhooks): - raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}") - - all_feeds: list[Feed] = list(reader.get_feeds()) - webhook_feeds: list[Feed] = [] - for feed in all_feeds: - feed_webhook: str = str(reader.get_tag(feed.url, "webhook", "")) - if feed_webhook == clean_webhook_url: - webhook_feeds.append(feed) - - preview_rows: list[dict[str, str | bool | None]] = create_webhook_feed_url_preview( - webhook_feeds=webhook_feeds, - replace_from=clean_replace_from, - replace_to=clean_replace_to, - resolve_urls=resolve_urls, - force_update=force_update, - existing_feed_urls={feed.url for feed in all_feeds}, - ) - - changed_count: int = 0 - skipped_count: int = 0 - failed_count: int = 0 - conflict_count: int = 0 - force_overwrite_count: int = 0 - - for row in preview_rows: - if not row["has_match"]: - continue - - if row["resolution_error"] and not force_update: - skipped_count += 1 - continue - - if row["target_exists"] and not force_update: - conflict_count += 1 - skipped_count += 1 - continue - - old_url: str = str(row["old_url"]) - new_url: str = str(row["candidate_url"] if row["will_force_ignore_errors"] else row["resolved_url"]) - - if old_url == new_url: - skipped_count += 1 - continue - - if row["target_exists"] and force_update: - try: - reader.delete_feed(new_url) - force_overwrite_count += 1 - except FeedNotFoundError: - pass - except ReaderError: - failed_count += 1 - continue - - try: - reader.change_feed_url(old_url, new_url) - except FeedExistsError: - skipped_count += 1 - continue - 
except FeedNotFoundError: - skipped_count += 1 - continue - except ReaderError: - failed_count += 1 - continue - - try: - reader.update_feed(new_url) - except Exception: - logger.exception("Failed to update feed after URL change: %s", new_url) - - for entry in reader.get_entries(feed=new_url, read=False): - try: - reader.set_entry_read(entry, True) - except Exception: - logger.exception("Failed to mark entry as read after URL change: %s", entry.id) - - changed_count += 1 - - if changed_count > 0: - commit_state_change( - reader, - f"Bulk change {changed_count} feed URL(s) for webhook {clean_webhook_url}", - ) - - status_message: str = ( - f"Updated {changed_count} feed URL(s). " - f"Force overwrote {force_overwrite_count}. " - f"Conflicts {conflict_count}. " - f"Skipped {skipped_count}. " - f"Failed {failed_count}." - ) - redirect_url: str = ( - f"/webhook_entries?webhook_url={urllib.parse.quote(clean_webhook_url)}" - f"&message={urllib.parse.quote(status_message)}" - ) - return RedirectResponse(url=redirect_url, status_code=303) - - if __name__ == "__main__": sentry_sdk.init( dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744", @@ -2043,9 +1047,9 @@ if __name__ == "__main__": uvicorn.run( "main:app", - log_level="debug", + log_level="info", host="0.0.0.0", # noqa: S104 - port=3000, + port=5000, proxy_headers=True, forwarded_allow_ips="*", ) diff --git a/discord_rss_bot/missing_tags.py b/discord_rss_bot/missing_tags.py new file mode 100644 index 0000000..84f375e --- /dev/null +++ b/discord_rss_bot/missing_tags.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from reader import Feed, Reader, TagNotFoundError + +from discord_rss_bot.settings import default_custom_embed, default_custom_message + + +def add_custom_message(reader: Reader, feed: Feed) -> None: + """Add the custom message tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. 
+ """ + try: + reader.get_tag(feed, "custom_message") + except TagNotFoundError: + reader.set_tag(feed.url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType] + reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType] + + +def add_has_custom_message(reader: Reader, feed: Feed) -> None: + """Add the has_custom_message tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "has_custom_message") + except TagNotFoundError: + if reader.get_tag(feed, "custom_message") == default_custom_message: + reader.set_tag(feed.url, "has_custom_message", False) # pyright: ignore[reportArgumentType] + else: + reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType] + + +def add_if_embed(reader: Reader, feed: Feed) -> None: + """Add the if_embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "if_embed") + except TagNotFoundError: + reader.set_tag(feed.url, "if_embed", True) # pyright: ignore[reportArgumentType] + + +def add_custom_embed(reader: Reader, feed: Feed) -> None: + """Add the custom embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "embed") + except TagNotFoundError: + reader.set_tag(feed.url, "embed", default_custom_embed) # pyright: ignore[reportArgumentType] + reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType] + + +def add_has_custom_embed(reader: Reader, feed: Feed) -> None: + """Add the has_custom_embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. 
+ """ + try: + reader.get_tag(feed, "has_custom_embed") + except TagNotFoundError: + if reader.get_tag(feed, "embed") == default_custom_embed: + reader.set_tag(feed.url, "has_custom_embed", False) # pyright: ignore[reportArgumentType] + else: + reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType] + + +def add_should_send_embed(reader: Reader, feed: Feed) -> None: + """Add the should_send_embed tag to the feed if it doesn't exist. + + Args: + reader: What Reader to use. + feed: The feed to add the tag to. + """ + try: + reader.get_tag(feed, "should_send_embed") + except TagNotFoundError: + reader.set_tag(feed.url, "should_send_embed", True) # pyright: ignore[reportArgumentType] + + +def add_missing_tags(reader: Reader) -> None: + """Add missing tags to feeds. + + Args: + reader: What Reader to use. + """ + for feed in reader.get_feeds(): + add_custom_message(reader, feed) + add_has_custom_message(reader, feed) + add_if_embed(reader, feed) + add_custom_embed(reader, feed) + add_has_custom_embed(reader, feed) + add_should_send_embed(reader, feed) diff --git a/discord_rss_bot/search.py b/discord_rss_bot/search.py index 85129ac..c81b398 100644 --- a/discord_rss_bot/search.py +++ b/discord_rss_bot/search.py @@ -3,78 +3,66 @@ from __future__ import annotations import urllib.parse from typing import TYPE_CHECKING +from discord_rss_bot.settings import get_reader + if TYPE_CHECKING: from collections.abc import Iterable - from reader import EntrySearchResult - from reader import Feed - from reader import HighlightedString - from reader import Reader + from reader import EntrySearchResult, Feed, HighlightedString, Reader -def create_search_context(query: str, reader: Reader) -> dict: - """Build context for search.html template. +def create_html_for_search_results(query: str, custom_reader: Reader | None = None) -> str: + """Create HTML for the search results. Args: - query (str): The search query. - reader (Reader): Custom Reader instance. 
+ query: Our search query + custom_reader: The reader. If None, we will get the reader from the settings. Returns: - dict: Context dictionary for rendering the search results. + str: The HTML. """ + # TODO(TheLovinator): There is a .content that also contains text, we should use that if .summary is not available. + # TODO(TheLovinator): We should also add tags to the title. + + # Get the default reader if we didn't get a custom one. + reader: Reader = get_reader() if custom_reader is None else custom_reader + search_results: Iterable[EntrySearchResult] = reader.search_entries(query) - results: list[dict] = [] + html: str = "" for result in search_results: - feed: Feed = reader.get_feed(result.feed_url) - feed_url: str = urllib.parse.quote(feed.url) - - # Prefer summary, fall back to content if ".summary" in result.content: - highlighted = result.content[".summary"] - else: - content_keys = [k for k in result.content if k.startswith(".content")] - highlighted = result.content[content_keys[0]] if content_keys else None + result_summary: str = add_span_with_slice(result.content[".summary"]) + feed: Feed = reader.get_feed(result.feed_url) + feed_url: str = urllib.parse.quote(feed.url) - summary: str = add_spans(highlighted) if highlighted else "(no preview available)" + html += f""" +
+ +

{result.metadata[".title"]}

+
+ {result_summary} +
+ """ - results.append({ - "title": add_spans(result.metadata.get(".title")), - "summary": summary, - "feed_url": feed_url, - }) - - return { - "query": query, - "search_amount": {"total": len(results)}, - "results": results, - } + return html -def add_spans(highlighted_string: HighlightedString | None) -> str: - """Wrap all highlighted parts with tags. +def add_span_with_slice(highlighted_string: HighlightedString) -> str: + """Add span tags to the string to highlight the search results. Args: - highlighted_string (HighlightedString | None): The highlighted string to process. + highlighted_string: The highlighted string. Returns: - str: The processed string with tags around highlighted parts. + str: The string with added tags. """ - if highlighted_string is None: - return "" - - value: str = highlighted_string.value - parts: list[str] = [] - last_index = 0 + # TODO(TheLovinator): We are looping through the highlights and only using the last one. We should use all of them. + before_span, span_part, after_span = "", "", "" for txt_slice in highlighted_string.highlights: - parts.extend(( - value[last_index : txt_slice.start], - f"{value[txt_slice.start : txt_slice.stop]}", - )) - last_index = txt_slice.stop + before_span: str = f"{highlighted_string.value[: txt_slice.start]}" + span_part: str = f"{highlighted_string.value[txt_slice.start : txt_slice.stop]}" + after_span: str = f"{highlighted_string.value[txt_slice.stop :]}" - # add any trailing text - parts.append(value[last_index:]) - - return "".join(parts) + return f"{before_span}{span_part}{after_span}" diff --git a/discord_rss_bot/settings.py b/discord_rss_bot/settings.py index 194bf08..d730b10 100644 --- a/discord_rss_bot/settings.py +++ b/discord_rss_bot/settings.py @@ -1,23 +1,16 @@ from __future__ import annotations -import os import typing from functools import lru_cache from pathlib import Path from platformdirs import user_data_dir -from reader import Reader -from reader import make_reader +from reader 
import Reader, make_reader if typing.TYPE_CHECKING: from reader.types import JSONType -data_dir: str = os.getenv("DISCORD_RSS_BOT_DATA_DIR", "").strip() or user_data_dir( - appname="discord_rss_bot", - appauthor="TheLovinator", - roaming=True, - ensure_exists=True, -) +data_dir: str = user_data_dir(appname="discord_rss_bot", appauthor="TheLovinator", roaming=True, ensure_exists=True) # TODO(TheLovinator): Add default things to the database and make the edible. @@ -45,10 +38,7 @@ def get_reader(custom_location: Path | None = None) -> Reader: reader: Reader = make_reader(url=str(db_location)) # https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig - # Set the default update interval to 15 minutes if not already configured - # Users can change this via the Settings page or per-feed in the feed page - if reader.get_tag((), ".reader.update", None) is None: - # Set default - reader.set_tag((), ".reader.update", {"interval": 15}) + # Set the update interval to 15 minutes + reader.set_tag((), ".reader.update", {"interval": 15}) return reader diff --git a/discord_rss_bot/static/styles.css b/discord_rss_bot/static/styles.css index 266f951..db0cfba 100644 --- a/discord_rss_bot/static/styles.css +++ b/discord_rss_bot/static/styles.css @@ -13,7 +13,3 @@ body { .form-text { color: #acabab; } - -.interval-input { - max-width: 120px; -} diff --git a/discord_rss_bot/templates/_webhook_mass_update_preview.html b/discord_rss_bot/templates/_webhook_mass_update_preview.html deleted file mode 100644 index a59e97b..0000000 --- a/discord_rss_bot/templates/_webhook_mass_update_preview.html +++ /dev/null @@ -1,73 +0,0 @@ -{% if preview_rows %} -

- {{ preview_change_count }} feed URL{{ 's' if preview_change_count != 1 else '' }} ready to update. -

-
- Total: {{ preview_summary.total }} - Matched: {{ preview_summary.matched }} - Will update: {{ preview_summary.will_update }} - Conflicts: {{ preview_summary.conflicts }} - Force overwrite: {{ preview_summary.force_overwrite }} - Force ignore errors: {{ preview_summary.force_ignore_errors }} - Resolve errors: {{ preview_summary.resolve_errors }} - No change: {{ preview_summary.no_change }} - No match: {{ preview_summary.no_match }} -
-
- - - - - - -
-
- - - - - - - - - - {% for row in preview_rows %} - - - - - - {% endfor %} - -
Old URLNew URLStatus
- {{ row.old_url }} - - {{ row.resolved_url if resolve_urls else row.candidate_url }} - - {% if not row.has_match %} - No match - {% elif row.will_force_ignore_errors %} - Will force update (ignore resolve error) - {% elif row.resolution_error %} - {{ row.resolution_error }} - {% elif row.will_force_overwrite %} - Will force overwrite - {% elif row.target_exists %} - Conflict: target URL exists - {% elif row.will_change %} - Will update - {% else %} - No change - {% endif %} -
-
-{% elif replace_from %} -

No preview rows found for that replacement pattern.

-{% endif %} diff --git a/discord_rss_bot/templates/base.html b/discord_rss_bot/templates/base.html index 9146b35..a8640dd 100644 --- a/discord_rss_bot/templates/base.html +++ b/discord_rss_bot/templates/base.html @@ -1,12 +1,13 @@ + + content="Stay updated with the latest news and events with our easy-to-use RSS bot. Never miss a message or announcement again with real-time notifications directly to your Discord server." /> + content="discord, rss, bot, notifications, announcements, updates, real-time, server, messages, news, events, feed." /> @@ -17,20 +18,19 @@ {% block head %} {% endblock head %} + {% include "nav.html" %}
{% if messages %} - + {% endif %} + {% block content %} {% endblock content %}
@@ -41,20 +41,18 @@
- + diff --git a/discord_rss_bot/templates/custom.html b/discord_rss_bot/templates/custom.html index 48cb3bc..f018d3a 100644 --- a/discord_rss_bot/templates/custom.html +++ b/discord_rss_bot/templates/custom.html @@ -14,90 +14,90 @@
  • You can use \n to create a new line.
  • You can remove the embed from links by adding < and> around the link. (For example < - {% raw %} {{entry_link}} {% endraw %}>) + {% raw %} {{ entry_link }} {% endraw %}>)

  • {% raw %} - {{feed_author}} + {{ feed_author }} {% endraw %} {{ feed.author }}
  • {% raw %} - {{feed_added}} + {{ feed_added }} {% endraw %} {{ feed.added }}
  • {% raw %} - {{feed_last_exception}} + {{ feed_last_exception }} {% endraw %} {{ feed.last_exception }}
  • {% raw %} - {{feed_last_updated}} + {{ feed_last_updated }} {% endraw %} {{ feed.last_updated }}
  • {% raw %} - {{feed_link}} + {{ feed_link }} {% endraw %} {{ feed.link }}
  • {% raw %} - {{feed_subtitle}} + {{ feed_subtitle }} {% endraw %} {{ feed.subtitle }}
  • {% raw %} - {{feed_title}} + {{ feed_title }} {% endraw %} {{ feed.title }}
  • {% raw %} - {{feed_updated}} + {{ feed_updated }} {% endraw %} {{ feed.updated }}
  • {% raw %} - {{feed_updates_enabled}} + {{ feed_updates_enabled }} {% endraw %} {{ feed.updates_enabled }}
  • {% raw %} - {{feed_url}} + {{ feed_url }} {% endraw %} {{ feed.url }}
  • {% raw %} - {{feed_user_title}} + {{ feed_user_title }} {% endraw %} {{ feed.user_title }}
  • {% raw %} - {{feed_version}} + {{ feed_version }} {% endraw %} {{ feed.version }}
  • @@ -106,14 +106,14 @@
  • {% raw %} - {{entry_added}} + {{ entry_added }} {% endraw %} {{ entry.added }}
  • {% raw %} - {{entry_author}} + {{ entry_author }} {% endraw %} {{ entry.author }}
  • @@ -121,14 +121,14 @@
  • {% raw %} - {{entry_content}} + {{ entry_content }} {% endraw %} {{ entry.content[0].value|discord_markdown }}
  • {% raw %} - {{entry_content_raw}} + {{ entry_content_raw }} {% endraw %} {{ entry.content[0].value }}
  • @@ -136,42 +136,42 @@
  • {% raw %} - {{entry_id}} + {{ entry_id }} {% endraw %} {{ entry.id }}
  • {% raw %} - {{entry_important}} + {{ entry_important }} {% endraw %} {{ entry.important }}
  • {% raw %} - {{entry_link}} + {{ entry_link }} {% endraw %} {{ entry.link }}
  • {% raw %} - {{entry_published}} + {{ entry_published }} {% endraw %} {{ entry.published }}
  • {% raw %} - {{entry_read}} + {{ entry_read }} {% endraw %} {{ entry.read }}
  • {% raw %} - {{entry_read_modified}} + {{ entry_read_modified }} {% endraw %} {{ entry.read_modified }}
  • @@ -179,14 +179,14 @@
  • {% raw %} - {{entry_summary}} + {{ entry_summary }} {% endraw %} {{ entry.summary|discord_markdown }}
  • {% raw %} - {{entry_summary_raw}} + {{ entry_summary_raw }} {% endraw %} {{ entry.summary }}
  • @@ -194,21 +194,21 @@
  • {% raw %} - {{entry_title}} + {{ entry_title }} {% endraw %} {{ entry.title }}
  • {% raw %} - {{entry_text}} + {{ entry_text }} {% endraw %} Same as entry_content if it exists, otherwise entry_summary
  • {% raw %} - {{entry_updated}} + {{ entry_updated }} {% endraw %} {{ entry.updated }}
  • @@ -216,7 +216,7 @@
  • {% raw %} - {{image_1}} + {{ image_1 }} {% endraw %} First image in the entry if it exists
  • @@ -226,7 +226,7 @@
  • {% raw %} - {{feed_title}}\n{{entry_content}} + {{ feed_title }}\n{{ entry_content }} {% endraw %}
  • diff --git a/discord_rss_bot/templates/feed.html b/discord_rss_bot/templates/feed.html index eb3e601..ce983ff 100644 --- a/discord_rss_bot/templates/feed.html +++ b/discord_rss_bot/templates/feed.html @@ -1,172 +1,88 @@ {% extends "base.html" %} {% block title %} - | {{ feed.title }} +| {{ feed.title }} {% endblock title %} {% block content %} -
    - -

    - {{ feed.title }} ({{ total_entries }} entries) -

    - {% if not feed.updates_enabled %}Disabled{% endif %} - {% if feed.last_exception %} -
    -
    {{ feed.last_exception.type_name }}:
    - {{ feed.last_exception.value_str }} - -
    -
    {{ feed.last_exception.traceback_str }}
    -
    -
    - {% endif %} - -
    - Update -
    - -
    - {% if not feed.updates_enabled %} -
    - -
    - {% else %} -
    - -
    - {% endif %} - {% if not "youtube.com/feeds/videos.xml" in feed.url %} - {% if should_send_embed %} -
    - -
    - {% else %} -
    - -
    - {% endif %} - {% endif %} -
    - - - -
    -
    Feed URL
    -
    - -
    - - -
    -
    -
    - -
    -
    Feed Information
    -
    -
    - Added: {{ feed.added | relative_time }} -
    -
    - Last Updated: {{ feed.last_updated | relative_time }} -
    -
    - Last Retrieved: {{ feed.last_retrieved | relative_time }} -
    -
    - Next Update: {{ feed.update_after | relative_time }} -
    -
    - Updates: {{ 'Enabled' if feed.updates_enabled else 'Disabled' }} -
    -
    -
    - -
    -
    - Update Interval - {% if feed_interval %} - Custom - {% else %} - Using global default - {% endif %} - -
    -
    - Current: - {% if feed_interval %} - {{ feed_interval }} - {% if feed_interval >= 60 %}({{ (feed_interval / 60) | round(1) }} hours){% endif %} - {% else %} - {{ global_interval }} - {% if global_interval >= 60 %}({{ (global_interval / 60) | round(1) }} hours){% endif %} - {% endif %} - minutes -
    - - - -
    - {% if feed_interval %} -
    - - -
    - {% endif %} +
    + +

    + {{ feed.title }} ({{ total_entries }} entries) +

    + {% if not feed.updates_enabled %} + Disabled + {% endif %} + + {% if feed.last_exception %} +
    +
    {{ feed.last_exception.type_name }}:
    + {{ feed.last_exception.value_str }} + +
    +
    {{ feed.last_exception.traceback_str }}
    + {% endif %} + + +
    +
    + +
    + + {% if not feed.updates_enabled %} +
    + +
    + {% else %} +
    + +
    + {% endif %} + + {% if not "youtube.com/feeds/videos.xml" in feed.url %} + {% if should_send_embed %} +
    + +
    + {% else %} +
    + +
    + {% endif %} + {% endif %} +
    + + +
    + {# Rendered HTML content #}
    {{ html|safe }}
    -{% if is_show_more_entries_button_visible %} - - Show more entries - + +{% if show_more_entires_button %} + + Show more entries + {% endif %} + {% endblock content %} diff --git a/discord_rss_bot/templates/index.html b/discord_rss_bot/templates/index.html index 341ec38..f9dfc0d 100644 --- a/discord_rss_bot/templates/index.html +++ b/discord_rss_bot/templates/index.html @@ -1,155 +1,154 @@ {% extends "base.html" %} {% block content %} - -
    + {% endfor %} + {% else %} +

    + Hello there! +
    +
    + You need to add a webhook here to get started. After that, you can + add feeds here. You can find both of these links in the navigation bar + above. +
    +
    + If you have any questions or suggestions, feel free to contact me on tlovinator@gmail.com or TheLovinator#9276 on Discord. +
    +
    + Thanks! +

    + {% endif %} + + + {% if broken_feeds %} + + {% endif %} + + + {% if feeds_without_attached_webhook %} + + {% endif %} + {% endblock content %} diff --git a/discord_rss_bot/templates/nav.html b/discord_rss_bot/templates/nav.html index 7442554..8b9ee37 100644 --- a/discord_rss_bot/templates/nav.html +++ b/discord_rss_bot/templates/nav.html @@ -1,9 +1,6 @@