Remove logging to speed up the program

This commit is contained in:
2023-01-20 08:13:04 +01:00
parent 519066649d
commit c48b2b1bf6
9 changed files with 2 additions and 272 deletions

View File

@ -1,82 +0,0 @@
from apscheduler.events import (
EVENT_ALL_JOBS_REMOVED,
EVENT_EXECUTOR_ADDED,
EVENT_EXECUTOR_REMOVED,
EVENT_JOB_ADDED,
EVENT_JOB_ERROR,
EVENT_JOB_EXECUTED,
EVENT_JOB_MAX_INSTANCES,
EVENT_JOB_MISSED,
EVENT_JOB_MODIFIED,
EVENT_JOB_REMOVED,
EVENT_JOB_SUBMITTED,
EVENT_JOBSTORE_ADDED,
EVENT_JOBSTORE_REMOVED,
EVENT_SCHEDULER_PAUSED,
EVENT_SCHEDULER_RESUMED,
EVENT_SCHEDULER_SHUTDOWN,
EVENT_SCHEDULER_START,
)
from loguru import logger
def my_listener(event) -> None:
"""
EVENT_SCHEDULER_START = 1
EVENT_SCHEDULER_SHUTDOWN = 2
EVENT_SCHEDULER_PAUSED = 4
EVENT_SCHEDULER_RESUMED = 8
EVENT_EXECUTOR_ADDED = 16
EVENT_EXECUTOR_REMOVED = 32
EVENT_JOBSTORE_ADDED = 64
EVENT_JOBSTORE_REMOVED = 128
EVENT_ALL_JOBS_REMOVED = 256
EVENT_JOB_ADDED = 512
EVENT_JOB_REMOVED = 1024
EVENT_JOB_MODIFIED = 2048
EVENT_JOB_EXECUTED = 4096
EVENT_JOB_ERROR = 8192
EVENT_JOB_MISSED = 16384
EVENT_JOB_SUBMITTED = 32768
EVENT_JOB_MAX_INSTANCES = 65536
"""
event_code: int = event.code
if event_code == EVENT_SCHEDULER_START:
logger.info("The scheduler was started")
if event_code == EVENT_SCHEDULER_SHUTDOWN:
logger.info("The scheduler was shut down")
if event_code == EVENT_SCHEDULER_PAUSED:
logger.debug("Job processing in the scheduler was paused")
if event_code == EVENT_SCHEDULER_RESUMED:
logger.debug("Job processing in the scheduler was resumed")
if event_code == EVENT_EXECUTOR_ADDED:
logger.debug("An executor was added to the scheduler")
if event_code == EVENT_EXECUTOR_REMOVED:
logger.debug("An executor was removed to the scheduler")
if event_code == EVENT_JOBSTORE_ADDED:
logger.debug("A job store was added to the scheduler")
if event_code == EVENT_JOBSTORE_REMOVED:
logger.debug("A job store was removed from the scheduler")
if event_code == EVENT_ALL_JOBS_REMOVED:
logger.debug("All jobs were removed from either all job stores or one particular job store")
if event_code == EVENT_JOB_ADDED:
logger.debug("A job was added to a job store")
if event_code == EVENT_JOB_REMOVED:
logger.debug("A job was removed from a job store")
if event_code == EVENT_JOB_MODIFIED:
logger.debug("A job was modified from outside the scheduler")
if event_code == EVENT_JOB_SUBMITTED:
logger.debug("A job was submitted to its executor to be run")
if event_code == EVENT_JOB_MAX_INSTANCES:
logger.error(
"A job being submitted to its executor was not accepted by"
" the executor because the job has already reached"
" its maximum concurrently executing instances"
)
if event_code == EVENT_JOB_EXECUTED:
logger.debug("A job was executed successfully")
if event_code == EVENT_JOB_ERROR:
logger.error("A job raised an exception during execution")
if event_code == EVENT_JOB_MISSED:
logger.error("A job's execution was missed")

View File

@ -2,7 +2,6 @@ import urllib.parse
from functools import lru_cache
import html2text
from loguru import logger
from reader import Entry, Reader
from discord_rss_bot.filter.blacklist import has_black_tags, should_be_skipped
@ -26,11 +25,7 @@ def encode_url(url_to_quote: str) -> str:
Returns:
The encoded url.
"""
if url_to_quote:
return urllib.parse.quote(url_to_quote)
logger.error("URL to quote is None.")
return ""
return urllib.parse.quote(url_to_quote) if url_to_quote else ""
def entry_is_whitelisted(entry_to_check: Entry) -> bool:
@ -44,7 +39,6 @@ def entry_is_whitelisted(entry_to_check: Entry) -> bool:
bool: True if the feed is whitelisted, False otherwise.
"""
logger.debug(f"Checking if {entry_to_check.title} is whitelisted.")
return bool(has_white_tags(reader, entry_to_check.feed) and should_be_sent(reader, entry_to_check))
@ -59,14 +53,12 @@ def entry_is_blacklisted(entry_to_check: Entry) -> bool:
bool: True if the feed is blacklisted, False otherwise.
"""
logger.debug(f"Checking if {entry_to_check.title} is blacklisted.")
return bool(has_black_tags(reader, entry_to_check.feed) and should_be_skipped(reader, entry_to_check))
@lru_cache()
def convert_to_md(thing: str) -> str:
"""Discord does not support tables so we need to remove them from the markdown."""
logger.debug(f"Converting {thing} to markdown.")
text_maker: html2text.HTML2Text = html2text.HTML2Text()
# Ignore tables

View File

@ -1,7 +1,6 @@
import re
from functools import lru_cache
from loguru import logger
from reader import Entry, Feed, Reader, TagNotFoundError
from discord_rss_bot.custom_filters import convert_to_md
@ -22,10 +21,8 @@ def get_images_from_entry(entry: Entry, summary: bool = False) -> list[str]:
image_regex = r"!\[(.*)\]\((.*)\)"
if summary:
logger.debug("Getting images from summary.")
return re.findall(image_regex, convert_to_md(entry.summary)) if entry.summary else []
logger.debug("Getting images from content.")
return re.findall(image_regex, convert_to_md(entry.content[0].value)) if entry.content else []
@ -43,18 +40,14 @@ def try_to_replace(custom_message: str, template: str, replace_with: str) -> str
Returns the custom_message with the tag replaced.
"""
if not template:
logger.debug("template is empty. Returning custom_message.")
return custom_message
if not replace_with:
logger.debug("replace_with is empty. Returning custom_message.")
return custom_message
try:
logger.debug(f"Replacing {template} with {replace_with}.")
return custom_message.replace(template, replace_with)
except TypeError:
logger.debug(f"TypeError: {template} or {replace_with} is not a string.")
return custom_message
@ -68,7 +61,6 @@ def remove_image_tags(message: str) -> str:
Returns:
Returns the message with the image tags removed.
"""
logger.debug(f"Removing image tags from message {message}.")
return re.sub(r"!\[(.*)\]\((.*)\)", "", message)
@ -88,24 +80,19 @@ def replace_tags(feed: Feed, entry: Entry) -> str:
summary = ""
content = ""
if entry.summary:
logger.debug(f"Entry summary: {entry.summary}")
summary: str = entry.summary
summary = remove_image_tags(message=summary)
if entry.content:
for content_item in entry.content:
logger.debug(f"Entry content: {content_item.value}")
content: str = content_item.value
content = remove_image_tags(message=content)
if images := get_images_from_entry(entry=entry):
first_image: str = images[0][1]
else:
logger.debug("No images found.")
first_image = ""
logger.debug(f"First image: {first_image}")
list_of_replacements = [
{"{{feed_author}}": feed.author},
{"{{feed_added}}": feed.added},
@ -139,7 +126,6 @@ def replace_tags(feed: Feed, entry: Entry) -> str:
for replacement in list_of_replacements:
for template, replace_with in replacement.items():
custom_message = try_to_replace(custom_message, template, replace_with)
logger.debug(f"custom_message: {custom_message}")
return custom_message
@ -161,5 +147,4 @@ def get_custom_message(custom_reader: Reader, feed: Feed) -> str:
except ValueError:
custom_message = ""
logger.debug(f"custom_message: {custom_message}")
return custom_message

View File

@ -1,7 +1,6 @@
from typing import Iterable
from discord_webhook import DiscordWebhook
from loguru import logger
from reader import Entry, Feed, Reader
from requests import Response
@ -27,7 +26,6 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
"""
# Get the default reader if we didn't get a custom one.
reader: Reader = get_reader() if custom_reader is None else custom_reader
logger.info(f"Sending to Discord. Reader: {reader}, feed: {feed}, do_once: {do_once}.")
# Check for new entries for every feed.
reader.update_feeds()
@ -46,46 +44,37 @@ def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = Non
# Get the webhook URL for the entry. If it is None, we will continue to the next entry.
webhook_url: str = settings.get_webhook_for_entry(reader, entry)
if not webhook_url:
logger.error(f"Could not find webhook for entry {entry.title}.")
continue
# If the user has set the custom message to an empty string, we will use the default message, otherwise we will
# use the custom message.
if custom_message.get_custom_message(reader, entry.feed) != "":
webhook_message = custom_message.replace_tags(entry=entry, feed=entry.feed) # type: ignore
logger.info(f"Using custom message for {entry.title}.")
else:
webhook_message: str = default_custom_message
logger.info(f"Using default message for {entry.title}.")
# Create the webhook.
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
# Check if the feed has a whitelist, and if it does, check if the entry is whitelisted.
if feed is not None and has_white_tags(reader, feed):
logger.info(f"Feed {feed.title} has a whitelist, checking if entry {entry.title} is whitelisted.")
if should_be_sent(reader, entry):
logger.info(f"Entry {entry.title} is whitelisted, sending to Discord.")
response: Response = webhook.execute()
reader.set_entry_read(entry, True)
if not response.ok:
logger.error(f"Response was not ok, marking entry {entry.title} as unread.")
reader.set_entry_read(entry, False)
else:
logger.info(f"Entry {entry.title} is not whitelisted, skipping.")
reader.set_entry_read(entry, True)
continue
# Check if the entry is blacklisted, if it is, mark it as read and continue.
if should_be_skipped(reader, entry):
logger.info(f"Entry {entry.title} is blacklisted, skipping.")
reader.set_entry_read(entry, True)
continue
# It was not blacklisted, and not forced through whitelist, so we will send it to Discord.
response: Response = webhook.execute()
if not response.ok:
logger.error(f"Response was not ok, marking entry {entry.title} as unread.")
reader.set_entry_read(entry, False)
# If we only want to send one entry, we will break the loop. This is used when testing this function.

View File

@ -1,7 +1,6 @@
import sys
import requests
from loguru import logger
def healthcheck() -> None:
@ -15,7 +14,7 @@ def healthcheck() -> None:
if r.ok:
sys.exit(0)
except requests.exceptions.RequestException as e:
logger.error(f"Healthcheck failed: {e}")
print(f"Healthcheck failed: {e}", file=sys.stderr)
sys.exit(1)

View File

@ -1,89 +0,0 @@
"""Configure handlers and formats for application loggers."""
import logging
import sys
from pprint import pformat
# if you dont like imports of private modules
# you can move it to typing.py module
from loguru import logger
from loguru._defaults import LOGURU_FORMAT
class InterceptHandler(logging.Handler):
"""
Default handler from examples in loguru documentaion.
See https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging
"""
def emit(self, record: logging.LogRecord) -> None:
# Get corresponding Loguru level if it exists
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
# Find caller from where originated the logged message
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__: # type: ignore
frame = frame.f_back # type: ignore
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
def format_record(record: dict) -> str:
"""
Custom format for loguru loggers.
Uses pformat for log any data like request/response body during debug.
Works with logging if loguru handler it.
Example:
>>> payload = [{"users":[{"name": "Nick", "age": 87, "is_active": True}, {"name": "Alex", "age": 27, "is_active": True}], "count": 2}] # noqa: E501
>>> logger.bind(payload=).debug("users payload")
>>> [ { 'count': 2,
>>> 'users': [ {'age': 87, 'is_active': True, 'name': 'Nick'},
>>> {'age': 27, 'is_active': True, 'name': 'Alex'}]}]
"""
format_string = LOGURU_FORMAT
if record["extra"].get("payload") is not None:
record["extra"]["payload"] = pformat(record["extra"]["payload"], indent=4, compact=True, width=120)
format_string += "\n<level>{extra[payload]}</level>" # type: ignore
format_string += "{exception}\n" # type: ignore
return format_string
def init_logging() -> None:
"""
Replaces logging handlers with a handler for using the custom handler.
WARNING!
if you call the init_logging in startup event function,
then the first logs before the application start will be in the old format
>>> app.add_event_handler("startup", init_logging)
stdout:
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [11528] using statreload
INFO: Started server process [6036]
INFO: Waiting for application startup.
2020-07-25 02:19:21.357 | INFO | uvicorn.lifespan.on:startup:34 - Application startup complete.
"""
# disable handlers for specific uvicorn loggers
# to redirect their output to the default uvicorn logger
# works with uvicorn==0.11.6
loggers = (logging.getLogger(name) for name in logging.root.manager.loggerDict if name.startswith("uvicorn."))
for uvicorn_logger in loggers:
uvicorn_logger.handlers = []
# change handler for default uvicorn logger
intercept_handler: InterceptHandler = InterceptHandler()
logging.getLogger("uvicorn").handlers = [intercept_handler]
logging.getLogger("reader").handlers = [intercept_handler]
logging.getLogger("apscheduler.executors.default").handlers = [intercept_handler]
# set logs output, level and format
logger.configure(handlers=[{"sink": sys.stdout, "level": logging.DEBUG, "format": format_record}])

View File

@ -8,18 +8,15 @@ from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from loguru import logger
from reader import Entry, EntryCounts, EntrySearchCounts, EntrySearchResult, Feed, FeedCounts, Reader, TagNotFoundError
from starlette.responses import RedirectResponse
from discord_rss_bot import settings
from discord_rss_bot.apscheduler_listener import my_listener
from discord_rss_bot.custom_filters import convert_to_md, encode_url, entry_is_blacklisted, entry_is_whitelisted
from discord_rss_bot.custom_message import get_custom_message, get_images_from_entry, remove_image_tags
from discord_rss_bot.feeds import send_to_discord
from discord_rss_bot.filter.blacklist import get_blacklist_content, get_blacklist_summary, get_blacklist_title
from discord_rss_bot.filter.whitelist import get_whitelist_content, get_whitelist_summary, get_whitelist_title
from discord_rss_bot.logger import init_logging
from discord_rss_bot.search import create_html_for_search_results
from discord_rss_bot.settings import default_custom_message, get_reader, list_webhooks
@ -29,9 +26,6 @@ templates: Jinja2Templates = Jinja2Templates(directory="discord_rss_bot/template
reader: Reader = get_reader()
init_logging()
# Add the filters to the Jinja2 environment so they can be used in html templates.
templates.env.filters["encode_url"] = encode_url
templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted
@ -54,11 +48,9 @@ async def add_webhook(webhook_name=Form(), webhook_url=Form()):
# Remove leading and trailing whitespace.
clean_webhook_name: str = webhook_name.strip()
clean_webhook_url: str = webhook_url.strip()
logger.info(f"Adding webhook {clean_webhook_name} with url {clean_webhook_url}")
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks: list[dict[str, str]] = list_webhooks(reader)
logger.debug(f"Current webhooks: {webhooks}")
# Only add the webhook if it doesn't already exist.
if all(webhook["name"] != clean_webhook_name for webhook in webhooks):
@ -71,12 +63,9 @@ async def add_webhook(webhook_name=Form(), webhook_url=Form()):
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # type: ignore
logger.info(f"Added webhook {clean_webhook_name} with url {clean_webhook_url}")
return RedirectResponse(url="/", status_code=303)
# TODO: Show this error on the page.
logger.error(f"Webhook {clean_webhook_name} already exists.")
return {"error": "Webhook already exists."}
@ -93,25 +82,21 @@ async def delete_webhook(webhook_url=Form()):
"""
# Remove leading and trailing whitespace.
clean_webhook_url: str = webhook_url.strip()
logger.debug(f"Deleting webhook with url {clean_webhook_url}")
# Get current webhooks from the database if they exist otherwise use an empty list.
webhooks: list[dict[str, str]] = list_webhooks(reader)
logger.debug(f"Current webhooks: {webhooks}")
# Only add the webhook if it doesn't already exist.
for webhook in webhooks:
if webhook["url"] == clean_webhook_url:
# Add the new webhook to the list of webhooks.
webhooks.remove(webhook)
logger.info(f"Deleted webhook with url {clean_webhook_url}")
# Add our new list of webhooks to the database.
reader.set_tag((), "webhooks", webhooks) # type: ignore
return RedirectResponse(url="/", status_code=303)
# TODO: Show this error on the page.
logger.error(f"Could not find webhook with url {clean_webhook_url}")
return {"error": "Could not find webhook."}
@ -128,26 +113,20 @@ async def create_feed(feed_url=Form(), webhook_dropdown=Form()):
dict: The feed that was added.
"""
clean_feed_url: str = feed_url.strip()
logger.info(f"Adding feed {clean_feed_url} with webhook {webhook_dropdown}")
# TODO: Check if the feed is valid, if not return an error or fix it.
# For example, if the feed is missing the protocol, add it.
reader.add_feed(clean_feed_url)
reader.update_feed(clean_feed_url)
logger.debug(f"Added feed {clean_feed_url}")
# Mark every entry as read, so we don't send all the old entries to Discord.
entries: Iterable[Entry] = reader.get_entries(feed=clean_feed_url, read=False)
for entry in entries:
reader.set_entry_read(entry, True)
logger.debug(f"Marked entry {entry.title} as read.")
try:
hooks = reader.get_tag((), "webhooks")
logger.debug(f"Current webhooks: {hooks}")
except TagNotFoundError:
hooks = []
logger.error("No webhooks found in the database.")
webhook_url: str = ""
if hooks:
@ -159,18 +138,15 @@ async def create_feed(feed_url=Form(), webhook_dropdown=Form()):
if not webhook_url:
# TODO: Show this error on the page.
logger.error("No webhook URL found.")
return {"error": "No webhook URL found."}
# This is the webhook that will be used to send the feed to Discord.
reader.set_tag(clean_feed_url, "webhook", webhook_url) # type: ignore
reader.get_tag(clean_feed_url, "webhook")
logger.debug(f"Set webhook {webhook_url} for feed {clean_feed_url}")
# This is the default message that will be sent to Discord.
reader.set_tag(clean_feed_url, "custom_message", default_custom_message) # type: ignore
reader.get_tag(clean_feed_url, "custom_message")
logger.debug(f"Set custom message {default_custom_message} for feed {clean_feed_url}")
# Update the full-text search index so our new feed is searchable.
reader.update_search()
@ -188,15 +164,12 @@ async def pause_feed(feed_url=Form()):
Returns:
Redirect the URL to the feed we paused.
"""
logger.info(f"Pausing feed {feed_url}")
# Disable/pause the feed.
reader.disable_feed_updates(feed_url)
logger.debug(f"Paused feed {feed_url}")
# Clean URL is used to redirect to the feed page.
clean_url: str = urllib.parse.quote(feed_url)
logger.debug(f"Clean URL: {clean_url}")
return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303)
@ -211,15 +184,12 @@ async def unpause_feed(feed_url=Form()):
Returns:
Redirect to the feed we unpaused.
"""
logger.info(f"Unpausing feed {feed_url}")
# Enable/unpause the feed.
reader.enable_feed_updates(feed_url)
logger.debug(f"Unpaused feed {feed_url}")
# Clean URL is used to redirect to the feed page.
clean_url: str = urllib.parse.quote(feed_url)
logger.debug(f"Clean URL: {clean_url}")
return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303)
@ -244,17 +214,13 @@ async def set_whitelist(
"""
if whitelist_title:
reader.set_tag(feed_url, "whitelist_title", whitelist_title)
logger.info(f"Set whitelist_title to {whitelist_title} for feed {feed_url}")
if whitelist_summary:
reader.set_tag(feed_url, "whitelist_summary", whitelist_summary)
logger.info(f"Set whitelist_summary to {whitelist_summary} for feed {feed_url}")
if whitelist_content:
reader.set_tag(feed_url, "whitelist_content", whitelist_content)
logger.info(f"Set whitelist_content to {whitelist_content} for feed {feed_url}")
# Clean URL is used to redirect to the feed page.
clean_url: str = urllib.parse.quote(feed_url)
logger.debug(f"Clean URL: {clean_url}")
return RedirectResponse(url=f"/feed/?feed_url={clean_url}", status_code=303)
@ -272,7 +238,6 @@ async def get_whitelist(feed_url, request: Request):
"""
# Make feed_url a valid URL.
url: str = urllib.parse.unquote(feed_url)
logger.debug(f"URL: {url}")
feed: Feed = reader.get_feed(url)
@ -281,10 +246,6 @@ async def get_whitelist(feed_url, request: Request):
whitelist_summary: str = get_whitelist_summary(reader, feed)
whitelist_content: str = get_whitelist_content(reader, feed)
logger.debug(f"whitelist_title: {whitelist_title}")
logger.debug(f"whitelist_summary: {whitelist_summary}")
logger.debug(f"whitelist_content: {whitelist_content}")
context = {
"request": request,
"feed": feed,
@ -292,7 +253,6 @@ async def get_whitelist(feed_url, request: Request):
"whitelist_summary": whitelist_summary,
"whitelist_content": whitelist_content,
}
logger.debug(f"Context: {context}")
return templates.TemplateResponse("whitelist.html", context)
@ -655,9 +615,6 @@ def startup() -> None:
scheduler: BackgroundScheduler = BackgroundScheduler()
# Add our own listener to the scheduler. We use this so we can use loguru instead of the default logger.
scheduler.add_listener(my_listener)
# Update all feeds every 15 minutes.
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now())
scheduler.start()

View File

@ -1,7 +1,6 @@
import urllib.parse
from typing import Iterable
from loguru import logger
from reader import EntrySearchResult, Feed, HighlightedString, Reader
from discord_rss_bot.settings import get_reader
@ -32,10 +31,6 @@ def create_html_for_search_results(
feed: Feed = reader.get_feed(result.feed_url)
feed_url: str = urllib.parse.quote(feed.url)
logger.debug(f"Adding {result.metadata['.title']} to the search results.")
logger.debug(f"Feed URL: {feed_url}")
logger.debug(f"Result summary: {result_summary}")
html += f"""
<div class="p-2 mb-2 border border-dark">
<a class="text-muted text-decoration-none" href="/feed?feed_url={feed_url}">
@ -45,7 +40,6 @@ def create_html_for_search_results(
</div>
"""
logger.debug(f"Search results HTML: {html}")
return html
@ -66,7 +60,4 @@ def add_span_with_slice(highlighted_string: HighlightedString) -> str:
span_part: str = f"<span class='bg-warning'>{highlighted_string.value[txt_slice.start: txt_slice.stop]}</span>"
after_span: str = f"{highlighted_string.value[txt_slice.stop:]}"
logger.debug(f"Before span: {before_span}")
logger.debug(f"Span part: {span_part}")
logger.debug(f"After span: {after_span}")
return f"{before_span}{span_part}{after_span}"

View File

@ -1,16 +1,13 @@
import os
from functools import lru_cache
from loguru import logger
from platformdirs import user_data_dir
from reader import Entry, Reader, TagNotFoundError, make_reader # type: ignore
data_dir: str = user_data_dir(appname="discord_rss_bot", appauthor="TheLovinator", roaming=True)
os.makedirs(data_dir, exist_ok=True)
logger.info(f"Data directory: {data_dir}")
default_custom_message: str = "{{entry_title}}\n{{entry_link}}"
logger.debug(f"Default custom message: {default_custom_message}")
def get_webhook_for_entry(custom_reader: Reader, entry: Entry) -> str:
@ -31,10 +28,8 @@ def get_webhook_for_entry(custom_reader: Reader, entry: Entry) -> str:
# Is None if not found or error.
try:
webhook_url: str = str(reader.get_tag(entry.feed_url, "webhook"))
logger.debug(f"Webhook for {entry.title}: {webhook_url}")
except TagNotFoundError:
webhook_url = ""
logger.error(f"Could not find webhook for {entry.title}.")
return webhook_url
@ -50,10 +45,8 @@ def get_db_location(custom_location: str = "") -> str:
The database location.
"""
# Use the custom location if it is provided.
logger.debug(f"Custom location: {custom_location}")
db_loc: str = custom_location or os.path.join(data_dir, "db.sqlite")
logger.debug(f"Database location: {db_loc}")
return db_loc
@ -65,10 +58,8 @@ def get_reader(custom_location: str = "") -> Reader:
custom_location: The location of the database file.
"""
logger.debug(f"Custom location: {custom_location}")
db_location: str = get_db_location(custom_location)
logger.debug(f"Database location: {db_location}")
return make_reader(url=db_location)
@ -91,9 +82,6 @@ def list_webhooks(reader: Reader) -> list[dict[str, str]]:
# Check if the tag is named webhooks
if tag == "webhooks":
webhooks = reader.get_tag((), "webhooks") # type: ignore
else:
logger.debug(f"Tag {tag} is not webhooks.")
break
logger.debug(f"Webhooks: {webhooks}")
return webhooks