Add /remind backup and /remind restore commands

This commit is contained in:
2025-01-18 23:50:27 +01:00
parent 4b3acc0948
commit 07297e5a56
3 changed files with 197 additions and 65 deletions
discord_reminder_bot

@ -1,7 +1,10 @@
from __future__ import annotations
import datetime
import json
import logging
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
import discord
@ -446,6 +449,169 @@ class RemindGroup(discord.app_commands.Group):
f"First run in {calculate(channel_job)} with the message:\n**{message}**.",
)
# /remind backup
@discord.app_commands.command(name="backup", description="Backup all reminders to a file.")
async def backup(self, interaction: discord.Interaction, all_servers: bool = False) -> None: # noqa: FBT001, FBT002, PLR6301
"""Backup all reminders to a file.
Args:
interaction (discord.Interaction): The interaction object for the command.
all_servers (bool): Backup all servers or just the current server. Defaults to only the current server.
"""
await interaction.response.defer()
# Retrieve all jobs
with tempfile.NamedTemporaryFile(mode="r+", delete=False, encoding="utf-8", suffix=".json") as temp_file:
# Export jobs to a temporary file
settings.scheduler.export_jobs(temp_file.name)
# Load the exported jobs
temp_file.seek(0)
jobs_data = json.load(temp_file)
# Amount of jobs before filtering
amount_of_jobs: int = len(jobs_data.get("jobs", []))
if not all_servers:
interaction_channel: InteractionChannel | None = interaction.channel
if not interaction_channel:
await interaction.followup.send(content="Failed to get channel.", ephemeral=True)
return
if not isinstance(interaction.user, discord.Member):
await interaction.followup.send(content="Failed to get user.", ephemeral=True)
return
is_admin: bool = interaction_channel.permissions_for(interaction.user).administrator
if not is_admin:
await interaction.followup.send(content="You must be an administrator to backup all servers.", ephemeral=True)
return
# Can't be 0 because that's the default value for jobs without a guild
guild_id: int = interaction.guild.id if interaction.guild else -1
channels_in_this_guild: list[int] = [c.id for c in interaction.guild.channels] if interaction.guild else []
logger.debug("Guild ID: %s, Channels in this guild: %s", guild_id, channels_in_this_guild)
for job in jobs_data.get("jobs", []):
# Check if the job is in the current guild
job_guild_id = job.get("kwargs", {}).get("guild_id", 0)
if job_guild_id and job_guild_id != guild_id:
logger.debug("Removing job: %s because it's not in the current guild. %s vs %s", job.get("id"), job_guild_id, guild_id)
jobs_data["jobs"].remove(job)
# Check if the channel is in the current guild
if job.get("kwargs", {}).get("channel_id") not in channels_in_this_guild:
logger.debug("Removing job: %s because it's not in the current guild's channels.", job.get("id"))
jobs_data["jobs"].remove(job)
msg: str = "All reminders in this server have been backed up." if not all_servers else "All reminders have been backed up."
msg += "\nYou can restore them using `/remind restore`."
if not all_servers:
msg += f"\nAmount of jobs on all servers: {amount_of_jobs}, in this server: {len(jobs_data.get('jobs', []))}"
msg += "\nYou can use /remind backup all_servers:True to backup all servers."
# Write the data to a new file
with tempfile.NamedTemporaryFile(mode="w", delete=False, encoding="utf-8", suffix=".json") as output_file:
file_name = f"reminders-backup-{datetime.datetime.now(tz=settings.scheduler.timezone)}.json"
json.dump(jobs_data, output_file, indent=4)
output_file.seek(0)
await interaction.followup.send(content=msg, file=discord.File(output_file.name, filename=file_name))
# /remind restore
@discord.app_commands.command(name="restore", description="Restore reminders from a file.")
async def restore(self, interaction: discord.Interaction) -> None: # noqa: PLR6301
"""Restore reminders from a file.
Args:
interaction (discord.Interaction): The interaction object for the command.
"""
await interaction.response.defer()
logger.info("Restoring reminders from file for %s (%s) in %s", interaction.user, interaction.user.id, interaction.channel)
# Get the old jobs
old_jobs: list[Job] = settings.scheduler.get_jobs()
# Tell to reply with the file to this message
await interaction.followup.send(content="Please reply to this message with the backup file.")
while True:
# Wait for the reply
try:
reply: discord.Message | None = await bot.wait_for("message", timeout=60, check=lambda m: m.author == interaction.user)
except TimeoutError:
# Modify the original message to say we timed out
await interaction.edit_original_response(
content=("~~Please reply to this message with the backup file.~~\nTimed out after 60 seconds."),
)
return
# Fetch the message by its ID to ensure we have the latest data
reply = await reply.channel.fetch_message(reply.id)
if not reply or not reply.attachments:
await interaction.followup.send(content="No file attached. Please try again.")
continue
break
# Get the attachment
attachment: discord.Attachment = reply.attachments[0]
if not attachment or not attachment.filename.endswith(".json"):
await interaction.followup.send(
content=f"Invalid file type. Should be a JSON file not '{attachment.filename}'. Please try again.",
)
return
jobs_already_exist: list[str] = []
# Save the file to a temporary file and import the jobs
with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8", suffix=".json") as temp_file:
logger.info("Saving attachment to %s", temp_file.name)
await attachment.save(Path(temp_file.name))
# Load the jobs data from the file
temp_file.seek(0)
jobs_data: dict = json.load(temp_file)
logger.info("Importing jobs from file")
logger.debug("Jobs data: %s", jobs_data)
with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8", suffix=".json") as temp_import_file:
# We can't import jobs with the same ID so remove them from the JSON
jobs = [job for job in jobs_data.get("jobs", []) if not settings.scheduler.get_job(job.get("id"))]
jobs_already_exist = [job.get("id") for job in jobs_data.get("jobs", []) if settings.scheduler.get_job(job.get("id"))]
jobs_data["jobs"] = jobs
for job_id in jobs_already_exist:
logger.debug("Removed job: %s because it already exists.", job_id)
logger.debug("Jobs data after removing existing jobs: %s", jobs_data)
logger.info("Jobs already exist: %s", jobs_already_exist)
# Write the new data to a temporary file
json.dump(jobs_data, temp_import_file)
temp_import_file.seek(0)
# Import the jobs
settings.scheduler.import_jobs(temp_import_file.name)
# Get the new jobs
new_jobs: list[Job] = settings.scheduler.get_jobs()
# Get the difference
added_jobs: list[Job] = [job for job in new_jobs if job not in old_jobs]
if added_jobs:
job_info: str = ""
for j in added_jobs:
job_info += f"\n• Message: {j.kwargs.get('message', 'No message found')} | Countdown: {calculate(j) or 'N/A'}"
msg: str = f"Reminders restored successfully.\nAdded jobs:\n{job_info}"
await interaction.followup.send(content=msg)
else:
await interaction.followup.send(content="No new reminders were added. Note that only jobs for this server will be restored.")
intents: discord.Intents = discord.Intents.default()
bot = RemindBotClient(intents=intents)