Move ui stuff to ui.py

This commit is contained in:
2025-01-10 23:00:53 +01:00
parent 735d7b853c
commit 14bf6d7fbe
2 changed files with 236 additions and 236 deletions

View File

@ -1,28 +1,23 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import textwrap
from pprint import pformat from pprint import pformat
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import discord import discord
from apscheduler.job import Job from apscheduler.job import Job
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord.abc import PrivateChannel from discord.abc import PrivateChannel
from discord.ui import Button, Select
from discord_webhook import DiscordWebhook from discord_webhook import DiscordWebhook
from discord_reminder_bot import settings from discord_reminder_bot import settings
from discord_reminder_bot.misc import calculate from discord_reminder_bot.misc import calculate
from discord_reminder_bot.parser import parse_time from discord_reminder_bot.parser import parse_time
from discord_reminder_bot.ui import ModifyJobModal, create_job_embed from discord_reminder_bot.ui import JobManagementView, create_job_embed
if TYPE_CHECKING: if TYPE_CHECKING:
import datetime import datetime
from apscheduler.job import Job from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -236,236 +231,6 @@ class RemindGroup(discord.app_commands.Group):
await interaction.followup.send(embed=embed, view=view) await interaction.followup.send(embed=embed, view=view)
class JobSelector(Select):
"""Select menu for selecting a job to manage."""
def __init__(self, scheduler: AsyncIOScheduler) -> None:
"""Initialize the job selector.
Args:
scheduler: The scheduler to get the jobs from.
"""
self.scheduler: settings.AsyncIOScheduler = scheduler
options: list[discord.SelectOption] = []
jobs: list[Job] = scheduler.get_jobs()
# Only 25 options are allowed in a select menu.
# TODO(TheLovinator): Add pagination for more than 25 jobs. # noqa: TD003
max_jobs: int = 25
if len(jobs) > max_jobs:
jobs = jobs[:max_jobs]
for job in jobs:
job_kwargs: dict = job.kwargs or {}
label_prefix: str = ""
if job.next_run_time is None:
label_prefix = "Paused: "
# Cron job
elif isinstance(job.trigger, CronTrigger):
label_prefix = "Cron: "
# Interval job
elif isinstance(job.trigger, IntervalTrigger):
label_prefix = "Interval: "
message: str = job_kwargs.get("message", f"{job.id}")
message: str = textwrap.shorten(f"{label_prefix}{message}", width=100, placeholder="...")
options.append(discord.SelectOption(label=message, value=job.id))
super().__init__(placeholder="Select a job...", options=options)
async def callback(self, interaction: discord.Interaction) -> None:
"""Callback for the job selector.
Args:
interaction: The interaction object for the command.
"""
job: Job | None = self.scheduler.get_job(self.values[0])
if job:
embed: discord.Embed = create_job_embed(job)
view = JobManagementView(job, self.scheduler)
await interaction.response.edit_message(embed=embed, view=view)
class JobManagementView(discord.ui.View):
"""View for managing jobs."""
def __init__(self, job: Job, scheduler: AsyncIOScheduler) -> None:
"""Initialize the job management view.
Args:
job: The job to manage.
scheduler: The scheduler to manage the job with.
"""
super().__init__(timeout=None)
self.job: Job = job
self.scheduler: settings.AsyncIOScheduler = scheduler
self.add_item(JobSelector(scheduler))
self.update_buttons()
@discord.ui.button(label="Delete", style=discord.ButtonStyle.danger)
async def delete_button(self, interaction: discord.Interaction, button: Button) -> None: # noqa: ARG002
"""Delete the job.
Args:
interaction: The interaction object for the command.
button: The button that was clicked.
"""
job_kwargs: dict = self.job.kwargs or {}
logger.info("Deleting job: %s", self.job.id)
if hasattr(self.job, "__getstate__"):
logger.error("State: %s", self.job.__getstate__() if hasattr(self.job, "__getstate__") else "No state")
# Log extra kwargs
for key, value in job_kwargs.items():
if key not in {"message", "channel_id", "author_id", "guild_id", "user_id"}:
logger.error("Extra kwargs: %s: %s", key, value)
msg: str = self.generate_deletion_message(job_kwargs)
self.job.remove()
await interaction.response.send_message(msg)
self.stop()
def generate_deletion_message(self, job_kwargs: dict[str, str | int]) -> str: # noqa: C901, PLR0912
"""Generate the deletion message.
Args:
job_kwargs: The job kwargs.
Returns:
str: The deletion message.
"""
msg: str = f"# Job *{job_kwargs.get('message'), 'No message'}* has been deleted.\n"
msg += f"**Job ID**: {self.job.id}\n"
# The time the job was supposed to run
if hasattr(self.job, "next_run_time"):
if self.job.next_run_time:
msg += f"**Next run time**: ({self.job.next_run_time} {calculate(self.job)})\n"
else:
msg += "**Next run time**: Paused\n"
else:
msg += "**Next run time**: Pending\n"
# The Discord user who created the job
if job_kwargs.get("author_id"):
msg += f"**Created by**: <@{job_kwargs.get('author_id')}>\n"
# The Discord channel to send the message to
if job_kwargs.get("channel_id"):
msg += f"**Channel**: <#{job_kwargs.get('channel_id')}>\n"
# The Discord user to send the message to
if job_kwargs.get("user_id"):
msg += f"**User**: <@{job_kwargs.get('user_id')}>\n"
# The Discord guild to send the message to
if job_kwargs.get("guild_id"):
msg += f"**Guild**: {job_kwargs.get('guild_id')}\n"
msg += "\n## Debug info\n"
# Callable (or a textual reference to one) to run at the given time
if self.job.func:
msg += f"**Function**: {self.job.func}\n"
# Trigger that determines when func is called
if self.job.trigger:
msg += f"**Trigger**: {self.job.trigger}\n"
# Alias of the executor to run the job with
if self.job.executor:
msg += f"**Executor**: {self.job.executor}\n"
# List of positional arguments to call func with
if self.job.args:
msg += f"**Args**: {self.job.args}\n"
# Textual description of the job
if self.job.name:
msg += f"**Name**: {self.job.name}\n"
# Seconds after the designated runtime that the job is still allowed to be run (or None to allow the job to run no matter how late it is) # noqa: E501
if self.job.misfire_grace_time:
msg += f"**Misfire grace time**: {self.job.misfire_grace_time}\n"
# Run once instead of many times if the scheduler determines that the job should be run more than once in succession # noqa: E501
if self.job.coalesce:
msg += f"**Coalesce**: {self.job.coalesce}\n"
# Maximum number of concurrently running instances allowed for this job
if self.job.max_instances:
msg += f"**Max instances**: {self.job.max_instances}\n"
return msg
@discord.ui.button(label="Modify", style=discord.ButtonStyle.primary)
async def modify_button(self, interaction: discord.Interaction, button: Button) -> None: # noqa: ARG002
"""Modify the job.
Args:
interaction: The interaction object for the command.
button: The button that was clicked.
"""
logger.info("Modifying job: %s", self.job.id)
if hasattr(self.job, "__getstate__"):
logger.error("State: %s", self.job.__getstate__() if hasattr(self.job, "__getstate__") else "No state")
modal = ModifyJobModal(self.job, self.scheduler)
await interaction.response.send_modal(modal)
@discord.ui.button(label="Pause/Resume", style=discord.ButtonStyle.secondary)
async def pause_button(self, interaction: discord.Interaction, button: Button) -> None:
"""Pause or resume the job.
Args:
interaction: The interaction object for the command.
button: The button that was clicked.
"""
if hasattr(self.job, "next_run_time"):
if self.job.next_run_time is None:
logger.info("State: %s", self.job.__getstate__())
self.job.resume()
status = "resumed"
button.label = "Pause"
else:
logger.info("State: %s", self.job.__getstate__())
self.job.pause()
status = "paused"
button.label = "Resume"
else:
status: str = f"What is this? {self.job.__getstate__()}"
button.label = "What?"
self.update_buttons()
await interaction.response.edit_message(view=self)
msg: str = f"Job '{self.job.name}' has been {status}."
if hasattr(self.job, "next_run_time"):
msg += f"\nNext run time: {self.job.next_run_time} {calculate(self.job)}"
await interaction.followup.send(msg)
def update_buttons(self) -> None:
"""Update the visibility of buttons based on job status."""
self.pause_button.disabled = not self.job.next_run_time
self.pause_button.label = "Resume" if self.job.next_run_time is None else "Pause"
async def interaction_check(self, interaction: discord.Interaction) -> bool: # noqa: ARG002
"""Check the interaction and update buttons before responding.
Args:
interaction: The interaction object for the command.
Returns:
bool: Whether the interaction is valid.
"""
self.update_buttons()
return True
intents: discord.Intents = discord.Intents.default() intents: discord.Intents = discord.Intents.default()
bot = RemindBotClient(intents=intents) bot = RemindBotClient(intents=intents)

View File

@ -6,7 +6,12 @@ from typing import TYPE_CHECKING
import discord import discord
from apscheduler.job import Job from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord.ui import Button, Select
from discord_reminder_bot import settings
from discord_reminder_bot.misc import calc_time, calculate from discord_reminder_bot.misc import calc_time, calculate
from discord_reminder_bot.parser import parse_time from discord_reminder_bot.parser import parse_time
@ -208,3 +213,233 @@ def create_job_embed(job: Job) -> discord.Embed:
description=f"ID: {job.id}\nNext run: {next_run_time}\nTime left: {calculate(job)}\nChannel: <#{channel_id}>\nAuthor: <@{author_id}>", # noqa: E501 description=f"ID: {job.id}\nNext run: {next_run_time}\nTime left: {calculate(job)}\nChannel: <#{channel_id}>\nAuthor: <@{author_id}>", # noqa: E501
color=discord.Color.blue(), color=discord.Color.blue(),
) )
class JobSelector(Select):
"""Select menu for selecting a job to manage."""
def __init__(self, scheduler: AsyncIOScheduler) -> None:
"""Initialize the job selector.
Args:
scheduler: The scheduler to get the jobs from.
"""
self.scheduler: settings.AsyncIOScheduler = scheduler
options: list[discord.SelectOption] = []
jobs: list[Job] = scheduler.get_jobs()
# Only 25 options are allowed in a select menu.
# TODO(TheLovinator): Add pagination for more than 25 jobs. # noqa: TD003
max_jobs: int = 25
if len(jobs) > max_jobs:
jobs = jobs[:max_jobs]
for job in jobs:
job_kwargs: dict = job.kwargs or {}
label_prefix: str = ""
if job.next_run_time is None:
label_prefix = "Paused: "
# Cron job
elif isinstance(job.trigger, CronTrigger):
label_prefix = "Cron: "
# Interval job
elif isinstance(job.trigger, IntervalTrigger):
label_prefix = "Interval: "
message: str = job_kwargs.get("message", f"{job.id}")
message: str = textwrap.shorten(f"{label_prefix}{message}", width=100, placeholder="...")
options.append(discord.SelectOption(label=message, value=job.id))
super().__init__(placeholder="Select a job...", options=options)
async def callback(self, interaction: discord.Interaction) -> None:
"""Callback for the job selector.
Args:
interaction: The interaction object for the command.
"""
job: Job | None = self.scheduler.get_job(self.values[0])
if job:
embed: discord.Embed = create_job_embed(job)
view = JobManagementView(job, self.scheduler)
await interaction.response.edit_message(embed=embed, view=view)
class JobManagementView(discord.ui.View):
"""View for managing jobs."""
def __init__(self, job: Job, scheduler: AsyncIOScheduler) -> None:
"""Initialize the job management view.
Args:
job: The job to manage.
scheduler: The scheduler to manage the job with.
"""
super().__init__(timeout=None)
self.job: Job = job
self.scheduler: settings.AsyncIOScheduler = scheduler
self.add_item(JobSelector(scheduler))
self.update_buttons()
@discord.ui.button(label="Delete", style=discord.ButtonStyle.danger)
async def delete_button(self, interaction: discord.Interaction, button: Button) -> None: # noqa: ARG002
"""Delete the job.
Args:
interaction: The interaction object for the command.
button: The button that was clicked.
"""
job_kwargs: dict = self.job.kwargs or {}
logger.info("Deleting job: %s", self.job.id)
if hasattr(self.job, "__getstate__"):
logger.error("State: %s", self.job.__getstate__() if hasattr(self.job, "__getstate__") else "No state")
# Log extra kwargs
for key, value in job_kwargs.items():
if key not in {"message", "channel_id", "author_id", "guild_id", "user_id"}:
logger.error("Extra kwargs: %s: %s", key, value)
msg: str = self.generate_deletion_message(job_kwargs)
self.job.remove()
await interaction.response.send_message(msg)
self.stop()
def generate_deletion_message(self, job_kwargs: dict[str, str | int]) -> str: # noqa: C901, PLR0912
"""Generate the deletion message.
Args:
job_kwargs: The job kwargs.
Returns:
str: The deletion message.
"""
msg: str = f"# Job *{job_kwargs.get('message'), 'No message'}* has been deleted.\n"
msg += f"**Job ID**: {self.job.id}\n"
# The time the job was supposed to run
if hasattr(self.job, "next_run_time"):
if self.job.next_run_time:
msg += f"**Next run time**: ({self.job.next_run_time} {calculate(self.job)})\n"
else:
msg += "**Next run time**: Paused\n"
else:
msg += "**Next run time**: Pending\n"
# The Discord user who created the job
if job_kwargs.get("author_id"):
msg += f"**Created by**: <@{job_kwargs.get('author_id')}>\n"
# The Discord channel to send the message to
if job_kwargs.get("channel_id"):
msg += f"**Channel**: <#{job_kwargs.get('channel_id')}>\n"
# The Discord user to send the message to
if job_kwargs.get("user_id"):
msg += f"**User**: <@{job_kwargs.get('user_id')}>\n"
# The Discord guild to send the message to
if job_kwargs.get("guild_id"):
msg += f"**Guild**: {job_kwargs.get('guild_id')}\n"
msg += "\n## Debug info\n"
# Callable (or a textual reference to one) to run at the given time
if self.job.func:
msg += f"**Function**: {self.job.func}\n"
# Trigger that determines when func is called
if self.job.trigger:
msg += f"**Trigger**: {self.job.trigger}\n"
# Alias of the executor to run the job with
if self.job.executor:
msg += f"**Executor**: {self.job.executor}\n"
# List of positional arguments to call func with
if self.job.args:
msg += f"**Args**: {self.job.args}\n"
# Textual description of the job
if self.job.name:
msg += f"**Name**: {self.job.name}\n"
# Seconds after the designated runtime that the job is still allowed to be run (or None to allow the job to run no matter how late it is) # noqa: E501
if self.job.misfire_grace_time:
msg += f"**Misfire grace time**: {self.job.misfire_grace_time}\n"
# Run once instead of many times if the scheduler determines that the job should be run more than once in succession # noqa: E501
if self.job.coalesce:
msg += f"**Coalesce**: {self.job.coalesce}\n"
# Maximum number of concurrently running instances allowed for this job
if self.job.max_instances:
msg += f"**Max instances**: {self.job.max_instances}\n"
return msg
@discord.ui.button(label="Modify", style=discord.ButtonStyle.primary)
async def modify_button(self, interaction: discord.Interaction, button: Button) -> None: # noqa: ARG002
"""Modify the job.
Args:
interaction: The interaction object for the command.
button: The button that was clicked.
"""
logger.info("Modifying job: %s", self.job.id)
if hasattr(self.job, "__getstate__"):
logger.error("State: %s", self.job.__getstate__() if hasattr(self.job, "__getstate__") else "No state")
modal = ModifyJobModal(self.job, self.scheduler)
await interaction.response.send_modal(modal)
@discord.ui.button(label="Pause/Resume", style=discord.ButtonStyle.secondary)
async def pause_button(self, interaction: discord.Interaction, button: Button) -> None:
"""Pause or resume the job.
Args:
interaction: The interaction object for the command.
button: The button that was clicked.
"""
if hasattr(self.job, "next_run_time"):
if self.job.next_run_time is None:
logger.info("State: %s", self.job.__getstate__())
self.job.resume()
status = "resumed"
button.label = "Pause"
else:
logger.info("State: %s", self.job.__getstate__())
self.job.pause()
status = "paused"
button.label = "Resume"
else:
status: str = f"What is this? {self.job.__getstate__()}"
button.label = "What?"
self.update_buttons()
await interaction.response.edit_message(view=self)
msg: str = f"Job '{self.job.name}' has been {status}."
if hasattr(self.job, "next_run_time"):
msg += f"\nNext run time: {self.job.next_run_time} {calculate(self.job)}"
await interaction.followup.send(msg)
def update_buttons(self) -> None:
"""Update the visibility of buttons based on job status."""
self.pause_button.disabled = not self.job.next_run_time
self.pause_button.label = "Resume" if self.job.next_run_time is None else "Pause"
async def interaction_check(self, interaction: discord.Interaction) -> bool: # noqa: ARG002
"""Check the interaction and update buttons before responding.
Args:
interaction: The interaction object for the command.
Returns:
bool: Whether the interaction is valid.
"""
self.update_buttons()
return True