Refactor tag retrieval to use default values and remove missing_tags module
This commit is contained in:
parent
8805da33b6
commit
dfa6ea48e5
7 changed files with 63 additions and 228 deletions
|
|
@ -5,18 +5,20 @@ import json
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
from bs4 import Tag
|
from bs4 import Tag
|
||||||
from markdownify import markdownify
|
from markdownify import markdownify
|
||||||
from reader import Entry
|
|
||||||
from reader import Feed
|
|
||||||
from reader import Reader
|
|
||||||
from reader import TagNotFoundError
|
|
||||||
|
|
||||||
from discord_rss_bot.is_url_valid import is_url_valid
|
from discord_rss_bot.is_url_valid import is_url_valid
|
||||||
from discord_rss_bot.settings import get_reader
|
from discord_rss_bot.settings import get_reader
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from reader import Entry
|
||||||
|
from reader import Feed
|
||||||
|
from reader import Reader
|
||||||
|
|
||||||
logger: logging.Logger = logging.getLogger(__name__)
|
logger: logging.Logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
DISCORD_TIMESTAMP_TAG_RE: re.Pattern[str] = re.compile(r"<t:\d+(?::[tTdDfFrRsS])?>")
|
DISCORD_TIMESTAMP_TAG_RE: re.Pattern[str] = re.compile(r"<t:\d+(?::[tTdDfFrRsS])?>")
|
||||||
|
|
@ -342,9 +344,7 @@ def get_custom_message(custom_reader: Reader, feed: Feed) -> str:
|
||||||
Returns the contents from the custom_message tag.
|
Returns the contents from the custom_message tag.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
custom_message: str = str(custom_reader.get_tag(feed, "custom_message"))
|
custom_message: str = str(custom_reader.get_tag(feed, "custom_message", ""))
|
||||||
except TagNotFoundError:
|
|
||||||
custom_message = ""
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
custom_message = ""
|
custom_message = ""
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ from reader import FeedNotFoundError
|
||||||
from reader import Reader
|
from reader import Reader
|
||||||
from reader import ReaderError
|
from reader import ReaderError
|
||||||
from reader import StorageError
|
from reader import StorageError
|
||||||
from reader import TagNotFoundError
|
|
||||||
|
|
||||||
from discord_rss_bot.custom_message import CustomEmbed
|
from discord_rss_bot.custom_message import CustomEmbed
|
||||||
from discord_rss_bot.custom_message import get_custom_message
|
from discord_rss_bot.custom_message import get_custom_message
|
||||||
|
|
@ -37,7 +36,6 @@ from discord_rss_bot.hoyolab_api import extract_post_id_from_hoyolab_url
|
||||||
from discord_rss_bot.hoyolab_api import fetch_hoyolab_post
|
from discord_rss_bot.hoyolab_api import fetch_hoyolab_post
|
||||||
from discord_rss_bot.hoyolab_api import is_c3kay_feed
|
from discord_rss_bot.hoyolab_api import is_c3kay_feed
|
||||||
from discord_rss_bot.is_url_valid import is_url_valid
|
from discord_rss_bot.is_url_valid import is_url_valid
|
||||||
from discord_rss_bot.missing_tags import add_missing_tags
|
|
||||||
from discord_rss_bot.settings import default_custom_message
|
from discord_rss_bot.settings import default_custom_message
|
||||||
from discord_rss_bot.settings import get_reader
|
from discord_rss_bot.settings import get_reader
|
||||||
|
|
||||||
|
|
@ -98,7 +96,7 @@ def extract_domain(url: str) -> str: # noqa: PLR0911
|
||||||
return "Other"
|
return "Other"
|
||||||
|
|
||||||
|
|
||||||
def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: C901, PLR0912
|
def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None: # noqa: C901
|
||||||
"""Send a single entry to Discord.
|
"""Send a single entry to Discord.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
|
@ -149,10 +147,7 @@ def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) ->
|
||||||
|
|
||||||
# Create the webhook.
|
# Create the webhook.
|
||||||
try:
|
try:
|
||||||
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
|
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True))
|
||||||
except TagNotFoundError:
|
|
||||||
logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url)
|
|
||||||
should_send_embed = True
|
|
||||||
except StorageError:
|
except StorageError:
|
||||||
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
|
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
|
||||||
should_send_embed = True
|
should_send_embed = True
|
||||||
|
|
@ -316,13 +311,14 @@ def get_webhook_url(reader: Reader, entry: Entry) -> str:
|
||||||
str: The webhook URL.
|
str: The webhook URL.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook"))
|
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook", ""))
|
||||||
except TagNotFoundError:
|
|
||||||
logger.exception("No webhook URL found for feed: %s", entry.feed.url)
|
|
||||||
return ""
|
|
||||||
except StorageError:
|
except StorageError:
|
||||||
logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url)
|
logger.exception("Storage error getting webhook URL for feed: %s", entry.feed.url)
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
if not webhook_url:
|
||||||
|
logger.error("No webhook URL found for feed: %s", entry.feed.url)
|
||||||
|
return ""
|
||||||
return webhook_url
|
return webhook_url
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -493,10 +489,7 @@ def should_send_embed_check(reader: Reader, entry: Entry) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed"))
|
should_send_embed = bool(reader.get_tag(entry.feed, "should_send_embed", True))
|
||||||
except TagNotFoundError:
|
|
||||||
logger.exception("No should_send_embed tag found for feed: %s", entry.feed.url)
|
|
||||||
should_send_embed = True
|
|
||||||
except ReaderError:
|
except ReaderError:
|
||||||
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
|
logger.exception("Error getting should_send_embed tag for feed: %s", entry.feed.url)
|
||||||
should_send_embed = True
|
should_send_embed = True
|
||||||
|
|
@ -551,9 +544,7 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
||||||
reader.add_feed(clean_feed_url)
|
reader.add_feed(clean_feed_url)
|
||||||
except FeedExistsError:
|
except FeedExistsError:
|
||||||
# Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new.
|
# Add the webhook to an already added feed if it doesn't have a webhook instead of trying to create a new.
|
||||||
try:
|
if not reader.get_tag(clean_feed_url, "webhook", ""):
|
||||||
reader.get_tag(clean_feed_url, "webhook")
|
|
||||||
except TagNotFoundError:
|
|
||||||
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
|
reader.set_tag(clean_feed_url, "webhook", webhook_url) # pyright: ignore[reportArgumentType]
|
||||||
except ReaderError as e:
|
except ReaderError as e:
|
||||||
raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e
|
raise HTTPException(status_code=404, detail=f"Error adding feed: {e}") from e
|
||||||
|
|
@ -580,5 +571,3 @@ def create_feed(reader: Reader, feed_url: str, webhook_dropdown: str) -> None:
|
||||||
|
|
||||||
# Update the full-text search index so our new feed is searchable.
|
# Update the full-text search index so our new feed is searchable.
|
||||||
reader.update_search()
|
reader.update_search()
|
||||||
|
|
||||||
add_missing_tags(reader)
|
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,6 @@ from pathlib import Path
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from reader import TagNotFoundError
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from reader import Reader
|
from reader import Reader
|
||||||
|
|
||||||
|
|
@ -176,21 +174,15 @@ def export_state(reader: Reader, backup_path: Path) -> None:
|
||||||
logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url)
|
logger.exception("Failed to read tag '%s' for feed '%s' during state export", tag, feed.url)
|
||||||
feeds_state.append(feed_data)
|
feeds_state.append(feed_data)
|
||||||
|
|
||||||
try:
|
|
||||||
webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list(
|
webhooks: list[str | int | float | bool | dict[str, Any] | list[Any] | None] = list(
|
||||||
reader.get_tag((), "webhooks", []),
|
reader.get_tag((), "webhooks", []),
|
||||||
)
|
)
|
||||||
except TagNotFoundError:
|
|
||||||
webhooks = []
|
|
||||||
|
|
||||||
# Export global update interval if set
|
# Export global update interval if set
|
||||||
global_update_interval: dict[str, Any] | None = None
|
global_update_interval: dict[str, Any] | None = None
|
||||||
try:
|
|
||||||
global_update_config = reader.get_tag((), ".reader.update", None)
|
global_update_config = reader.get_tag((), ".reader.update", None)
|
||||||
if isinstance(global_update_config, dict):
|
if isinstance(global_update_config, dict):
|
||||||
global_update_interval = global_update_config
|
global_update_interval = global_update_config
|
||||||
except TagNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
state: dict = {"feeds": feeds_state, "webhooks": webhooks}
|
state: dict = {"feeds": feeds_state, "webhooks": webhooks}
|
||||||
if global_update_interval is not None:
|
if global_update_interval is not None:
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,6 @@ from discord_rss_bot.feeds import send_entry_to_discord
|
||||||
from discord_rss_bot.feeds import send_to_discord
|
from discord_rss_bot.feeds import send_to_discord
|
||||||
from discord_rss_bot.git_backup import commit_state_change
|
from discord_rss_bot.git_backup import commit_state_change
|
||||||
from discord_rss_bot.git_backup import get_backup_path
|
from discord_rss_bot.git_backup import get_backup_path
|
||||||
from discord_rss_bot.missing_tags import add_missing_tags
|
|
||||||
from discord_rss_bot.search import create_search_context
|
from discord_rss_bot.search import create_search_context
|
||||||
from discord_rss_bot.settings import get_reader
|
from discord_rss_bot.settings import get_reader
|
||||||
|
|
||||||
|
|
@ -157,7 +156,6 @@ def relative_time(dt: datetime | None) -> str:
|
||||||
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
|
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
|
||||||
"""Lifespan function for the FastAPI app."""
|
"""Lifespan function for the FastAPI app."""
|
||||||
reader: Reader = get_reader()
|
reader: Reader = get_reader()
|
||||||
add_missing_tags(reader)
|
|
||||||
scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone=UTC)
|
scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone=UTC)
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
func=send_to_discord,
|
func=send_to_discord,
|
||||||
|
|
@ -944,24 +942,18 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
|
||||||
|
|
||||||
# Get feed and global intervals for error case too
|
# Get feed and global intervals for error case too
|
||||||
feed_interval: int | None = None
|
feed_interval: int | None = None
|
||||||
try:
|
feed_update_config = reader.get_tag(feed, ".reader.update", None)
|
||||||
feed_update_config = reader.get_tag(feed, ".reader.update")
|
|
||||||
if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
|
if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
|
||||||
interval_value = feed_update_config["interval"]
|
interval_value = feed_update_config["interval"]
|
||||||
if isinstance(interval_value, int):
|
if isinstance(interval_value, int):
|
||||||
feed_interval = interval_value
|
feed_interval = interval_value
|
||||||
except TagNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
global_interval: int = 60
|
global_interval: int = 60
|
||||||
try:
|
global_update_config = reader.get_tag((), ".reader.update", None)
|
||||||
global_update_config = reader.get_tag((), ".reader.update")
|
|
||||||
if isinstance(global_update_config, dict) and "interval" in global_update_config:
|
if isinstance(global_update_config, dict) and "interval" in global_update_config:
|
||||||
interval_value = global_update_config["interval"]
|
interval_value = global_update_config["interval"]
|
||||||
if isinstance(interval_value, int):
|
if isinstance(interval_value, int):
|
||||||
global_interval = interval_value
|
global_interval = interval_value
|
||||||
except TagNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
context = {
|
context = {
|
||||||
"request": request,
|
"request": request,
|
||||||
|
|
@ -998,34 +990,23 @@ async def get_feed( # noqa: C901, PLR0912, PLR0914, PLR0915
|
||||||
# Create the html for the entries.
|
# Create the html for the entries.
|
||||||
html: str = create_html_for_feed(entries, clean_feed_url)
|
html: str = create_html_for_feed(entries, clean_feed_url)
|
||||||
|
|
||||||
try:
|
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed", True))
|
||||||
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
|
|
||||||
except TagNotFoundError:
|
|
||||||
add_missing_tags(reader)
|
|
||||||
should_send_embed: bool = bool(reader.get_tag(feed, "should_send_embed"))
|
|
||||||
|
|
||||||
# Get the update interval for this feed
|
# Get the update interval for this feed
|
||||||
feed_interval: int | None = None
|
feed_interval: int | None = None
|
||||||
try:
|
feed_update_config = reader.get_tag(feed, ".reader.update", None)
|
||||||
feed_update_config = reader.get_tag(feed, ".reader.update")
|
|
||||||
if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
|
if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
|
||||||
interval_value = feed_update_config["interval"]
|
interval_value = feed_update_config["interval"]
|
||||||
if isinstance(interval_value, int):
|
if isinstance(interval_value, int):
|
||||||
feed_interval = interval_value
|
feed_interval = interval_value
|
||||||
except TagNotFoundError:
|
|
||||||
# No custom interval set for this feed, will use global default
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Get the global default update interval
|
# Get the global default update interval
|
||||||
global_interval: int = 60 # Default to 60 minutes if not set
|
global_interval: int = 60 # Default to 60 minutes if not set
|
||||||
try:
|
global_update_config = reader.get_tag((), ".reader.update", None)
|
||||||
global_update_config = reader.get_tag((), ".reader.update")
|
|
||||||
if isinstance(global_update_config, dict) and "interval" in global_update_config:
|
if isinstance(global_update_config, dict) and "interval" in global_update_config:
|
||||||
interval_value = global_update_config["interval"]
|
interval_value = global_update_config["interval"]
|
||||||
if isinstance(interval_value, int):
|
if isinstance(interval_value, int):
|
||||||
global_interval = interval_value
|
global_interval = interval_value
|
||||||
except TagNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
context = {
|
context = {
|
||||||
"request": request,
|
"request": request,
|
||||||
|
|
@ -1202,28 +1183,22 @@ async def get_settings(
|
||||||
"""
|
"""
|
||||||
# Get the global default update interval
|
# Get the global default update interval
|
||||||
global_interval: int = 60 # Default to 60 minutes if not set
|
global_interval: int = 60 # Default to 60 minutes if not set
|
||||||
try:
|
global_update_config = reader.get_tag((), ".reader.update", None)
|
||||||
global_update_config = reader.get_tag((), ".reader.update")
|
|
||||||
if isinstance(global_update_config, dict) and "interval" in global_update_config:
|
if isinstance(global_update_config, dict) and "interval" in global_update_config:
|
||||||
interval_value = global_update_config["interval"]
|
interval_value = global_update_config["interval"]
|
||||||
if isinstance(interval_value, int):
|
if isinstance(interval_value, int):
|
||||||
global_interval = interval_value
|
global_interval = interval_value
|
||||||
except TagNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Get all feeds with their intervals
|
# Get all feeds with their intervals
|
||||||
feeds: Iterable[Feed] = reader.get_feeds()
|
feeds: Iterable[Feed] = reader.get_feeds()
|
||||||
feed_intervals = []
|
feed_intervals = []
|
||||||
for feed in feeds:
|
for feed in feeds:
|
||||||
feed_interval: int | None = None
|
feed_interval: int | None = None
|
||||||
try:
|
feed_update_config = reader.get_tag(feed, ".reader.update", None)
|
||||||
feed_update_config = reader.get_tag(feed, ".reader.update")
|
|
||||||
if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
|
if isinstance(feed_update_config, dict) and "interval" in feed_update_config:
|
||||||
interval_value = feed_update_config["interval"]
|
interval_value = feed_update_config["interval"]
|
||||||
if isinstance(interval_value, int):
|
if isinstance(interval_value, int):
|
||||||
feed_interval = interval_value
|
feed_interval = interval_value
|
||||||
except TagNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
feed_intervals.append({
|
feed_intervals.append({
|
||||||
"feed": feed,
|
"feed": feed,
|
||||||
|
|
@ -1313,13 +1288,13 @@ def make_context_index(request: Request, message: str = "", reader: Reader | Non
|
||||||
# Get all feeds and organize them
|
# Get all feeds and organize them
|
||||||
feeds: Iterable[Feed] = effective_reader.get_feeds()
|
feeds: Iterable[Feed] = effective_reader.get_feeds()
|
||||||
for feed in feeds:
|
for feed in feeds:
|
||||||
try:
|
webhook: str = str(effective_reader.get_tag(feed.url, "webhook", ""))
|
||||||
webhook: JSONType = effective_reader.get_tag(feed.url, "webhook")
|
if not webhook:
|
||||||
feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)})
|
|
||||||
except TagNotFoundError:
|
|
||||||
broken_feeds.append(feed)
|
broken_feeds.append(feed)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)})
|
||||||
|
|
||||||
webhook_list: list[str] = [hook["url"] for hook in hooks]
|
webhook_list: list[str] = [hook["url"] for hook in hooks]
|
||||||
if webhook not in webhook_list:
|
if webhook not in webhook_list:
|
||||||
feeds_without_attached_webhook.append(feed)
|
feeds_without_attached_webhook.append(feed)
|
||||||
|
|
@ -1512,10 +1487,7 @@ def modify_webhook(
|
||||||
# matches the old one.
|
# matches the old one.
|
||||||
feeds: Iterable[Feed] = reader.get_feeds()
|
feeds: Iterable[Feed] = reader.get_feeds()
|
||||||
for feed in feeds:
|
for feed in feeds:
|
||||||
try:
|
webhook: str = str(reader.get_tag(feed, "webhook", ""))
|
||||||
webhook = reader.get_tag(feed, "webhook")
|
|
||||||
except TagNotFoundError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if webhook == old_hook.strip():
|
if webhook == old_hook.strip():
|
||||||
reader.set_tag(feed.url, "webhook", new_hook.strip()) # pyright: ignore[reportArgumentType]
|
reader.set_tag(feed.url, "webhook", new_hook.strip()) # pyright: ignore[reportArgumentType]
|
||||||
|
|
@ -1548,7 +1520,7 @@ def extract_youtube_video_id(url: str) -> str | None:
|
||||||
|
|
||||||
|
|
||||||
@app.get("/webhook_entries", response_class=HTMLResponse)
|
@app.get("/webhook_entries", response_class=HTMLResponse)
|
||||||
async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914
|
async def get_webhook_entries( # noqa: C901, PLR0914
|
||||||
webhook_url: str,
|
webhook_url: str,
|
||||||
request: Request,
|
request: Request,
|
||||||
reader: Annotated[Reader, Depends(get_reader_dependency)],
|
reader: Annotated[Reader, Depends(get_reader_dependency)],
|
||||||
|
|
@ -1587,12 +1559,9 @@ async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914
|
||||||
webhook_feeds: list[Feed] = []
|
webhook_feeds: list[Feed] = []
|
||||||
|
|
||||||
for feed in all_feeds:
|
for feed in all_feeds:
|
||||||
try:
|
|
||||||
feed_webhook: str = str(reader.get_tag(feed.url, "webhook", ""))
|
feed_webhook: str = str(reader.get_tag(feed.url, "webhook", ""))
|
||||||
if feed_webhook == clean_webhook_url:
|
if feed_webhook == clean_webhook_url:
|
||||||
webhook_feeds.append(feed)
|
webhook_feeds.append(feed)
|
||||||
except TagNotFoundError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Get all entries from all feeds for this webhook, sorted by published date
|
# Get all entries from all feeds for this webhook, sorted by published date
|
||||||
all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)]
|
all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)]
|
||||||
|
|
|
||||||
|
|
@ -1,109 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from reader import Feed
|
|
||||||
from reader import Reader
|
|
||||||
from reader import TagNotFoundError
|
|
||||||
|
|
||||||
from discord_rss_bot.settings import default_custom_embed
|
|
||||||
from discord_rss_bot.settings import default_custom_message
|
|
||||||
|
|
||||||
|
|
||||||
def add_custom_message(reader: Reader, feed: Feed) -> None:
|
|
||||||
"""Add the custom message tag to the feed if it doesn't exist.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
reader: What Reader to use.
|
|
||||||
feed: The feed to add the tag to.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
reader.get_tag(feed, "custom_message")
|
|
||||||
except TagNotFoundError:
|
|
||||||
reader.set_tag(feed.url, "custom_message", default_custom_message) # pyright: ignore[reportArgumentType]
|
|
||||||
reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType]
|
|
||||||
|
|
||||||
|
|
||||||
def add_has_custom_message(reader: Reader, feed: Feed) -> None:
|
|
||||||
"""Add the has_custom_message tag to the feed if it doesn't exist.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
reader: What Reader to use.
|
|
||||||
feed: The feed to add the tag to.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
reader.get_tag(feed, "has_custom_message")
|
|
||||||
except TagNotFoundError:
|
|
||||||
if reader.get_tag(feed, "custom_message") == default_custom_message:
|
|
||||||
reader.set_tag(feed.url, "has_custom_message", False) # pyright: ignore[reportArgumentType]
|
|
||||||
else:
|
|
||||||
reader.set_tag(feed.url, "has_custom_message", True) # pyright: ignore[reportArgumentType]
|
|
||||||
|
|
||||||
|
|
||||||
def add_if_embed(reader: Reader, feed: Feed) -> None:
|
|
||||||
"""Add the if_embed tag to the feed if it doesn't exist.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
reader: What Reader to use.
|
|
||||||
feed: The feed to add the tag to.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
reader.get_tag(feed, "if_embed")
|
|
||||||
except TagNotFoundError:
|
|
||||||
reader.set_tag(feed.url, "if_embed", True) # pyright: ignore[reportArgumentType]
|
|
||||||
|
|
||||||
|
|
||||||
def add_custom_embed(reader: Reader, feed: Feed) -> None:
|
|
||||||
"""Add the custom embed tag to the feed if it doesn't exist.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
reader: What Reader to use.
|
|
||||||
feed: The feed to add the tag to.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
reader.get_tag(feed, "embed")
|
|
||||||
except TagNotFoundError:
|
|
||||||
reader.set_tag(feed.url, "embed", default_custom_embed) # pyright: ignore[reportArgumentType]
|
|
||||||
reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType]
|
|
||||||
|
|
||||||
|
|
||||||
def add_has_custom_embed(reader: Reader, feed: Feed) -> None:
|
|
||||||
"""Add the has_custom_embed tag to the feed if it doesn't exist.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
reader: What Reader to use.
|
|
||||||
feed: The feed to add the tag to.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
reader.get_tag(feed, "has_custom_embed")
|
|
||||||
except TagNotFoundError:
|
|
||||||
if reader.get_tag(feed, "embed") == default_custom_embed:
|
|
||||||
reader.set_tag(feed.url, "has_custom_embed", False) # pyright: ignore[reportArgumentType]
|
|
||||||
else:
|
|
||||||
reader.set_tag(feed.url, "has_custom_embed", True) # pyright: ignore[reportArgumentType]
|
|
||||||
|
|
||||||
|
|
||||||
def add_should_send_embed(reader: Reader, feed: Feed) -> None:
|
|
||||||
"""Add the should_send_embed tag to the feed if it doesn't exist.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
reader: What Reader to use.
|
|
||||||
feed: The feed to add the tag to.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
reader.get_tag(feed, "should_send_embed")
|
|
||||||
except TagNotFoundError:
|
|
||||||
reader.set_tag(feed.url, "should_send_embed", True) # pyright: ignore[reportArgumentType]
|
|
||||||
|
|
||||||
|
|
||||||
def add_missing_tags(reader: Reader) -> None:
|
|
||||||
"""Add missing tags to feeds.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
reader: What Reader to use.
|
|
||||||
"""
|
|
||||||
for feed in reader.get_feeds():
|
|
||||||
add_custom_message(reader, feed)
|
|
||||||
add_has_custom_message(reader, feed)
|
|
||||||
add_if_embed(reader, feed)
|
|
||||||
add_custom_embed(reader, feed)
|
|
||||||
add_has_custom_embed(reader, feed)
|
|
||||||
add_should_send_embed(reader, feed)
|
|
||||||
|
|
@ -7,7 +7,6 @@ from pathlib import Path
|
||||||
|
|
||||||
from platformdirs import user_data_dir
|
from platformdirs import user_data_dir
|
||||||
from reader import Reader
|
from reader import Reader
|
||||||
from reader import TagNotFoundError
|
|
||||||
from reader import make_reader
|
from reader import make_reader
|
||||||
|
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
|
|
@ -48,9 +47,7 @@ def get_reader(custom_location: Path | None = None) -> Reader:
|
||||||
# https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig
|
# https://reader.readthedocs.io/en/latest/api.html#reader.types.UpdateConfig
|
||||||
# Set the default update interval to 15 minutes if not already configured
|
# Set the default update interval to 15 minutes if not already configured
|
||||||
# Users can change this via the Settings page or per-feed in the feed page
|
# Users can change this via the Settings page or per-feed in the feed page
|
||||||
try:
|
if reader.get_tag((), ".reader.update", None) is None:
|
||||||
reader.get_tag((), ".reader.update")
|
|
||||||
except TagNotFoundError:
|
|
||||||
# Set default
|
# Set default
|
||||||
reader.set_tag((), ".reader.update", {"interval": 15})
|
reader.set_tag((), ".reader.update", {"interval": 15})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,6 @@ from discord_rss_bot.feeds import send_entry_to_discord
|
||||||
from discord_rss_bot.feeds import send_to_discord
|
from discord_rss_bot.feeds import send_to_discord
|
||||||
from discord_rss_bot.feeds import should_send_embed_check
|
from discord_rss_bot.feeds import should_send_embed_check
|
||||||
from discord_rss_bot.feeds import truncate_webhook_message
|
from discord_rss_bot.feeds import truncate_webhook_message
|
||||||
from discord_rss_bot.missing_tags import add_missing_tags
|
|
||||||
|
|
||||||
|
|
||||||
def test_send_to_discord() -> None:
|
def test_send_to_discord() -> None:
|
||||||
|
|
@ -35,8 +34,6 @@ def test_send_to_discord() -> None:
|
||||||
# Add a feed to the reader.
|
# Add a feed to the reader.
|
||||||
reader.add_feed("https://www.reddit.com/r/Python/.rss")
|
reader.add_feed("https://www.reddit.com/r/Python/.rss")
|
||||||
|
|
||||||
add_missing_tags(reader)
|
|
||||||
|
|
||||||
# Update the feed to get the entries.
|
# Update the feed to get the entries.
|
||||||
reader.update_feeds()
|
reader.update_feeds()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue