Refactor /remind list to use a normal embed and fix calculate() function

This commit is contained in:
2025-02-13 03:07:27 +01:00
parent f82c410097
commit 176a1a058c
5 changed files with 129 additions and 588 deletions

View File

@ -1,6 +1,5 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import datetime import datetime
import json import json
import os import os
@ -21,7 +20,6 @@ from loguru import logger
from discord_reminder_bot.misc import calc_time, calculate from discord_reminder_bot.misc import calc_time, calculate
from discord_reminder_bot.parser import parse_time from discord_reminder_bot.parser import parse_time
from discord_reminder_bot.settings import get_bot_token, get_scheduler, get_webhook_url from discord_reminder_bot.settings import get_bot_token, get_scheduler, get_webhook_url
from discord_reminder_bot.ui import JobManagementView, create_job_embed
if TYPE_CHECKING: if TYPE_CHECKING:
from apscheduler.job import Job from apscheduler.job import Job
@ -40,7 +38,6 @@ sentry_sdk.init(
) )
scheduler: settings.AsyncIOScheduler = get_scheduler() scheduler: settings.AsyncIOScheduler = get_scheduler()
msg_to_cleanup: list[discord.InteractionMessage] = []
def my_listener(event: JobExecutionEvent) -> None: def my_listener(event: JobExecutionEvent) -> None:
@ -123,27 +120,6 @@ class RemindBotClient(discord.Client):
"""Log when the bot is ready.""" """Log when the bot is ready."""
logger.info(f"Logged in as {self.user} ({self.user.id if self.user else 'Unknown'})") logger.info(f"Logged in as {self.user} ({self.user.id if self.user else 'Unknown'})")
async def close(self) -> None:
"""Close the bot and cleanup views."""
logger.info("Closing bot and cleaning up views.")
for msg in msg_to_cleanup:
logger.debug(f"Removing view: {msg.id}")
try:
# If the message is "/remind list timed out.", skip it
if "`/remind list` timed out." in msg.content:
logger.debug(f"Message {msg.id} is a timeout message. Skipping.")
continue
await msg.delete()
except discord.HTTPException as e:
if e.status != 401:
# Skip if the webhook token is invalid
logger.error(f"Failed to remove view: {msg.id} - {e.text} - {e.status} - {e.code}")
except asyncio.exceptions.CancelledError:
logger.error("Failed to remove view: Task was cancelled.")
return await super().close()
async def setup_hook(self) -> None: async def setup_hook(self) -> None:
"""Setup the bot.""" """Setup the bot."""
scheduler.start() scheduler.start()
@ -168,6 +144,59 @@ class RemindBotClient(discord.Client):
await self.tree.sync() await self.tree.sync()
def generate_reminder_summary() -> list[str]:
"""Create a message with all the jobs.
Returns:
list[str]: A list of messages with all the jobs.
"""
jobs: list[Job] = scheduler.get_jobs()
msgs: list[str] = []
if not jobs:
return ["No scheduled jobs found in the database."]
header = (
"You can use the following commands to manage reminders:\n"
"`/remind pause <job_id>` - Pause a reminder\n"
"`/remind unpause <job_id>` - Unpause a reminder\n"
"`/remind remove <job_id>` - Remove a reminder\n\n"
"`/remind modify <job_id>` - Modify the time of a reminder\n\n"
"List of all reminders:\n\n"
)
current_msg = header
for job in jobs:
# Build job-specific message
job_msg: str = f"### {job.kwargs.get('message', '')}\n"
job_msg += f"**id:** {job.id}\n"
job_msg += f"**Job type:** {job.trigger}\n"
job_msg += f"**Next run time:** {calculate(job)}\n\n"
if job.kwargs.get("user_id"):
job_msg += f"**User:** <@{job.kwargs.get('user_id')}>\n"
if job.kwargs.get("channel_id"):
job_msg += f"**Channel:** <#{job.kwargs.get('channel_id')}>\n"
if job.kwargs.get("guild_id"):
job_msg += f"**Guild:** {job.kwargs.get('guild_id')}\n"
job_msg += "\n" # Extra newline for separation
# If adding this job exceeds 2000 characters, push the current message and start a new one.
if len(current_msg) + len(job_msg) > 2000:
msgs.append(current_msg)
current_msg = job_msg
else:
current_msg += job_msg
# Append any remaining content in current_msg.
if current_msg:
msgs.append(current_msg)
return msgs
class RemindGroup(discord.app_commands.Group): class RemindGroup(discord.app_commands.Group):
"""Group for remind commands.""" """Group for remind commands."""
@ -245,7 +274,6 @@ class RemindGroup(discord.app_commands.Group):
if not dm_and_current_channel: if not dm_and_current_channel:
msg = ( msg = (
f"Hello {interaction.user.display_name},\n" f"Hello {interaction.user.display_name},\n"
f"I parsed `{time}` as `{parsed_time}`. Timezone: `{scheduler.timezone}`\n"
f"I will send a DM to {user.display_name} at:\n" f"I will send a DM to {user.display_name} at:\n"
f"First run in {calculate(user_reminder)} with the message:\n**{message}**." f"First run in {calculate(user_reminder)} with the message:\n**{message}**."
) )
@ -253,11 +281,10 @@ class RemindGroup(discord.app_commands.Group):
return return
# Create channel reminder job # Create channel reminder job
parsed_time: datetime.datetime | None = parse_time(date_to_parse=time)
channel_job: Job = scheduler.add_job( channel_job: Job = scheduler.add_job(
func=send_to_discord, func=send_to_discord,
trigger="date", trigger="date",
run_date=parsed_time, run_date=parse_time(date_to_parse=time),
kwargs={ kwargs={
"channel_id": channel_id, "channel_id": channel_id,
"message": message, "message": message,
@ -268,7 +295,6 @@ class RemindGroup(discord.app_commands.Group):
msg: str = ( msg: str = (
f"Hello {interaction.user.display_name},\n" f"Hello {interaction.user.display_name},\n"
f"I parsed `{time}` as `{parsed_time}`. Timezone: `{scheduler.timezone}`\n"
f"I will notify you in <#{channel_id}>{dm_message}.\n" f"I will notify you in <#{channel_id}>{dm_message}.\n"
f"First run in {calculate(channel_job)} with the message:\n**{message}**." f"First run in {calculate(channel_job)} with the message:\n**{message}**."
) )
@ -396,12 +422,13 @@ class RemindGroup(discord.app_commands.Group):
message: discord.InteractionMessage = await interaction.original_response() message: discord.InteractionMessage = await interaction.original_response()
embed: discord.Embed = create_job_embed(job=jobs_in_guild[0]) job_summary: list[str] = generate_reminder_summary()
view = JobManagementView(job=jobs_in_guild[0], scheduler=scheduler, guild=guild, message=message)
msg_to_cleanup.append(message) for i, msg in enumerate(job_summary):
if i == 0:
await interaction.followup.send(embed=embed, view=view) await message.edit(content=msg)
else:
await interaction.followup.send(content=msg)
# /remind cron # /remind cron
@discord.app_commands.command(name="cron", description="Create new cron job. Works like UNIX cron.") @discord.app_commands.command(name="cron", description="Create new cron job. Works like UNIX cron.")

View File

@ -1,14 +1,13 @@
from __future__ import annotations from __future__ import annotations
import datetime
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import sentry_sdk from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.date import DateTrigger
from loguru import logger from apscheduler.triggers.interval import IntervalTrigger
if TYPE_CHECKING: if TYPE_CHECKING:
import datetime
from apscheduler.job import Job from apscheduler.job import Job
@ -21,21 +20,14 @@ def calculate(job: Job) -> str | None:
Returns: Returns:
str: The time left for the job. str: The time left for the job.
""" """
trigger_time: datetime.datetime | None = job.trigger.run_date if isinstance(job.trigger, DateTrigger) else job.next_run_time trigger_time = None
if isinstance(job.trigger, DateTrigger | IntervalTrigger):
trigger_time = job.next_run_time if hasattr(job, "next_run_time") else None
elif isinstance(job.trigger, CronTrigger):
trigger_time = job.trigger.get_next_fire_time(None, datetime.datetime.now(tz=job._scheduler.timezone)) # noqa: SLF001
# Check if the job is paused if not trigger_time:
if trigger_time is None: return "Paused"
with sentry_sdk.push_scope() as scope:
scope.set_tag("job_id", job.id)
scope.set_extra("job_state", job.__getstate__() if hasattr(job, "__getstate__") else "No state")
sentry_sdk.capture_exception(Exception("Couldn't calculate time for job"))
msg: str = f"Couldn't calculate time for job: {job.id}"
if hasattr(job, "__getstate__"):
msg += f"State: {job.__getstate__()}"
logger.error(msg)
return None
return f"<t:{int(trigger_time.timestamp())}:R>" return f"<t:{int(trigger_time.timestamp())}:R>"

View File

@ -1,424 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import discord
import sentry_sdk
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord.ui import Button, Select
from loguru import logger
from discord_reminder_bot.misc import calc_time, calculate
from discord_reminder_bot.parser import parse_time
if TYPE_CHECKING:
import datetime
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
class ModifyJobModal(discord.ui.Modal, title="Modify Job"):
"""Modal for modifying a job."""
job_name = discord.ui.TextInput(label="Name", placeholder="Enter new job name", required=False)
job_date = discord.ui.TextInput(label="Date", placeholder="Enter new job date", required=False)
def __init__(self, job: Job, scheduler: AsyncIOScheduler) -> None:
"""Initialize the modify job modal.
Args:
job: The job to modify.
scheduler: The scheduler to modify the job with.
"""
super().__init__()
self.job: Job = job
self.scheduler: AsyncIOScheduler = scheduler
# Use "Name" as label if the message is too long, otherwise use the old message
job_name_label: str = f"Name ({self.job.kwargs.get('message', 'X' * 46)})"
if len(job_name_label) > 45:
job_name_label = "Name"
self.job_name.label = job_name_label
self.job_date.label = f"Date ({self.job.next_run_time.strftime('%Y-%m-%d %H:%M:%S')})"
# Replace placeholders with current values
self.job_name.placeholder = self.job.kwargs.get("message", "No message found")
self.job_date.placeholder = self.job.next_run_time.strftime("%Y-%m-%d %H:%M:%S %Z")
logger.info(f"Job '{job_name_label}' modified: Initializing modal")
logger.info(f"\tCurrent date: '{self.job.next_run_time}'")
logger.info(f"\tCurrent message: '{self.job.kwargs.get('message', 'No message found')}")
logger.info(f"\tName label: '{self.job_name.label}'")
logger.info(f"\tDate label: '{self.job_date.label}'")
async def on_submit(self, interaction: discord.Interaction) -> None:
"""Submit the job modifications.
Args:
interaction: The interaction object for the command.
"""
job_msg: str = self.job.kwargs.get("message", "No message found")
logger.info(f"Job '{job_msg}' modified: Submitting changes")
new_name: str = self.job_name.value
new_date_str: str = self.job_date.value
old_date: datetime.datetime = self.job.next_run_time
# if both are empty, do nothing
if not new_name and not new_date_str:
logger.info(f"Job '{job_msg}' modified: No changes submitted.")
await interaction.response.send_message(
content=f"Job **{job_msg}**.\nNo changes submitted.",
ephemeral=True,
)
return
if new_date_str and new_date_str != old_date.strftime("%Y-%m-%d %H:%M:%S %Z"):
new_date: datetime.datetime | None = parse_time(new_date_str)
if not new_date:
logger.error(f"Job '{job_msg}' modified: Failed to parse date: '{new_date_str}'")
await interaction.response.send_message(
content=(
f"Failed modifying job **{job_msg}**\n"
f"Job ID: **{self.job.id}**\n"
f"Failed to parse date: **{new_date_str}**\n"
f"Defaulting to old date: **{old_date.strftime('%Y-%m-%d %H:%M:%S')}** {calc_time(old_date)}"
),
)
return
logger.info(f"Job '{job_msg}' modified: New date: '{new_date}'")
logger.info(f"Job '{job_msg}' modified: Old date: '{old_date}'")
self.job.modify(next_run_time=new_date)
old_date_str: str = old_date.strftime("%Y-%m-%d %H:%M:%S")
new_date_str: str = new_date.strftime("%Y-%m-%d %H:%M:%S")
await interaction.response.send_message(
content=(
f"Job **{job_msg}** was modified by {interaction.user.mention}:\n"
f"Job ID: **{self.job.id}**\n"
f"Old date: **{old_date_str}** {calculate(self.job)} {calc_time(old_date)}\n"
f"New date: **{new_date_str}** {calculate(self.job)} {calc_time(new_date)}"
),
)
if self.job_name.value and job_msg != new_name:
logger.info(f"Job '{job_msg}' modified: New name: '{new_name}'")
logger.info(f"Job '{job_msg}' modified: Old name: '{job_msg}'")
self.job.modify(name=new_name)
await interaction.response.send_message(
content=(
f"Job **{self.job.name}** was modified by {interaction.user.mention}:\n"
f"Job ID: **{self.job.id}**\n"
f"Old name: **{self.job.name}**\n"
f"New name: **{new_name}**"
),
)
def create_job_embed(job: Job) -> discord.Embed:
"""Create an embed for a job.
Args:
job: The job to create the embed for.
Returns:
discord.Embed: The embed for the job.
"""
job_kwargs: dict = job.kwargs or {}
msg: str = ""
if hasattr(job, "next_run_time"):
if job.next_run_time:
msg += f"Next run: {job.next_run_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
else:
msg += "Status: Paused\n"
if isinstance(job.trigger, IntervalTrigger):
msg += f"Interval: {job.trigger.interval}\n"
# ID: d8a4e850245f4b06bcc04e53f13ccbbb
channel_id: int = job_kwargs.get("channel_id", 0)
if channel_id:
msg += f"Channel: <#{channel_id}>\n"
# Author: @TheLovinator
author_id: int = job_kwargs.get("author_id", 0)
if author_id:
msg += f"Created by: <@{author_id}>"
embed = discord.Embed(description=msg, color=discord.Color.blue())
embed.set_footer(text=f"{job.id}. Only jobs in the current guild are shown.")
# Set the title of the embed to the message of the job
message: str = job_kwargs.get("message", "N/A")
embed_title: str = f"{message[:256]}..." if len(message) > 256 else message
embed.title = embed_title
return embed
class JobSelector(Select):
"""Select menu for selecting a job to manage."""
def __init__(self, scheduler: AsyncIOScheduler, guild: discord.Guild) -> None:
"""Initialize the job selector.
Args:
scheduler: The scheduler to get the jobs from.
guild: The guild this view is for.
"""
self.scheduler: AsyncIOScheduler = scheduler
self.guild: discord.Guild = guild
options: list[discord.SelectOption] = []
jobs: list[Job] = scheduler.get_jobs()
jobs_in_guild: list[Job] = []
list_of_channels_in_current_guild: list[int] = [c.id for c in guild.channels]
for job in jobs:
# If the job has guild_id and it's not the current guild, skip it
if job.kwargs.get("guild_id") and job.kwargs.get("guild_id") != guild.id:
logger.debug(f"Skipping job: {job.id} because it's not in the current guild.")
continue
# If the job has channel_id and it's not in the current guild, skip it
if job.kwargs.get("channel_id") and job.kwargs.get("channel_id") not in list_of_channels_in_current_guild:
logger.debug(f"Skipping job: {job.id} because it's not from a channel in the current guild.")
continue
jobs_in_guild.append(job)
# Only 25 options are allowed in a select menu.
# TODO(TheLovinator): Add pagination for more than 25 jobs. # noqa: TD003
max_jobs: int = 25
if len(jobs_in_guild) > max_jobs:
jobs_in_guild = jobs_in_guild[:max_jobs]
for job in jobs_in_guild:
label_prefix: str = ""
label_prefix = "Paused: " if job.next_run_time is None else label_prefix
label_prefix = "Interval: " if isinstance(job.trigger, IntervalTrigger) else label_prefix
label_prefix = "Cron: " if isinstance(job.trigger, CronTrigger) else label_prefix
job_kwargs: dict = job.kwargs or {}
message: str = job_kwargs.get("message", f"{job.id}")
message = f"{label_prefix}{message}"
message = message[:96] + "..." if len(message) > 100 else message
options.append(discord.SelectOption(label=message, value=job.id))
super().__init__(placeholder="Select a job...", options=options)
async def callback(self, interaction: discord.Interaction) -> None:
"""Callback for the job selector.
Args:
interaction: The interaction object for the command.
"""
job: Job | None = self.scheduler.get_job(self.values[0])
if job:
embed: discord.Embed = create_job_embed(job)
view = JobManagementView(job, self.scheduler, guild=self.guild)
await interaction.response.edit_message(embed=embed, view=view)
class JobManagementView(discord.ui.View):
"""View for managing jobs."""
def __init__(self, job: Job, scheduler: AsyncIOScheduler, guild: discord.Guild, message: discord.Message | None = None) -> None:
"""Initialize the job management view.
Args:
job: The job to manage.
scheduler: The scheduler to manage the job with.
guild: The guild this view is for.
message: The message to manage.
"""
super().__init__(timeout=None)
self.job: Job = job
self.scheduler: AsyncIOScheduler = scheduler
self.guild: discord.Guild = guild
self.message: discord.Message | None = message
self.add_item(JobSelector(scheduler, self.guild))
self.update_buttons()
logger.debug(f"JobManagementView created for job: {self.job.id}")
async def on_error(self, interaction: discord.Interaction, error: Exception, item: discord.ui.Item) -> None:
"""Handle errors that occur within the view.
Args:
interaction: The interaction object for the command.
error: The exception that was raised.
item: The item that caused the error.
"""
with sentry_sdk.push_scope() as scope:
# Interaction-related context
scope.set_extra("interaction_id", interaction.id)
scope.set_extra("interaction_user", interaction.user.id)
scope.set_extra("interaction_user_tag", str(interaction.user))
scope.set_extra("interaction_command", interaction.command.name if interaction.command else None)
scope.set_extra("interaction_channel", str(interaction.channel))
scope.set_extra("interaction_guild", str(interaction.guild) if interaction.guild else None)
# Item-related context
scope.set_extra("item_type", type(item).__name__)
scope.set_extra("item_label", getattr(item, "label", None))
# Job and scheduler context
scope.set_extra("job_id", self.job.id if self.job else None)
scope.set_extra("job_kwargs", self.job.kwargs if self.job else None)
# Guild and message context
scope.set_extra("guild_id", self.guild.id if self.guild else None)
scope.set_extra("guild_name", self.guild.name if self.guild else None)
scope.set_extra("message_id", self.message.id if self.message else None)
scope.set_extra("message_content", self.message.content if self.message else None)
# Tags for categorization
scope.set_tag("error_type", type(error).__name__)
scope.set_tag("interaction_type", "command" if interaction.command else "other")
if interaction.guild:
scope.set_tag("guild_id", interaction.guild.id)
scope.set_tag("guild_name", interaction.guild.name)
sentry_sdk.capture_exception(error)
@discord.ui.button(label="Delete", style=discord.ButtonStyle.danger)
async def delete_button(self, interaction: discord.Interaction, button: Button) -> None: # noqa: ARG002
"""Delete the job.
Args:
interaction: The interaction object for the command.
button: The button that was clicked.
"""
job_kwargs: dict = self.job.kwargs or {}
logger.info(f"Deleting job: {self.job.id}. Clicked by {interaction.user.name}")
if hasattr(self.job, "__getstate__"):
logger.debug(f"State: {self.job.__getstate__() if hasattr(self.job, '__getstate__') else 'No state'}")
job_msg: str | int = job_kwargs.get("message", "No message found")
msg: str = f"**Job '{job_msg}' has been deleted.**\n"
msg += f"**Job ID**: {self.job.id}"
# The time the job was supposed to run
if hasattr(self.job, "next_run_time"):
if self.job.next_run_time:
msg += f"\n**Next run time**: {self.job.next_run_time} ({calculate(self.job)})"
else:
msg += "\n**Next run time**: Paused"
msg += f"\n**Trigger**: {self.job.trigger}"
else:
msg += "\n**Next run time**: Pending\n"
# The Discord user who created the job
if job_kwargs.get("author_id"):
msg += f"\n**Created by**: <@{job_kwargs.get('author_id')}>"
# The Discord channel to send the message to
if job_kwargs.get("channel_id"):
msg += f"\n**Channel**: <#{job_kwargs.get('channel_id')}>"
# The Discord user to send the message to
if job_kwargs.get("user_id"):
msg += f"\n**User**: <@{job_kwargs.get('user_id')}>"
# The Discord guild to send the message to
if job_kwargs.get("guild_id"):
msg += f"\n**Guild**: {job_kwargs.get('guild_id')}"
logger.debug(f"Deletion message: {msg}")
self.job.remove()
await interaction.response.send_message(msg)
self.stop()
@discord.ui.button(label="Modify", style=discord.ButtonStyle.primary)
async def modify_button(self, interaction: discord.Interaction, button: Button) -> None: # noqa: ARG002
"""Modify the job.
Args:
interaction: The interaction object for the command.
button: The button that was clicked.
"""
logger.info(f"Modifying job: {self.job.id}. Clicked by {interaction.user.name}")
if hasattr(self.job, "__getstate__"):
logger.debug(f"State: {self.job.__getstate__() if hasattr(self.job, '__getstate__') else 'No state'}")
modal = ModifyJobModal(self.job, self.scheduler)
await interaction.response.send_modal(modal)
@discord.ui.button(label="Pause/Resume", style=discord.ButtonStyle.secondary)
async def pause_button(self, interaction: discord.Interaction, button: Button) -> None:
"""Pause or resume the job.
Args:
interaction: The interaction object for the command.
button: The button that was clicked.
"""
if hasattr(self.job, "next_run_time"):
if self.job.next_run_time is None:
logger.info(f"State: {self.job.__getstate__() if hasattr(self.job, '__getstate__') else 'No state'}")
self.job.resume()
status = "resumed"
button.label = "Pause"
else:
logger.info(f"State: {self.job.__getstate__() if hasattr(self.job, '__getstate__') else 'No state'}")
self.job.pause()
status = "paused"
button.label = "Resume"
else:
logger.error(f"Got a job without a next_run_time: {self.job.id}")
status: str = f"What is this? {self.job.__getstate__()}"
button.label = "What?"
self.update_buttons()
await interaction.response.edit_message(view=self)
job_kwargs: dict = self.job.kwargs or {}
job_msg: str = job_kwargs.get("message", "No message found")
job_author: int = job_kwargs.get("author_id", 0)
msg: str = f"Job '{job_msg}' has been {status} by <@{interaction.user.id}>. Job was created by <@{job_author}>."
# The time the job was supposed to run
if hasattr(self.job, "next_run_time"):
if self.job.next_run_time:
msg += f"\n**Next run time**: {self.job.next_run_time} ({calculate(self.job)})"
else:
msg += "\n**Next run time**: Paused"
msg += f"\n**Trigger**: {self.job.trigger}"
else:
msg += "\n**Next run time**: Pending"
await interaction.followup.send(msg)
def update_buttons(self) -> None:
"""Update the visibility of buttons based on job status."""
logger.debug(f"Updating buttons for job: {self.job.id}")
self.pause_button.label = "Resume" if self.job.next_run_time is None else "Pause"
logger.debug(f"Pause button disabled: {self.pause_button.disabled}")
logger.debug(f"Pause button label: {self.pause_button.label}")
async def interaction_check(self, interaction: discord.Interaction) -> bool: # noqa: ARG002
"""Check the interaction and update buttons before responding.
Args:
interaction: The interaction object for the command.
Returns:
bool: Whether the interaction is valid.
"""
logger.info(f"Checking interaction for job: {self.job.id}")
# self.update_buttons()
return True

View File

@ -3,8 +3,11 @@ from __future__ import annotations
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from apscheduler.job import Job
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord_reminder_bot.misc import calc_time, calculate, get_human_time from discord_reminder_bot.misc import calc_time, calculate, get_human_time
@ -63,6 +66,10 @@ def test_get_human_time() -> None:
assert get_human_time(test_timedelta) == expected_output, assert_msg assert get_human_time(test_timedelta) == expected_output, assert_msg
def dummy_job() -> None:
"""Dummy job function for testing."""
def test_calculate() -> None: def test_calculate() -> None:
"""Test the calculate function with various job inputs.""" """Test the calculate function with various job inputs."""
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler()
@ -70,7 +77,7 @@ def test_calculate() -> None:
# Create a job with a DateTrigger # Create a job with a DateTrigger
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=timezone.utc) run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=timezone.utc)
job: Job = scheduler.add_job(lambda: None, trigger=DateTrigger(run_date=run_date), id="test_job", name="Test Job") job: Job = scheduler.add_job(dummy_job, trigger=DateTrigger(run_date=run_date), id="test_job", name="Test Job")
expected_output = "<t:9490737600:R>" expected_output = "<t:9490737600:R>"
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}" assert_msg: str = f"Expected {expected_output}, got {calculate(job)}"
@ -78,10 +85,60 @@ def test_calculate() -> None:
# Modify the job to have a next_run_time # Modify the job to have a next_run_time
job.modify(next_run_time=run_date) job.modify(next_run_time=run_date)
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}"
assert calculate(job) == expected_output, assert_msg assert calculate(job) == expected_output, assert_msg
# Paused job should still return the same output # Paused job should still return the same output
job.pause() job.pause()
assert calculate(job) == expected_output, assert_msg assert_msg: str = f"Expected None, got {calculate(job)}"
assert not calculate(job), assert_msg
scheduler.shutdown() scheduler.shutdown()
def test_calculate_cronjob() -> None:
"""Test the calculate function with a CronTrigger job."""
scheduler = BackgroundScheduler()
scheduler.start()
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=timezone.utc)
job: Job = scheduler.add_job(
dummy_job,
trigger=CronTrigger(
second=run_date.second,
minute=run_date.minute,
hour=run_date.hour,
day=run_date.day,
month=run_date.month,
year=run_date.year,
),
)
# Force next_run_time to expected value for testing
job.modify(next_run_time=run_date)
expected_output: str = f"<t:{int(run_date.timestamp())}:R>"
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}"
# You can't pause a CronTrigger job so this should return the same output
job.pause()
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}"
scheduler.shutdown()
def test_calculate_intervaljob() -> None:
"""Test the calculate function with an IntervalTrigger job."""
scheduler = BackgroundScheduler()
scheduler.start()
run_date = datetime(2270, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
job = scheduler.add_job(dummy_job, trigger=IntervalTrigger(seconds=3600), id="test_interval_job", name="Test Interval Job")
# Force next_run_time to expected value for testing
job.modify(next_run_time=run_date)
expected_output = f"<t:{int(run_date.timestamp())}:R>"
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}"
# Paused job should return False
job.pause()
assert not calculate(job), f"Expected None, got {calculate(job)}"
scheduler.shutdown()

View File

@ -1,111 +0,0 @@
from __future__ import annotations
import unittest
from unittest.mock import Mock
import discord
from apscheduler.triggers.interval import IntervalTrigger
from discord_reminder_bot.ui import create_job_embed
class TestCreateJobEmbed(unittest.TestCase):
"""Test the `create_job_embed` function in the `discord_reminder_bot.ui` module."""
def setUp(self) -> None:
"""Set up the mock job for testing."""
self.job = Mock()
self.job.id = "12345"
self.job.kwargs = {"channel_id": 67890, "message": "Test message", "author_id": 54321}
self.job.next_run_time = None
self.job.trigger = Mock(spec=IntervalTrigger)
self.job.trigger.interval = "1 day"
def test_create_job_embed_with_next_run_time(self) -> None:
"""Test the `create_job_embed` function to ensure it correctly creates a Discord embed for a job with the next run time."""
self.job.next_run_time = Mock()
self.job.next_run_time.strftime.return_value = "2023-10-10 10:00:00"
embed: discord.Embed = create_job_embed(self.job)
assert_msg: str = f"Expected discord.Embed, got {type(embed)}"
assert isinstance(embed, discord.Embed), assert_msg
assert_msg = f"Expected Test message, got {embed.title}"
assert embed.title == "Test message", assert_msg
assert_msg = "Expected embed description to not be None"
assert embed.description is not None, assert_msg
assert_msg = f"Expected 12345 in embed footer, got {embed.footer}"
assert "12345" in embed.footer.text if embed.footer.text else None, assert_msg
assert_msg = f"Expected Next run: 2023-10-10 10:00:00 in embed description, got {embed.description}"
assert "Next run: 2023-10-10 10:00:00" in embed.description, assert_msg
assert_msg = f"Expected Interval: 1 day in embed description, got {embed.description}"
assert "Interval: 1 day" in embed.description, assert_msg
assert_msg = f"Expected Channel: <#67890> in embed description, got {embed.description}"
assert "Channel: <#67890>" in embed.description, assert_msg
assert_msg = f"Expected Created by: <@54321> in embed description, got {embed.description}"
assert "Created by: <@54321>" in embed.description, assert_msg
def test_create_job_embed_without_next_run_time(self) -> None:
"""Test the `create_job_embed` function to ensure it correctly creates a Discord embed for a job without the next run time."""
embed: discord.Embed = create_job_embed(self.job)
assert_msg: str = f"Expected discord.Embed, got {type(embed)}"
assert isinstance(embed, discord.Embed), assert_msg
assert_msg = f"Expected Test message, got {embed.title}"
assert embed.title == "Test message", assert_msg
assert_msg = "Expected embed description to not be None"
assert embed.description is not None, assert_msg
assert_msg = f"Expected 12345 in embed footer, got {embed.footer}"
assert "12345" in embed.footer.text if embed.footer.text else None, assert_msg
assert_msg = f"Expected Paused in embed description, got {embed.description}"
assert "Paused" in embed.description, assert_msg
assert_msg = f"Expected Interval: 1 day in embed description, got {embed.description}"
assert "Interval: 1 day" in embed.description, assert_msg
assert_msg = f"Expected Channel: <#67890> in embed description, got {embed.description}"
assert "Channel: <#67890>" in embed.description, assert_msg
assert_msg = f"Expected Created by: <@54321> in embed description, got {embed.description}"
assert "Created by: <@54321>" in embed.description, assert_msg
def test_create_job_embed_with_long_message(self) -> None:
"""Test the `create_job_embed` function to ensure it correctly truncates long messages."""
self.job.kwargs["message"] = "A" * 300
embed: discord.Embed = create_job_embed(self.job)
assert_msg: str = f"Expected A{'...' * 84} in embed title, got {embed.title}"
assert isinstance(embed, discord.Embed), assert_msg
assert_msg = f"Expected A{'...' * 84} in embed title, got {embed.title}"
assert embed.title == "A" * 256 + "...", assert_msg
assert_msg = "Expected embed description to not be None"
assert embed.description is not None, assert_msg
assert_msg = f"Expected 12345 in embed footer, got {embed.footer}"
assert "12345" in embed.footer.text if embed.footer.text else None, assert_msg
assert_msg = f"Expected Paused in embed description, got {embed.description}"
assert "Paused" in embed.description, assert_msg
assert_msg = f"Expected Interval: 1 day in embed description, got {embed.description}"
assert "Interval: 1 day" in embed.description, assert_msg
assert_msg = f"Expected Channel: <#67890> in embed description, got {embed.description}"
assert "Channel: <#67890>" in embed.description, assert_msg
assert_msg = f"Expected Created by: <@54321> in embed description, got {embed.description}"
assert "Created by: <@54321>" in embed.description, assert_msg