Move everything out of main.py

This commit is contained in:
2025-04-26 19:57:21 +02:00
parent 0cb2a596b5
commit 0eb7a26439
16 changed files with 1287 additions and 796 deletions

View File

@ -1,226 +1,42 @@
from __future__ import annotations
import datetime
import json
import os
import platform
import tempfile
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
import dateparser
import discord
import pytz
import sentry_sdk
from apscheduler import events
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, JobExecutionEvent
from apscheduler.job import Job
from apscheduler.jobstores.base import JobLookupError
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord.abc import PrivateChannel
from discord_webhook import DiscordWebhook
from dotenv import load_dotenv
from loguru import logger
from interactions.api.models.misc import Snowflake
from discord_reminder_bot.commands.add import add_reminder_job
from discord_reminder_bot.commands.backup import backup_reminder_job
from discord_reminder_bot.commands.cron import cron_reminder_job
from discord_reminder_bot.commands.event import add_discord_event
from discord_reminder_bot.commands.interval import interval_reminder_job
from discord_reminder_bot.commands.list import list_reminder_job
from discord_reminder_bot.commands.pause import pause_reminder_job
from discord_reminder_bot.commands.remove import remove_reminder_job
from discord_reminder_bot.commands.restore import restore_reminder_job
from discord_reminder_bot.commands.unpause import unpause_reminder
from discord_reminder_bot.settings import scheduler
if TYPE_CHECKING:
from apscheduler.job import Job
from discord.guild import GuildChannel
from discord.interactions import InteractionChannel
from discord.types.channel import _BaseChannel
from requests import Response
load_dotenv()
default_sentry_dsn: str = "https://c4c61a52838be9b5042144420fba5aaa@o4505228040339456.ingest.us.sentry.io/4508707268984832"
sentry_sdk.init(
dsn=os.getenv("SENTRY_DSN", default_sentry_dsn),
environment=platform.node() or "Unknown",
traces_sample_rate=1.0,
send_default_pii=True,
)
def generate_markdown_state(state: dict[str, Any]) -> str:
"""Format the __getstate__ dictionary for Discord markdown.
Args:
state (dict): The __getstate__ dictionary.
Returns:
str: The formatted string.
"""
if not state:
return "```json\nNo state found.\n```"
# discord.app_commands.errors.CommandInvokeError: Command 'remove' raised an exception: TypeError: Object of type IntervalTrigger is not JSON serializable
# Convert the IntervalTrigger to a string representation
for key, value in state.items():
if isinstance(value, IntervalTrigger):
state[key] = "IntervalTrigger"
elif isinstance(value, DateTrigger):
state[key] = "DateTrigger"
elif isinstance(value, Job):
state[key] = "Job"
elif isinstance(value, Snowflake):
state[key] = str(value)
try:
msg: str = json.dumps(state, indent=4, default=str)
except TypeError as e:
e.add_note("This is likely due to a non-serializable object in the state. Please check the state for any non-serializable objects.")
e.add_note(f"{state=}")
logger.error(f"Failed to serialize state: {e}")
return "```json\nFailed to serialize state.\n```"
return "```json\n" + msg + "\n```"
def parse_time(date_to_parse: str | None, timezone: str | None = os.getenv("TIMEZONE")) -> datetime.datetime | None:
"""Parse a date string into a datetime object.
Args:
date_to_parse(str): The date string to parse.
timezone(str, optional): The timezone to use. Defaults to the TIMEZONE environment variable.
Returns:
datetime.datetime: The parsed datetime object.
"""
if not date_to_parse:
logger.error("No date provided to parse.")
return None
if not timezone:
logger.error("No timezone provided to parse date.")
return None
logger.info(f"Parsing date: '{date_to_parse}' with timezone: '{timezone}'")
try:
parsed_date: datetime.datetime | None = dateparser.parse(
date_string=date_to_parse,
settings={
"PREFER_DATES_FROM": "future",
"TIMEZONE": f"{timezone}",
"RETURN_AS_TIMEZONE_AWARE": True,
"RELATIVE_BASE": datetime.datetime.now(tz=ZoneInfo(str(timezone))),
},
)
except (ValueError, TypeError) as e:
logger.error(f"Failed to parse date: '{date_to_parse}' with timezone: '{timezone}'. Error: {e}")
return None
logger.debug(f"Parsed date: {parsed_date} from '{date_to_parse}'")
return parsed_date
def calculate(job: Job) -> str:
"""Calculate the time left for a job.
Args:
job: The job to calculate the time for.
Returns:
str: The time left for the job or "Paused" if the job is paused or has no next run time.
"""
trigger_time = None
if isinstance(job.trigger, DateTrigger | IntervalTrigger):
trigger_time = job.next_run_time or None
elif isinstance(job.trigger, CronTrigger):
if not job.next_run_time:
logger.debug(f"No next run time found for '{job.id}', probably paused? {job.__getstate__()}")
return "Paused"
trigger_time = job.trigger.get_next_fire_time(None, datetime.datetime.now(tz=job._scheduler.timezone)) # noqa: SLF001
logger.debug(f"{type(job.trigger)=}, {trigger_time=}")
if not trigger_time:
logger.debug("No trigger time found")
return "Paused"
return f"<t:{int(trigger_time.timestamp())}:R>"
def get_human_readable_time(job: Job) -> str:
"""Get the human-readable time for a job.
Args:
job: The job to get the time for.
Returns:
str: The human-readable time.
"""
trigger_time = None
if isinstance(job.trigger, DateTrigger | IntervalTrigger):
trigger_time = job.next_run_time or None
elif isinstance(job.trigger, CronTrigger):
if not job.next_run_time:
logger.debug(f"No next run time found for '{job.id}', probably paused? {job.__getstate__()}")
return "Paused"
trigger_time = job.trigger.get_next_fire_time(None, datetime.datetime.now(tz=job._scheduler.timezone)) # noqa: SLF001
if not trigger_time:
logger.debug("No trigger time found")
return "Paused"
return trigger_time.strftime("%Y-%m-%d %H:%M:%S")
@lru_cache(maxsize=1)
def get_scheduler() -> AsyncIOScheduler:
"""Return the scheduler instance.
Uses the SQLITE_LOCATION environment variable for the SQLite database location.
Raises:
ValueError: If the timezone is missing or invalid.
Returns:
AsyncIOScheduler: The scheduler instance.
"""
config_timezone: str | None = os.getenv("TIMEZONE")
if not config_timezone:
msg = "Missing timezone. Please set the TIMEZONE environment variable."
raise ValueError(msg)
# Test if the timezone is valid
try:
ZoneInfo(config_timezone)
except (ZoneInfoNotFoundError, ModuleNotFoundError) as e:
msg: str = f"Invalid timezone: {config_timezone}. Error: {e}"
raise ValueError(msg) from e
sqlite_location: str = os.getenv("SQLITE_LOCATION", default="/jobs.sqlite")
logger.info(f"Using SQLite database at: {sqlite_location}")
jobstores: dict[str, SQLAlchemyJobStore] = {"default": SQLAlchemyJobStore(url=f"sqlite://{sqlite_location}")}
job_defaults: dict[str, bool] = {"coalesce": True}
timezone = pytz.timezone(config_timezone)
return AsyncIOScheduler(jobstores=jobstores, timezone=timezone, job_defaults=job_defaults)
scheduler: AsyncIOScheduler = get_scheduler()
def my_listener(event: JobExecutionEvent) -> None:
"""Listener for job events.
Args:
event: The event that occurred.
"""
# TODO(TheLovinator): We should save the job state to a file and send it to Discord. # noqa: TD003
if event.code == events.EVENT_JOB_MISSED:
scheduled_time: str = event.scheduled_run_time.strftime("%Y-%m-%d %H:%M:%S")
msg: str = f"Job {event.job_id} was missed! Was scheduled at {scheduled_time}"
@ -250,10 +66,7 @@ class RemindBotClient(discord.Client):
async def on_error(self, event_method: str, *args: list[Any], **kwargs: dict[str, Any]) -> None:
"""Log errors that occur in the bot."""
# Log the error
logger.exception(f"An error occurred in {event_method} with args: {args} and kwargs: {kwargs}")
# Add context to Sentry
with sentry_sdk.push_scope() as scope:
# Add event details
scope.set_tag("event_method", event_method)
@ -319,97 +132,6 @@ class RemindBotClient(discord.Client):
await self.tree.sync()
def generate_reminder_summary(ctx: discord.Interaction) -> list[str]: # noqa: PLR0912
"""Create a message with all the jobs, splitting messages into chunks of up to 2000 characters.
Args:
ctx (discord.Interaction): The context of the interaction.
Returns:
list[str]: A list of messages with all the jobs.
"""
jobs: list[Job] = scheduler.get_jobs()
msgs: list[str] = []
guild: discord.Guild | None = None
if isinstance(ctx.channel, discord.abc.GuildChannel):
guild = ctx.channel.guild
channels: list[GuildChannel] | list[_BaseChannel] = list(guild.channels) if guild else []
channels_in_this_guild: list[int] = [c.id for c in channels]
jobs_in_guild: list[Job] = []
for job in jobs:
guild_id: int = guild.id if guild else -1
guild_id_from_kwargs = int(job.kwargs.get("guild_id", 0))
if guild_id_from_kwargs and guild_id_from_kwargs != guild_id:
logger.debug(f"Skipping job: {job.id} because it's not in the current guild.")
continue
if job.kwargs.get("channel_id") not in channels_in_this_guild:
logger.debug(f"Skipping job: {job.id} because it's not in the current guild.")
continue
logger.debug(f"Adding job: {job.id} to the list.")
jobs_in_guild.append(job)
if len(jobs) != len(jobs_in_guild):
logger.info(f"Filtered out {len(jobs) - len(jobs_in_guild)} jobs that are not in the current guild.")
jobs = jobs_in_guild
if not jobs:
return ["No scheduled jobs found in the database."]
header = (
"You can use the following commands to manage reminders:\n"
"Only jobs in the current guild are shown.\n"
"`/remind pause <job_id>` - Pause a reminder\n"
"`/remind unpause <job_id>` - Unpause a reminder\n"
"`/remind remove <job_id>` - Remove a reminder\n"
"`/remind modify <job_id>` - Modify the time of a reminder\n"
"List of all reminders:\n"
)
current_msg: str = header
for job in jobs:
# Build job-specific message
job_msg: str = "```md\n"
job_msg += f"# {job.kwargs.get('message', '')}\n"
job_msg += f" * {job.id}\n"
job_msg += f" * {job.trigger} {get_human_readable_time(job)}"
if job.kwargs.get("user_id"):
job_msg += f" <@{job.kwargs.get('user_id')}>"
if job.kwargs.get("channel_id"):
channel = bot.get_channel(job.kwargs.get("channel_id"))
if channel and isinstance(channel, discord.abc.GuildChannel | discord.Thread):
job_msg += f" in #{channel.name}"
if job.kwargs.get("guild_id"):
guild = bot.get_guild(job.kwargs.get("guild_id"))
if guild:
job_msg += f" in {guild.name}"
job_msg += f" {job.kwargs.get('guild_id')}"
job_msg += "```"
# If adding this job exceeds 2000 characters, push the current message and start a new one.
if len(current_msg) + len(job_msg) > 2000:
msgs.append(current_msg)
current_msg = job_msg
else:
current_msg += job_msg
# Append any remaining content in current_msg.
if current_msg:
msgs.append(current_msg)
return msgs
class RemindGroup(discord.app_commands.Group):
"""Base class for the /remind commands."""
@ -440,76 +162,14 @@ class RemindGroup(discord.app_commands.Group):
dm_and_current_channel (bool, optional): Send reminder as a DM to the user and in this channel. Defaults to False.
"""
await interaction.response.defer()
logger.info(f"New reminder from {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.info(f"Arguments: {locals()}")
# Check if we have access to the specified channel or the current channel
target_channel: InteractionChannel | discord.TextChannel | None = channel or interaction.channel
if target_channel and interaction.guild and not target_channel.permissions_for(interaction.guild.me).send_messages:
await interaction.followup.send(content=f"I don't have permission to send messages in <#{target_channel.id}>.", ephemeral=True)
# Get the channel ID
channel_id: int | None = channel.id if channel else (interaction.channel.id if interaction.channel else None)
if not channel_id:
await interaction.followup.send(content="Failed to get channel.", ephemeral=True)
return
# Ensure the guild is valid
guild: discord.Guild | None = interaction.guild or None
if not guild:
await interaction.followup.send(content="Failed to get guild.", ephemeral=True)
return
dm_message: str = ""
if user:
parsed_time: datetime.datetime | None = parse_time(date_to_parse=time)
if not parsed_time:
await interaction.followup.send(content=f"Failed to parse time: {time}.", ephemeral=True)
return
user_reminder: Job = scheduler.add_job(
func=send_to_user,
trigger="date",
run_date=parsed_time,
kwargs={
"user_id": user.id,
"guild_id": guild.id,
"message": message,
},
)
logger.info(f"User reminder job created: {user_reminder} for {user.id} at {parsed_time}")
dm_message = f" and a DM to {user.display_name}"
if not dm_and_current_channel:
msg = (
f"Hello {interaction.user.display_name},\n"
f"I will send a DM to {user.display_name} at:\n"
f"First run in {calculate(user_reminder)} with the message:\n**{message}**."
)
await interaction.followup.send(content=msg)
return
# Create channel reminder job
channel_job: Job = scheduler.add_job(
func=send_to_discord,
trigger="date",
run_date=parse_time(date_to_parse=time),
kwargs={
"channel_id": channel_id,
"message": message,
"author_id": interaction.user.id,
},
return await add_reminder_job(
interaction=interaction,
message=message,
time=time,
channel=channel,
user=user,
dm_and_current_channel=dm_and_current_channel,
)
logger.info(f"Channel reminder job created: {channel_job} for {channel_id}")
msg: str = (
f"Hello {interaction.user.display_name},\n"
f"I will notify you in <#{channel_id}>{dm_message}.\n"
f"First run in {calculate(channel_job)} with the message:\n**{message}**."
)
await interaction.followup.send(content=msg)
# /remind event
@discord.app_commands.command(name="event", description="Add a new Discord event.")
@ -533,69 +193,15 @@ class RemindGroup(discord.app_commands.Group):
location (str, optional): The location of the scheduled event.
"""
await interaction.response.defer()
logger.info(f"New event from {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.info(f"Arguments: {locals()}")
# Check if we have a valid guild
guild: discord.Guild | None = interaction.guild
if not guild:
await interaction.followup.send(content="This command can only be used in a server.", ephemeral=True)
return
# Check if we have permission to create events
if not guild.me.guild_permissions.create_events:
await interaction.followup.send(content="I don't have permission to create events in this guild.", ephemeral=True)
return
event_start_time: datetime.datetime | None = parse_time(date_to_parse=event_start)
event_end_time: datetime.datetime | None = parse_time(date_to_parse=event_end)
if not event_start_time or not event_end_time:
await interaction.followup.send(content=f"Failed to parse time: {event_start} or {event_end}.", ephemeral=True)
return
# If event_start_time is in the past, make it now + 5 seconds
start_immediately: bool = False
if event_start_time < datetime.datetime.now(event_start_time.tzinfo):
start_immediately = True
event_start_time = datetime.datetime.now(event_start_time.tzinfo) + datetime.timedelta(seconds=5)
await interaction.followup.send(content="Event start time was in the past. Starting event in 5 seconds instead.")
reason_msg: str = f"Event created by {interaction.user} ({interaction.user.id})."
event: discord.ScheduledEvent = await guild.create_scheduled_event(
name=message,
start_time=event_start_time,
entity_type=discord.EntityType.external,
privacy_level=discord.PrivacyLevel.guild_only,
end_time=event_end_time,
reason=reason or reason_msg,
return await add_discord_event(
interaction=interaction,
message=message,
event_start=event_start,
event_end=event_end,
location=location,
reason=reason,
)
if start_immediately:
await event.start()
msg: str = f"Event '{event.name}' created successfully!\n"
if event.start_time:
msg += f"Start Time: <t:{int(event.start_time.timestamp())}:R>\n"
if event.end_time:
msg += f"End Time: <t:{int(event.end_time.timestamp())}:R>\n"
if event.channel_id:
msg += f"Channel: <#{event.channel_id}>\n"
if event.location:
msg += f"Location: {event.location}\n"
if event.creator_id:
msg += f"Created by: <@{event.creator_id}>"
await interaction.followup.send(content=msg)
# /remind list
@discord.app_commands.command(name="list", description="List, pause, unpause, and remove reminders.")
async def list(self, interaction: discord.Interaction) -> None:
@ -605,39 +211,10 @@ class RemindGroup(discord.app_commands.Group):
interaction(discord.Interaction): The interaction object for the command.
"""
await interaction.response.defer()
user: discord.User | discord.Member = interaction.user
if not isinstance(user, discord.Member):
await interaction.followup.send(content="This command can only be used in a server.", ephemeral=True)
return
channel = interaction.channel
if not isinstance(channel, discord.TextChannel):
await interaction.followup.send(content="This command can only be used in a text channel.", ephemeral=True)
return
logger.info(f"Listing reminders for {user} ({user.id}) in {interaction.channel}")
logger.info(f"Arguments: {locals()}")
jobs: list[Job] = scheduler.get_jobs()
if not jobs:
await interaction.followup.send(content="No scheduled jobs found in the database.", ephemeral=True)
return
guild: discord.Guild | None = interaction.guild
if not guild:
await interaction.followup.send(content="Failed to get guild.", ephemeral=True)
return
message: discord.InteractionMessage = await interaction.original_response()
job_summary: list[str] = generate_reminder_summary(ctx=interaction)
for i, msg in enumerate(job_summary):
if i == 0:
await message.edit(content=msg)
else:
await interaction.followup.send(content=msg)
return await list_reminder_job(
interaction=interaction,
scheduler=scheduler,
)
# /remind cron
@discord.app_commands.command(name="cron", description="Create new cron job. Works like UNIX cron.")
@ -685,61 +262,10 @@ class RemindGroup(discord.app_commands.Group):
dm_and_current_channel (bool, optional): If user is provided, send reminder as a DM to the user and in this channel. Defaults to only the user.
"""
await interaction.response.defer()
# Log kwargs
logger.info(f"New cron job from {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.info(f"Cron job arguments: {locals()}")
# Get the channel ID
channel_id: int | None = channel.id if channel else (interaction.channel.id if interaction.channel else None)
if not channel_id:
await interaction.followup.send(content="Failed to get channel.", ephemeral=True)
return
# Ensure the guild is valid
guild: discord.Guild | None = interaction.guild or None
if not guild:
await interaction.followup.send(content="Failed to get guild.", ephemeral=True)
return
# Create user DM reminder job if user is specified
dm_message: str = ""
if user:
user_reminder: Job = scheduler.add_job(
func=send_to_user,
trigger="cron",
year=year,
month=month,
day=day,
week=week,
day_of_week=day_of_week,
hour=hour,
minute=minute,
second=second,
start_date=start_date,
end_date=end_date,
timezone=timezone,
jitter=jitter,
kwargs={
"user_id": user.id,
"guild_id": guild.id,
"message": message,
},
)
dm_message = f" and a DM to {user.display_name}"
if not dm_and_current_channel:
await interaction.followup.send(
content=f"Hello {interaction.user.display_name},\n"
f"I will send a DM to {user.display_name} at:\n"
f"First run in {calculate(user_reminder)} with the message:\n**{message}**.",
)
return
# Create channel reminder job
channel_job: Job = scheduler.add_job(
func=send_to_discord,
trigger="cron",
return await cron_reminder_job(
interaction=interaction,
scheduler=scheduler,
message=message,
year=year,
month=month,
day=day,
@ -752,17 +278,9 @@ class RemindGroup(discord.app_commands.Group):
end_date=end_date,
timezone=timezone,
jitter=jitter,
kwargs={
"channel_id": channel_id,
"message": message,
"author_id": interaction.user.id,
},
)
await interaction.followup.send(
content=f"Hello {interaction.user.display_name},\n"
f"I will notify you in <#{channel_id}>{dm_message}.\n"
f"First run in {calculate(channel_job)} with the message:\n**{message}**.",
channel=channel,
user=user,
dm_and_current_channel=dm_and_current_channel,
)
# /remind interval
@ -806,69 +324,9 @@ class RemindGroup(discord.app_commands.Group):
dm_and_current_channel (bool, optional): If user is provided, send reminder as a DM to the user and in this channel. Defaults to only the user.
"""
await interaction.response.defer()
logger.info(f"New interval job from {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.info(f"Arguments: {locals()}")
# Only allow intervals of 30 seconds or more so we don't spam the channel
if weeks == days == hours == minutes == 0 and seconds < 30:
await interaction.followup.send(content="Interval must be at least 30 seconds.", ephemeral=True)
return
# Check if we have access to the specified channel or the current channel
target_channel: InteractionChannel | None = channel or interaction.channel
if target_channel and interaction.guild and not target_channel.permissions_for(interaction.guild.me).send_messages:
await interaction.followup.send(
content=f"I don't have permission to send messages in <#{target_channel.id}>.",
ephemeral=True,
)
# Get the channel ID
channel_id: int | None = channel.id if channel else (interaction.channel.id if interaction.channel else None)
if not channel_id:
await interaction.followup.send(content="Failed to get channel.", ephemeral=True)
return
# Ensure the guild is valid
guild: discord.Guild | None = interaction.guild or None
if not guild:
await interaction.followup.send(content="Failed to get guild.", ephemeral=True)
return
# Create user DM reminder job if user is specified
dm_message: str = ""
if user:
dm_job: Job = scheduler.add_job(
func=send_to_user,
trigger="interval",
weeks=weeks,
days=days,
hours=hours,
minutes=minutes,
seconds=seconds,
start_date=start_date,
end_date=end_date,
timezone=timezone,
jitter=jitter,
kwargs={
"user_id": user.id,
"guild_id": guild.id,
"message": message,
},
)
dm_message = f" and a DM to {user.display_name} "
if not dm_and_current_channel:
await interaction.followup.send(
content=f"Hello {interaction.user.display_name},\n"
f"I will send a DM to {user.display_name} at:\n"
f"First run in {calculate(dm_job)} with the message:\n**{message}**.",
)
# Create channel reminder job
channel_job: Job = scheduler.add_job(
func=send_to_discord,
trigger="interval",
return await interval_reminder_job(
interaction=interaction,
message=message,
weeks=weeks,
days=days,
hours=hours,
@ -878,17 +336,9 @@ class RemindGroup(discord.app_commands.Group):
end_date=end_date,
timezone=timezone,
jitter=jitter,
kwargs={
"channel_id": channel_id,
"message": message,
"author_id": interaction.user.id,
},
)
await interaction.followup.send(
content=f"Hello {interaction.user.display_name},\n"
f"I will notify you in <#{channel_id}>{dm_message}.\n"
f"First run in {calculate(channel_job)} with the message:\n**{message}**.",
channel=channel,
user=user,
dm_and_current_channel=dm_and_current_channel,
)
# /remind backup
@ -901,70 +351,11 @@ class RemindGroup(discord.app_commands.Group):
all_servers (bool): Backup all servers or just the current server. Defaults to only the current server.
"""
await interaction.response.defer()
logger.info(f"Backing up reminders for {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.info(f"Arguments: {locals()}")
# Retrieve all jobs
with tempfile.NamedTemporaryFile(mode="r+", delete=False, encoding="utf-8", suffix=".json") as temp_file:
# Export jobs to a temporary file
scheduler.export_jobs(temp_file.name)
# Load the exported jobs
temp_file.seek(0)
jobs_data = json.load(temp_file)
# Amount of jobs before filtering
amount_of_jobs: int = len(jobs_data.get("jobs", []))
if not all_servers:
interaction_channel: InteractionChannel | None = interaction.channel
if not interaction_channel:
await interaction.followup.send(content="Failed to get channel.", ephemeral=True)
return
if not isinstance(interaction.user, discord.Member):
await interaction.followup.send(content="Failed to get user.", ephemeral=True)
return
is_admin: bool = interaction_channel.permissions_for(interaction.user).administrator
if not is_admin:
await interaction.followup.send(content="You must be an administrator to backup all servers.", ephemeral=True)
return
# Can't be 0 because that's the default value for jobs without a guild
guild_id: int = interaction.guild.id if interaction.guild else -1
channels_in_this_guild: list[int] = [c.id for c in interaction.guild.channels] if interaction.guild else []
logger.debug(f"Guild ID: {guild_id}")
for job in jobs_data.get("jobs", []):
# Check if the job is in the current guild
job_guild_id = int(job.get("kwargs", {}).get("guild_id", 0))
if job_guild_id and job_guild_id != guild_id:
logger.debug(f"Skipping job: {job.get('id')} because it's not in the current guild.")
jobs_data["jobs"].remove(job)
# Check if the channel is in the current guild
if job.get("kwargs", {}).get("channel_id") not in channels_in_this_guild:
logger.debug(f"Skipping job: {job.get('id')} because it's not in the current guild.")
jobs_data["jobs"].remove(job)
msg: str = "All reminders in this server have been backed up." if not all_servers else "All reminders have been backed up."
msg += "\nYou can restore them using `/remind restore`."
if not all_servers:
msg += f"\nAmount of jobs on all servers: {amount_of_jobs}, in this server: {len(jobs_data.get('jobs', []))}"
msg += "\nYou can use `/remind backup all_servers:True` to backup all servers."
else:
msg += f"\nAmount of jobs: {amount_of_jobs}"
# Write the data to a new file
with tempfile.NamedTemporaryFile(mode="w", delete=False, encoding="utf-8", suffix=".json") as output_file:
file_name: str = f"reminders-backup-{datetime.datetime.now(tz=scheduler.timezone)}.json"
json.dump(jobs_data, output_file, indent=4)
output_file.seek(0)
await interaction.followup.send(content=msg, file=discord.File(output_file.name, filename=file_name))
return await backup_reminder_job(
interaction=interaction,
scheduler=scheduler,
all_servers=all_servers,
)
# /remind restore
@discord.app_commands.command(name="restore", description="Restore reminders from a file.")
@ -975,91 +366,11 @@ class RemindGroup(discord.app_commands.Group):
interaction (discord.Interaction): The interaction object for the command.
"""
await interaction.response.defer()
logger.info(f"Restoring reminders from file for {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.info(f"Arguments: {locals()}")
# Tell to reply with the file to this message
await interaction.followup.send(content="Please reply to this message with the backup file.")
# Get the old jobs
old_jobs: list[Job] = scheduler.get_jobs()
# Wait for the reply
while True:
try:
reply: discord.Message | None = await bot.wait_for("message", timeout=60, check=lambda m: m.author == interaction.user)
except TimeoutError:
edit_msg = "~~Please reply to this message with the backup file.~~\nTimed out after 60 seconds."
await interaction.edit_original_response(content=edit_msg)
return
if not reply.channel:
await interaction.followup.send(content="No channel found. Please try again.")
continue
# Fetch the message by its ID to ensure we have the latest data
reply = await reply.channel.fetch_message(reply.id)
if not reply or not reply.attachments:
await interaction.followup.send(content="No file attached. Please try again.")
continue
break
# Get the attachment
attachment: discord.Attachment = reply.attachments[0]
if not attachment or not attachment.filename.endswith(".json"):
await interaction.followup.send(
content=f"Invalid file type. Should be a JSON file not '{attachment.filename}'. Please try again.",
)
return
jobs_already_exist: list[str] = []
# Save the file to a temporary file and import the jobs
with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8", suffix=".json") as temp_file:
logger.info(f"Saving attachment to {temp_file.name}")
await attachment.save(Path(temp_file.name))
# Load the jobs data from the file
temp_file.seek(0)
jobs_data: dict = json.load(temp_file)
logger.info("Importing jobs from file")
logger.debug(f"Jobs data: {jobs_data}")
with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8", suffix=".json") as temp_import_file:
# We can't import jobs with the same ID so remove them from the JSON
jobs = [job for job in jobs_data.get("jobs", []) if not scheduler.get_job(job.get("id"))]
jobs_already_exist = [job.get("id") for job in jobs_data.get("jobs", []) if scheduler.get_job(job.get("id"))]
jobs_data["jobs"] = jobs
for job_id in jobs_already_exist:
logger.debug(f"Skipping importing '{job_id}' because it already exists in the db.")
logger.debug(f"Jobs data after filtering: {jobs_data}")
logger.info(f"Jobs already exist: {jobs_already_exist}")
# Write the new data to a temporary file
json.dump(jobs_data, temp_import_file)
temp_import_file.seek(0)
# Import the jobs
scheduler.import_jobs(temp_import_file.name)
# Get the new jobs
new_jobs: list[Job] = scheduler.get_jobs()
# Get the difference
added_jobs: list[Job] = [job for job in new_jobs if job not in old_jobs]
if added_jobs:
msg: str = "Reminders restored successfully.\nAdded jobs:\n"
for j in added_jobs:
msg += f"* Message: **{j.kwargs.get('message', 'No message found')}** {calculate(j) or 'N/A'}\n"
await interaction.followup.send(content=msg)
else:
await interaction.followup.send(content="No new reminders were added.")
return await restore_reminder_job(
bot=bot,
interaction=interaction,
scheduler=scheduler,
)
# /remind remove
@discord.app_commands.command(name="remove", description="Remove a reminder")
@ -1071,25 +382,11 @@ class RemindGroup(discord.app_commands.Group):
job_id (str): The identifier of the job to remove.
"""
await interaction.response.defer()
logger.debug(f"Removing reminder with ID {job_id} for {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.debug(f"Arguments: {locals()}")
try:
job: Job | None = scheduler.get_job(job_id)
if not job:
await interaction.followup.send(content=f"Reminder with ID {job_id} not found.", ephemeral=True)
return
scheduler.remove_job(job_id)
logger.info(f"Removed job {job_id}. {job.__getstate__()}")
await interaction.followup.send(
content=f"Reminder with ID {job_id} removed successfully.\n{generate_markdown_state(job.__getstate__())}",
)
except JobLookupError as e:
logger.exception(f"Failed to remove job {job_id}")
await interaction.followup.send(content=f"Failed to remove reminder with ID {job_id}. {e}", ephemeral=True)
logger.info(f"Job {job_id} removed from the scheduler.")
return await remove_reminder_job(
interaction=interaction,
job_id=job_id,
scheduler=scheduler,
)
# /remind pause
@discord.app_commands.command(name="pause", description="Pause a reminder")
@ -1101,23 +398,11 @@ class RemindGroup(discord.app_commands.Group):
job_id (str): The identifier of the job to pause.
"""
await interaction.response.defer()
logger.debug(f"Pausing reminder with ID {job_id} for {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.debug(f"Arguments: {locals()}")
try:
job: Job | None = scheduler.get_job(job_id)
if not job:
await interaction.followup.send(content=f"Reminder with ID {job_id} not found.", ephemeral=True)
return
scheduler.pause_job(job_id)
logger.info(f"Paused job {job_id}.")
await interaction.followup.send(content=f"Reminder with ID {job_id} paused successfully.")
except JobLookupError as e:
logger.exception(f"Failed to pause job {job_id}")
await interaction.followup.send(content=f"Failed to pause reminder with ID {job_id}. {e}", ephemeral=True)
logger.info(f"Job {job_id} paused in the scheduler.")
return await pause_reminder_job(
interaction=interaction,
job_id=job_id,
scheduler=scheduler,
)
# /remind unpause
@discord.app_commands.command(name="unpause", description="Unpause a reminder")
@ -1129,23 +414,11 @@ class RemindGroup(discord.app_commands.Group):
job_id (str): The identifier of the job to unpause.
"""
await interaction.response.defer()
logger.debug(f"Unpausing reminder with ID {job_id} for {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.debug(f"Arguments: {locals()}")
try:
job: Job | None = scheduler.get_job(job_id)
if not job:
await interaction.followup.send(content=f"Reminder with ID {job_id} not found.", ephemeral=True)
return
scheduler.resume_job(job_id)
logger.info(f"Unpaused job {job_id}.")
await interaction.followup.send(content=f"Reminder with ID {job_id} unpaused successfully.")
except JobLookupError as e:
logger.exception(f"Failed to unpause job {job_id}")
await interaction.followup.send(content=f"Failed to unpause reminder with ID {job_id}. {e}", ephemeral=True)
logger.info(f"Job {job_id} unpaused in the scheduler.")
return await unpause_reminder(
interaction=interaction,
job_id=job_id,
scheduler=scheduler,
)
intents: discord.Intents = discord.Intents.default()