Fix all errors and warnings
This commit is contained in:
@ -42,9 +42,7 @@ def has_black_tags(custom_reader: Reader, feed: Feed) -> bool:
|
||||
blacklist_summary: str = get_blacklist_summary(custom_reader, feed)
|
||||
blacklist_content: str = get_blacklist_content(custom_reader, feed)
|
||||
|
||||
if blacklist_title or blacklist_summary or blacklist_content:
|
||||
return True
|
||||
return False
|
||||
return bool(blacklist_title or blacklist_summary or blacklist_content)
|
||||
|
||||
|
||||
def should_be_skipped(custom_reader: Reader, entry: Entry) -> bool:
|
||||
@ -61,22 +59,16 @@ def should_be_skipped(custom_reader: Reader, entry: Entry) -> bool:
|
||||
feed: Feed = entry.feed
|
||||
blacklist_title: str = get_blacklist_title(custom_reader, feed)
|
||||
blacklist_summary: str = get_blacklist_summary(custom_reader, feed)
|
||||
blacklist_content: str = get_blacklist_content(custom_reader, feed)
|
||||
# blacklist_content: str = get_blacklist_content(custom_reader, feed)
|
||||
# TODO: Fix content
|
||||
# TODO: Check author
|
||||
|
||||
if blacklist_title:
|
||||
if is_word_in_text(blacklist_title, entry.title):
|
||||
return True
|
||||
|
||||
if blacklist_summary:
|
||||
if is_word_in_text(blacklist_summary, entry.summary):
|
||||
return True
|
||||
|
||||
if entry.title and blacklist_title and is_word_in_text(blacklist_title, entry.title):
|
||||
return True
|
||||
elif entry.summary and blacklist_summary and is_word_in_text(blacklist_summary, entry.summary):
|
||||
return True
|
||||
return False
|
||||
|
||||
# if blacklist_content.lower() in entry.content.lower():
|
||||
|
||||
|
||||
def get_blacklist_content(custom_reader: Reader, feed: Feed) -> str:
|
||||
"""
|
||||
|
@ -1,7 +1,7 @@
|
||||
from typing import Iterable
|
||||
|
||||
from discord_webhook import DiscordWebhook
|
||||
from reader import Entry, Reader
|
||||
from reader import Entry, Feed, Reader
|
||||
from requests import Response
|
||||
|
||||
from discord_rss_bot import settings
|
||||
@ -10,7 +10,7 @@ from discord_rss_bot.settings import get_reader
|
||||
from discord_rss_bot.whitelist import has_white_tags, should_be_sent
|
||||
|
||||
|
||||
def send_to_discord(custom_reader: Reader | None = None, feed=None, do_once=False) -> None:
|
||||
def send_to_discord(custom_reader: Reader | None = None, feed: Feed | None = None, do_once: bool = False) -> None:
|
||||
"""
|
||||
Send entries to Discord.
|
||||
|
||||
@ -38,37 +38,41 @@ def send_to_discord(custom_reader: Reader | None = None, feed=None, do_once=Fals
|
||||
|
||||
for entry in entries:
|
||||
# Set the webhook to read, so we don't send it again.
|
||||
reader.set_entry_read(entry, True) # type: ignore
|
||||
reader.set_entry_read(entry, True)
|
||||
|
||||
webhook_url = settings.get_webhook_for_entry(reader, entry)
|
||||
webhook_url: str | None = settings.get_webhook_for_entry(reader, entry)
|
||||
|
||||
webhook_message: str = f":robot: :mega: {entry.title}\n{entry.link}"
|
||||
|
||||
if webhook_url is None:
|
||||
print(f"Error: No webhook found for feed: {entry.feed.title}")
|
||||
continue
|
||||
|
||||
webhook: DiscordWebhook = DiscordWebhook(url=webhook_url, content=webhook_message, rate_limit_retry=True)
|
||||
|
||||
# Check if the entry has a whitelist
|
||||
if has_white_tags(reader, feed):
|
||||
if feed is not None and has_white_tags(reader, feed):
|
||||
# Only send the entry if it is whitelisted, otherwise, mark it as read and continue.
|
||||
if should_be_sent(reader, entry):
|
||||
response: Response = webhook.execute()
|
||||
reader.set_entry_read(entry, True) # type: ignore
|
||||
reader.set_entry_read(entry, True)
|
||||
if not response.ok:
|
||||
print(f"Error sending to Discord: {response.text}")
|
||||
reader.set_entry_read(entry, False) # type: ignore
|
||||
reader.set_entry_read(entry, False)
|
||||
else:
|
||||
reader.set_entry_read(entry, True) # type: ignore
|
||||
reader.set_entry_read(entry, True)
|
||||
continue
|
||||
|
||||
# Check if the entry is blacklisted, if it is, mark it as read and continue.
|
||||
if should_be_skipped(reader, entry):
|
||||
print(f"Blacklisted entry: {entry.title}, not sending to Discord.")
|
||||
reader.set_entry_read(entry, True) # type: ignore
|
||||
reader.set_entry_read(entry, True)
|
||||
continue
|
||||
|
||||
# It was not blacklisted, and not forced through whitelist, so we will send it to Discord.
|
||||
response: Response = webhook.execute()
|
||||
if not response.ok:
|
||||
print(f"Error sending to Discord: {response.text}")
|
||||
reader.set_entry_read(entry, False) # type: ignore
|
||||
reader.set_entry_read(entry, False)
|
||||
|
||||
# If we only want to send one entry, we will break the loop. This is used when testing this function.
|
||||
if do_once:
|
||||
|
@ -69,9 +69,10 @@ def entry_is_blacklisted(entry_to_check: Entry) -> bool:
|
||||
return bool(has_black_tags(reader, entry_to_check.feed) and should_be_skipped(reader, entry_to_check))
|
||||
|
||||
|
||||
templates.env.filters["encode_url"] = encode_url
|
||||
templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted
|
||||
templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted
|
||||
# Add the filters to the Jinja2 environment so they can be used in html templates.
|
||||
templates.env.filters["encode_url"] = encode_url # type: ignore
|
||||
templates.env.filters["entry_is_whitelisted"] = entry_is_whitelisted # type: ignore
|
||||
templates.env.filters["entry_is_blacklisted"] = entry_is_blacklisted # type: ignore
|
||||
|
||||
|
||||
@app.post("/add_webhook")
|
||||
@ -175,8 +176,8 @@ async def create_feed(feed_url: str = Form(), webhook_dropdown: str = Form()):
|
||||
if hooks:
|
||||
# Get the webhook URL from the dropdown.
|
||||
for hook in hooks:
|
||||
if hook["name"] == webhook_dropdown:
|
||||
webhook_url: str = hook["url"]
|
||||
if hook["name"] == webhook_dropdown: # type: ignore
|
||||
webhook_url: str = hook["url"] # type: ignore
|
||||
break
|
||||
|
||||
if not webhook_url:
|
||||
@ -277,9 +278,9 @@ async def get_whitelist(feed_url: str, request: Request):
|
||||
feed: Feed = reader.get_feed(url)
|
||||
|
||||
# Get previous data, this is used when creating the form.
|
||||
whitelist_title = whitelist.get_whitelist_title(reader, feed)
|
||||
whitelist_summary = whitelist.get_whitelist_summary(reader, feed)
|
||||
whitelist_content = whitelist.get_whitelist_content(reader, feed)
|
||||
whitelist_title: str = whitelist.get_whitelist_title(reader, feed)
|
||||
whitelist_summary: str = whitelist.get_whitelist_summary(reader, feed)
|
||||
whitelist_content: str = whitelist.get_whitelist_content(reader, feed)
|
||||
|
||||
context = {
|
||||
"request": request,
|
||||
@ -288,7 +289,7 @@ async def get_whitelist(feed_url: str, request: Request):
|
||||
"whitelist_summary": whitelist_summary,
|
||||
"whitelist_content": whitelist_content,
|
||||
}
|
||||
return templates.TemplateResponse("whitelist.html", context)
|
||||
return templates.TemplateResponse("whitelist.html", context) # type: ignore
|
||||
|
||||
|
||||
@app.post("/blacklist")
|
||||
@ -298,7 +299,8 @@ async def set_blacklist(
|
||||
blacklist_content: str = Form(None),
|
||||
feed_url: str = Form(),
|
||||
):
|
||||
"""Set the blacklist, if this is set we will check if words are in the title, summary or content and then don't send that entry.
|
||||
"""Set the blacklist, if this is set we will check if words are in the title, summary or content
|
||||
and then don't send that entry.
|
||||
|
||||
Args:
|
||||
blacklist_title: Blacklisted words for when checking the title.
|
||||
@ -332,9 +334,9 @@ async def get_blacklist(feed_url: str, request: Request):
|
||||
feed: Feed = reader.get_feed(url)
|
||||
|
||||
# Get previous data, this is used when creating the form.
|
||||
blacklist_title = blacklist.get_blacklist_title(reader, feed)
|
||||
blacklist_summary = blacklist.get_blacklist_summary(reader, feed)
|
||||
blacklist_content = blacklist.get_blacklist_content(reader, feed)
|
||||
blacklist_title: str = blacklist.get_blacklist_title(reader, feed)
|
||||
blacklist_summary: str = blacklist.get_blacklist_summary(reader, feed)
|
||||
blacklist_content: str = blacklist.get_blacklist_content(reader, feed)
|
||||
|
||||
context = {
|
||||
"request": request,
|
||||
@ -343,7 +345,7 @@ async def get_blacklist(feed_url: str, request: Request):
|
||||
"blacklist_summary": blacklist_summary,
|
||||
"blacklist_content": blacklist_content,
|
||||
}
|
||||
return templates.TemplateResponse("blacklist.html", context)
|
||||
return templates.TemplateResponse("blacklist.html", context) # type: ignore
|
||||
|
||||
|
||||
@app.get("/add", response_class=HTMLResponse)
|
||||
@ -357,8 +359,8 @@ def get_add(request: Request):
|
||||
Returns:
|
||||
HTMLResponse: The HTML response.
|
||||
"""
|
||||
context = make_context_index(request)
|
||||
return templates.TemplateResponse("add.html", context)
|
||||
context = make_context_index(request) # type: ignore
|
||||
return templates.TemplateResponse("add.html", context) # type: ignore
|
||||
|
||||
|
||||
@app.get("/feed", response_class=HTMLResponse)
|
||||
@ -385,7 +387,7 @@ async def get_feed(feed_url: str, request: Request):
|
||||
feed_counts: FeedCounts = reader.get_feed_counts(feed=url)
|
||||
|
||||
context = {"request": request, "feed": feed, "entries": entries, "feed_counts": feed_counts}
|
||||
return templates.TemplateResponse("feed.html", context)
|
||||
return templates.TemplateResponse("feed.html", context) # type: ignore
|
||||
|
||||
|
||||
@app.get("/webhooks", response_class=HTMLResponse)
|
||||
@ -399,7 +401,7 @@ async def get_webhooks(request: Request):
|
||||
Returns:
|
||||
HTMLResponse: The HTML response.
|
||||
"""
|
||||
return templates.TemplateResponse("webhooks.html", {"request": request})
|
||||
return templates.TemplateResponse("webhooks.html", {"request": request}) # type: ignore
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
@ -413,11 +415,11 @@ def index(request: Request):
|
||||
Returns:
|
||||
HTMLResponse: The HTML response.
|
||||
"""
|
||||
context = make_context_index(request)
|
||||
return templates.TemplateResponse("index.html", context)
|
||||
context = make_context_index(request) # type: ignore
|
||||
return templates.TemplateResponse("index.html", context) # type: ignore
|
||||
|
||||
|
||||
def make_context_index(request) -> dict:
|
||||
def make_context_index(request: Request) -> dict[str, Any]:
|
||||
"""
|
||||
Create the needed context for the index page.
|
||||
|
||||
@ -503,10 +505,10 @@ async def search(request: Request, query: str):
|
||||
"query": query,
|
||||
"search_amount": search_amount,
|
||||
}
|
||||
return templates.TemplateResponse("search.html", context)
|
||||
return templates.TemplateResponse("search.html", context) # type: ignore
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
@app.on_event("startup") # type: ignore
|
||||
def startup() -> None:
|
||||
"""This is called when the server starts.
|
||||
|
||||
@ -514,10 +516,10 @@ def startup() -> None:
|
||||
scheduler: BackgroundScheduler = BackgroundScheduler()
|
||||
|
||||
# Update all feeds every 15 minutes.
|
||||
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now())
|
||||
scheduler.add_job(send_to_discord, "interval", minutes=15, next_run_time=datetime.now()) # type: ignore
|
||||
|
||||
scheduler.start()
|
||||
scheduler.start() # type: ignore
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run("main:app", log_level="debug", reload=True)
|
||||
uvicorn.run("main:app", log_level="debug", reload=True) # type: ignore
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
import os
|
||||
|
||||
from platformdirs import user_data_dir
|
||||
from reader import Entry, Reader, TagNotFoundError, make_reader
|
||||
from reader import Entry, Reader, TagNotFoundError, make_reader # type: ignore
|
||||
|
||||
logging_format: str = "[%(asctime)s] [%(funcName)s:%(lineno)d] %(message)s"
|
||||
logging.basicConfig(level=logging.DEBUG, format=logging_format)
|
||||
|
@ -42,9 +42,7 @@ def has_white_tags(custom_reader: Reader, feed: Feed) -> bool:
|
||||
whitelist_summary: str = get_whitelist_summary(custom_reader, feed)
|
||||
whitelist_content: str = get_whitelist_content(custom_reader, feed)
|
||||
|
||||
if whitelist_title or whitelist_summary or whitelist_content:
|
||||
return True
|
||||
return False
|
||||
return bool(whitelist_title or whitelist_summary or whitelist_content)
|
||||
|
||||
|
||||
def should_be_sent(custom_reader: Reader, entry: Entry) -> bool:
|
||||
@ -61,20 +59,15 @@ def should_be_sent(custom_reader: Reader, entry: Entry) -> bool:
|
||||
feed: Feed = entry.feed
|
||||
whitelist_title: str = get_whitelist_title(custom_reader, feed)
|
||||
whitelist_summary: str = get_whitelist_summary(custom_reader, feed)
|
||||
whitelist_content: str = get_whitelist_content(custom_reader, feed)
|
||||
# whitelist_content: str = get_whitelist_content(custom_reader, feed)
|
||||
# TODO: Fix content
|
||||
# TODO: Check author
|
||||
|
||||
if whitelist_title:
|
||||
if is_word_in_text(whitelist_title, entry.title):
|
||||
return True
|
||||
|
||||
if whitelist_summary:
|
||||
if is_word_in_text(whitelist_summary, entry.summary):
|
||||
return True
|
||||
|
||||
if entry.title and whitelist_title and is_word_in_text(whitelist_title, entry.title):
|
||||
return True
|
||||
elif entry.summary and whitelist_summary and is_word_in_text(whitelist_summary, entry.summary):
|
||||
return True
|
||||
return False
|
||||
# if whitelist_content.lower() in entry.content.lower():
|
||||
|
||||
|
||||
def get_whitelist_content(custom_reader: Reader, feed: Feed) -> str:
|
||||
|
Reference in New Issue
Block a user