Reduce the amount of files and functions

This commit is contained in:
2025-02-24 13:52:00 +01:00
parent 2806e6044d
commit 4aa0bd7837
5 changed files with 59 additions and 293 deletions

View File

@ -5,29 +5,35 @@ import json
import os
import platform
import tempfile
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
import discord
import pytz
import sentry_sdk
from apscheduler import events
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, JobExecutionEvent
from apscheduler.job import Job
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from discord.abc import PrivateChannel
from discord_webhook import DiscordWebhook
from dotenv import load_dotenv
from loguru import logger
from discord_reminder_bot.misc import calc_time, calculate
from discord_reminder_bot.misc import calculate
from discord_reminder_bot.parser import parse_time
from discord_reminder_bot.settings import get_bot_token, get_scheduler, get_webhook_url
if TYPE_CHECKING:
from apscheduler.job import Job
from discord.guild import GuildChannel
from discord.interactions import InteractionChannel
from requests import Response
from discord_reminder_bot import settings
load_dotenv()
default_sentry_dsn: str = "https://c4c61a52838be9b5042144420fba5aaa@o4505228040339456.ingest.us.sentry.io/4508707268984832"
sentry_sdk.init(
@ -37,7 +43,39 @@ sentry_sdk.init(
send_default_pii=True,
)
scheduler: settings.AsyncIOScheduler = get_scheduler()
@lru_cache(maxsize=1)
def get_scheduler() -> AsyncIOScheduler:
"""Return the scheduler instance.
Raises:
ValueError: If the timezone is missing or invalid.
Returns:
AsyncIOScheduler: The scheduler instance.
"""
config_timezone: str | None = os.getenv("TIMEZONE")
if not config_timezone:
msg = "Missing timezone. Please set the TIMEZONE environment variable."
raise ValueError(msg)
# Test if the timezone is valid
try:
ZoneInfo(config_timezone)
except (ZoneInfoNotFoundError, ModuleNotFoundError) as e:
msg: str = f"Invalid timezone: {config_timezone}. Error: {e}"
raise ValueError(msg) from e
sqlite_location: str = os.getenv("SQLITE_LOCATION", default="/jobs.sqlite")
logger.info(f"Using SQLite database at: {sqlite_location}")
jobstores: dict[str, SQLAlchemyJobStore] = {"default": SQLAlchemyJobStore(url=f"sqlite://{sqlite_location}")}
job_defaults: dict[str, bool] = {"coalesce": True}
timezone = pytz.timezone(config_timezone)
return AsyncIOScheduler(jobstores=jobstores, timezone=timezone, job_defaults=job_defaults)
scheduler: AsyncIOScheduler = get_scheduler()
def my_listener(event: JobExecutionEvent) -> None:
@ -366,10 +404,10 @@ class RemindGroup(discord.app_commands.Group):
msg: str = f"Event '{event.name}' created successfully!\n"
if event.start_time:
msg += f"Start Time: {calc_time(event.start_time)}\n"
msg += f"Start Time: <t:{int(event.start_time.timestamp())}:R>\n"
if event.end_time:
msg += f"End Time: {calc_time(event.end_time)}\n"
msg += f"End Time: <t:{int(event.end_time.timestamp())}:R>\n"
if event.channel_id:
msg += f"Channel: <#{event.channel_id}>\n"
@ -857,31 +895,27 @@ remind_group = RemindGroup()
bot.tree.add_command(remind_group)
def send_webhook(url: str = "", message: str = "") -> None:
def send_webhook(custom_url: str = "", message: str = "") -> None:
"""Send a webhook to Discord.
Args:
url: Our webhook url, defaults to the one from settings.
custom_url: The custom webhook URL to send the message to. Defaults to the environment variable.
message: The message that will be sent to Discord.
"""
if not message:
logger.error("No message provided.")
message = "No message provided."
webhook_url: str = os.getenv("WEBHOOK_URL", default="")
url: str = custom_url or webhook_url
if not url:
url = get_webhook_url()
logger.error(f"No webhook URL provided. Using the one from settings: {url}")
webhook: DiscordWebhook = DiscordWebhook(
url=url,
username="discord-reminder-bot",
content="No webhook URL provided. Using the one from settings.",
rate_limit_retry=True,
)
webhook.execute()
logger.error("No webhook URL provided.")
return
webhook: DiscordWebhook = DiscordWebhook(url=url, content=message, rate_limit_retry=True)
webhook.execute()
webhook: DiscordWebhook = DiscordWebhook(url=custom_url, content=message, rate_limit_retry=True)
webhook_response: Response = webhook.execute()
logger.info(f"Webhook response: {webhook_response}")
async def send_to_discord(channel_id: int, message: str, author_id: int) -> None:
@ -945,5 +979,9 @@ async def send_to_user(user_id: int, guild_id: int, message: str) -> None:
if __name__ == "__main__":
bot_token: str = get_bot_token()
bot_token: str = os.getenv("BOT_TOKEN", default="")
if not bot_token:
msg = "Missing bot token. Please set the BOT_TOKEN environment variable. Read the README for more information."
raise ValueError(msg)
bot.run(bot_token)

View File

@ -65,24 +65,3 @@ def get_human_time(time: datetime.timedelta) -> str:
time_str += f"{int(seconds)}s"
return time_str
def calc_time(time: datetime.datetime | None) -> str:
"""Convert a datetime object to a Discord timestamp.
Args:
time: The datetime object to convert.
Returns:
str: The Discord timestamp.
"""
if not time:
return "None"
if time.tzinfo is None or time.tzinfo.utcoffset(time) is None:
logger.warning(f"Time is not timezone-aware: {time}")
if time < datetime.datetime.now(tz=time.tzinfo):
logger.warning(f"Time is in the past: {time}")
return f"<t:{int(time.timestamp())}:R>"

View File

@ -1,142 +0,0 @@
from __future__ import annotations
import os
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
import pytz
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from dotenv import load_dotenv
def get_settings(use_dotenv: bool = True) -> dict[str, str | dict[str, SQLAlchemyJobStore] | dict[str, bool] | AsyncIOScheduler]: # noqa: FBT001, FBT002
"""Load environment variables and return the settings.
Args:
use_dotenv (bool, optional): Whether to load environment variables from a .env file. Defaults to True.
Raises:
ValueError: If the bot token is missing.
Returns:
dict: The settings.
"""
if use_dotenv:
load_dotenv(verbose=True)
sqlite_location: str = os.getenv("SQLITE_LOCATION", default="/jobs.sqlite")
log_level: str = os.getenv("LOG_LEVEL", default="INFO")
webhook_url: str = os.getenv("WEBHOOK_URL", default="")
bot_token: str = os.getenv("BOT_TOKEN", default="")
if not bot_token:
msg = "Missing bot token. Please set the BOT_TOKEN environment variable."
raise ValueError(msg)
config_timezone: str | None = os.getenv("TIMEZONE")
if not config_timezone:
msg = "Missing timezone. Please set the TIMEZONE environment variable."
raise ValueError(msg)
# Test if the timezone is valid
try:
ZoneInfo(config_timezone)
except (ZoneInfoNotFoundError, ModuleNotFoundError) as e:
msg: str = f"Invalid timezone: {config_timezone}. Error: {e}"
raise ValueError(msg) from e
jobstores: dict[str, SQLAlchemyJobStore] = {"default": SQLAlchemyJobStore(url=f"sqlite://{sqlite_location}")}
job_defaults: dict[str, bool] = {"coalesce": True}
scheduler = AsyncIOScheduler(
jobstores=jobstores,
timezone=pytz.timezone(config_timezone),
job_defaults=job_defaults,
)
return {
"sqlite_location": sqlite_location,
"config_timezone": config_timezone,
"bot_token": bot_token,
"log_level": log_level,
"webhook_url": webhook_url,
"jobstores": jobstores,
"job_defaults": job_defaults,
"scheduler": scheduler,
}
def get_scheduler(use_dotenv: bool = True) -> AsyncIOScheduler: # noqa: FBT001, FBT002
"""Return the scheduler instance.
Args:
use_dotenv (bool, optional): Whether to load environment variables from a .env file. Defaults to True
Raises:
TypeError: If the scheduler is not an instance of AsyncIOScheduler.
KeyError: If the scheduler is missing from the settings.
Returns:
AsyncIOScheduler: The scheduler instance.
"""
settings: dict[str, str | dict[str, SQLAlchemyJobStore] | dict[str, bool] | AsyncIOScheduler] = get_settings(use_dotenv)
if scheduler := settings.get("scheduler"):
if not isinstance(scheduler, AsyncIOScheduler):
msg = "The scheduler is not an instance of AsyncIOScheduler."
raise TypeError(msg)
return scheduler
msg = "The scheduler is missing from the settings."
raise KeyError(msg)
def get_bot_token(use_dotenv: bool = True) -> str: # noqa: FBT001, FBT002
"""Return the bot token.
Args:
use_dotenv (bool, optional): Whether to load environment variables from a .env file. Defaults to True
Raises:
TypeError: If the bot token is not a string.
KeyError: If the bot token is missing from the settings.
Returns:
str: The bot token.
"""
settings: dict[str, str | dict[str, SQLAlchemyJobStore] | dict[str, bool] | AsyncIOScheduler] = get_settings(use_dotenv)
if bot_token := settings.get("bot_token"):
if not isinstance(bot_token, str):
msg = "The bot token is not a string."
raise TypeError(msg)
return bot_token
msg = "The bot token is missing from the settings."
raise KeyError(msg)
def get_webhook_url(use_dotenv: bool = True) -> str: # noqa: FBT001, FBT002
"""Return the webhook URL.
Args:
use_dotenv (bool, optional): Whether to load environment variables from a .env file. Defaults to True
Raises:
TypeError: If the webhook URL is not a string.
KeyError: If the webhook URL is missing from the settings.
Returns:
str: The webhook URL.
"""
settings: dict[str, str | dict[str, SQLAlchemyJobStore] | dict[str, bool] | AsyncIOScheduler] = get_settings(use_dotenv)
if webhook_url := settings.get("webhook_url"):
if not isinstance(webhook_url, str):
msg = "The webhook URL is not a string."
raise TypeError(msg)
return webhook_url
msg = "The webhook URL is missing from the settings."
raise KeyError(msg)

View File

@ -9,35 +9,12 @@ from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord_reminder_bot.misc import calc_time, calculate, get_human_time
from discord_reminder_bot.misc import calculate, get_human_time
if TYPE_CHECKING:
from apscheduler.job import Job
def test_calc_time() -> None:
"""Test the calc_time function with various datetime inputs."""
test_datetime: datetime = datetime(2023, 10, 1, 12, 0, 0, tzinfo=timezone.utc)
expected_timestamp: str = f"<t:{int(test_datetime.timestamp())}:R>"
assert_msg = f"Expected {expected_timestamp}, got {calc_time(test_datetime)}"
assert calc_time(test_datetime) == expected_timestamp, assert_msg
now: datetime = datetime.now(tz=timezone.utc)
expected_timestamp_now: str = f"<t:{int(now.timestamp())}:R>"
assert_msg = f"Expected {expected_timestamp_now}, got {calc_time(now)}"
assert calc_time(now) == expected_timestamp_now, assert_msg
past_datetime: datetime = datetime(2000, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
expected_timestamp_past: str = f"<t:{int(past_datetime.timestamp())}:R>"
assert_msg = f"Expected {expected_timestamp_past}, got {calc_time(past_datetime)}"
assert calc_time(past_datetime) == expected_timestamp_past, assert_msg
future_datetime: datetime = datetime(2100, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
expected_timestamp_future: str = f"<t:{int(future_datetime.timestamp())}:R>"
assert_msg: str = f"Expected {expected_timestamp_future}, got {calc_time(future_datetime)}"
assert calc_time(future_datetime) == expected_timestamp_future, assert_msg
def test_get_human_time() -> None:
"""Test the get_human_time function with various timedelta inputs."""
test_timedelta = timedelta(days=1, hours=2, minutes=3, seconds=4)

View File

@ -1,86 +0,0 @@
from __future__ import annotations
import pytest
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from discord_reminder_bot.settings import get_settings
def test_get_settings(monkeypatch: pytest.MonkeyPatch) -> None:
"""Test get_settings function with environment variables."""
monkeypatch.setenv("SQLITE_LOCATION", "/test_jobs.sqlite")
monkeypatch.setenv("TIMEZONE", "UTC")
monkeypatch.setenv("BOT_TOKEN", "test_token")
monkeypatch.setenv("LOG_LEVEL", "DEBUG")
monkeypatch.setenv("WEBHOOK_URL", "http://test_webhook_url")
settings: dict[str, str | dict[str, SQLAlchemyJobStore] | dict[str, bool] | AsyncIOScheduler] = get_settings(use_dotenv=False)
assert_msg = f"Expected /test_jobs.sqlite, got {settings['sqlite_location']}"
assert settings["sqlite_location"] == "/test_jobs.sqlite", assert_msg
assert_msg = f"Expected UTC, got {settings['config_timezone']}"
assert settings["config_timezone"] == "UTC", assert_msg
assert_msg = f"Expected test_token, got {settings['bot_token']}"
assert settings["bot_token"] == "test_token", assert_msg # noqa: S105
assert_msg = f"Expected DEBUG, got {settings['log_level']}"
assert settings["log_level"] == "DEBUG", assert_msg
assert_msg = f"Expected http://test_webhook_url, got {settings['webhook_url']}"
assert settings["webhook_url"] == "http://test_webhook_url", assert_msg
assert_msg = f"Expected dict, got {type(settings['jobstores'])}"
assert isinstance(settings["jobstores"], dict), assert_msg
assert_msg: str = f"Expected SQLAlchemyJobStore, got {type(settings['jobstores']['default'])}"
assert isinstance(settings["jobstores"]["default"], SQLAlchemyJobStore), assert_msg
assert_msg = f"Expected AsyncIOScheduler, got {type(settings['scheduler'])}"
assert isinstance(settings["scheduler"], AsyncIOScheduler), assert_msg
def test_get_settings_missing_bot_token(monkeypatch: pytest.MonkeyPatch) -> None:
"""Test get_settings function with missing bot token."""
monkeypatch.delenv("BOT_TOKEN", raising=False)
with pytest.raises(ValueError, match="Missing bot token"):
get_settings(use_dotenv=False)
def test_get_settings_default_values(monkeypatch: pytest.MonkeyPatch) -> None:
"""Test get_settings function with default values."""
monkeypatch.delenv("SQLITE_LOCATION", raising=False)
monkeypatch.delenv("TIMEZONE", raising=False)
monkeypatch.delenv("BOT_TOKEN", raising=False)
monkeypatch.delenv("LOG_LEVEL", raising=False)
monkeypatch.delenv("WEBHOOK_URL", raising=False)
monkeypatch.setenv("BOT_TOKEN", "default_token")
monkeypatch.setenv("TIMEZONE", "UTC")
settings: dict[str, str | dict[str, SQLAlchemyJobStore] | dict[str, bool] | AsyncIOScheduler] = get_settings(use_dotenv=False)
assert_msg: str = f"Expected /jobs.sqlite, got {settings['sqlite_location']}"
assert settings["sqlite_location"] == "/jobs.sqlite", assert_msg
assert_msg = f"Expected UTC, got {settings['config_timezone']}"
assert settings["config_timezone"] == "UTC", assert_msg
assert_msg = f"Expected default_token, got {settings['bot_token']}"
assert settings["bot_token"] == "default_token", assert_msg # noqa: S105
assert_msg = f"Expected INFO, got {settings['log_level']}"
assert settings["log_level"] == "INFO", assert_msg
assert_msg = f"Expected empty string, got {settings['webhook_url']}"
assert not settings["webhook_url"], assert_msg
assert_msg = f"Expected dict, got {type(settings['jobstores'])}"
assert isinstance(settings["jobstores"], dict), assert_msg
assert_msg = f"Expected SQLAlchemyJobStore, got {type(settings['jobstores']['default'])}"
assert isinstance(settings["jobstores"]["default"], SQLAlchemyJobStore), assert_msg
assert_msg = f"Expected AsyncIOScheduler, got {type(settings['scheduler'])}"
assert isinstance(settings["scheduler"], AsyncIOScheduler), assert_msg