7 Commits

23 changed files with 1505 additions and 1008 deletions

View File

@ -0,0 +1,103 @@
name: Docker
on:
push:
branches:
- master
pull_request:
workflow_dispatch:
schedule:
- cron: "@daily"
env:
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
TIMEZONE: Europe/Stockholm
LOG_LEVEL: Info
SQLITE_LOCATION: /data/jobs.sqlite
jobs:
build-and-push-docker:
runs-on: ubuntu-latest
steps:
# GitHub Container Registry
- uses: https://github.com/docker/login-action@v3
if: github.event_name != 'pull_request'
with:
registry: ghcr.io
username: thelovinator1
password: ${{ secrets.PACKAGES_WRITE_GITHUB_TOKEN }}
# Gitea Container Registry
- uses: https://github.com/docker/login-action@v3
if: github.event_name != 'pull_request'
with:
registry: git.lovinator.space
username: thelovinator
password: ${{ secrets.PACKAGES_WRITE_GITEA_TOKEN }}
# Docker Hub Registry
- uses: https://github.com/docker/login-action@v3
if: github.event_name != 'pull_request'
with:
registry: docker.io
username: thelovinator
password: ${{ secrets.DOCKERHUB_TOKEN }}
# Download the latest commit from the master branch
- uses: https://github.com/actions/checkout@v4
# Set up QEMU
- id: qemu
uses: https://github.com/docker/setup-qemu-action@v3
with:
image: tonistiigi/binfmt:master
platforms: linux/amd64,linux/arm64
cache-image: false
# Set up Buildx so we can build multi-arch images
- uses: https://github.com/docker/setup-buildx-action@v3
# Set up uv for Python dependency management
- uses: astral-sh/setup-uv@v5
with:
version: "latest"
# Install dependencies
- run: uv sync --all-extras --all-groups
# Run tests
- run: uv run pytest
# Install the latest version of ruff
- uses: https://github.com/astral-sh/ruff-action@v3
with:
version: "latest"
# Lint the Python code using ruff
- run: ruff check --exit-non-zero-on-fix --verbose
# Check if the Python code needs formatting
- run: ruff format --check --verbose
# Lint Dockerfile
- run: docker build --check .
# Extract metadata (tags, labels) from Git reference and GitHub events for Docker
- id: meta
uses: https://github.com/docker/metadata-action@v5
with:
images: |
thelovinator/discord-reminder-bot
ghcr.io/thelovinator1/discord-reminder-bot
tags: |
type=raw,value=latest,enable=${{ gitea.ref == format('refs/heads/{0}', 'master') }}
# Build and push the Docker image
- uses: https://github.com/docker/build-push-action@v6
with:
context: .
platforms: linux/amd64,linux/arm64
push: ${{ gitea.event_name != 'pull_request' }}
labels: ${{ steps.meta.outputs.labels }}
tags: ${{ steps.meta.outputs.tags }}
annotations: ${{ steps.meta.outputs.annotations }}

View File

@ -1,60 +0,0 @@
name: Docker
on:
# push:
# pull_request:
workflow_dispatch:
# schedule:
# - cron: "0 6 * * *"
env:
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
TIMEZONE: Europe/Stockholm
LOG_LEVEL: Info
SQLITE_LOCATION: /data/jobs.sqlite
jobs:
build-and-push-docker:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- run: |
if [ -z "${{ env.BOT_TOKEN }}" ]; then
echo "BOT_TOKEN not set"
exit 1
fi
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v5
with:
python-version: 3.13
- run: uv sync --all-extras --dev
- run: uv run pytest
- uses: docker/setup-qemu-action@v3
with:
platforms: all
- uses: docker/setup-buildx-action@v3
- uses: docker/login-action@v3
if: github.event_name != 'pull_request'
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- uses: docker/login-action@v3
if: github.event_name != 'pull_request'
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64, linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: thelovinator/discord-reminder-bot:latest
- uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64, linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: ghcr.io/thelovinator1/discord-reminder-bot:latest

View File

@ -1,38 +0,0 @@
name: Poetry
on:
# push:
# pull_request:
workflow_dispatch:
# schedule:
# - cron: "0 6 * * *"
env:
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
TIMEZONE: Europe/Stockholm
LOG_LEVEL: Info
SQLITE_LOCATION: /data/jobs.sqlite
jobs:
test-on-poetry:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
poetry-version: ["latest", "main"]
steps:
- run: |
if [ -z "${{ env.BOT_TOKEN }}" ]; then
echo "BOT_TOKEN not set"
exit 1
fi
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- uses: abatilo/actions-poetry@v4
with:
poetry-version: ${{ matrix.poetry-version }}
- run: poetry install
- run: poetry run pytest

View File

@ -1,41 +0,0 @@
name: uv
on:
# push:
# pull_request:
workflow_dispatch:
# schedule:
# - cron: "0 6 * * *"
env:
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
TIMEZONE: Europe/Stockholm
LOG_LEVEL: Info
SQLITE_LOCATION: /data/jobs.sqlite
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
jobs:
test-on-uv:
name: Install with uv and run tests on Python ${{ matrix.python-version }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13", "pypy"]
steps:
- run: |
if [ -z "${{ env.BOT_TOKEN }}" ]; then
echo "BOT_TOKEN not set"
exit 1
fi
- name: Checkout code
uses: actions/checkout@v4
- name: Install uv and set the python version to ${{ matrix.python-version }}
uses: astral-sh/setup-uv@v5
with:
python-version: ${{ matrix.python-version }}
version: "latest"
- name: Install dependencies
run: uv sync --all-extras --dev
- name: Run tests
run: uv run pytest

View File

@ -29,7 +29,7 @@ repos:
args: ["--py310-plus"]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.6
rev: v0.11.6
hooks:
- id: ruff-format
- id: ruff

View File

@ -6,6 +6,7 @@
"asyncio",
"audioop",
"autouse",
"binfmt",
"botuser",
"Buildx",
"cookiejar",
@ -16,6 +17,7 @@
"dotenv",
"filterwarnings",
"freezegun",
"Gitea",
"hikari",
"isort",
"jobstore",
@ -28,11 +30,13 @@
"pydocstyle",
"pyproject",
"pypy",
"pytest",
"PYTHONDONTWRITEBYTECODE",
"PYTHONUNBUFFERED",
"pyupgrade",
"sqlalchemy",
"thelovinator",
"tonistiigi",
"uvloop"
],
"python.analysis.typeCheckingMode": "standard"

View File

@ -1,13 +1,18 @@
FROM python:3.13-slim
FROM --platform=$BUILDPLATFORM python:3.13-slim
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
RUN useradd -m botuser && mkdir -p /home/botuser/data
WORKDIR /home/botuser
COPY interactions /home/botuser/interactions
COPY discord_reminder_bot /home/botuser/discord_reminder_bot
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=pyproject.toml,target=pyproject.toml \
RUN --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
uv sync --no-install-project
VOLUME ["/home/botuser/data/"]
CMD ["uv", "run", "discord_reminder_bot/main.py"]

View File

@ -0,0 +1,91 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from loguru import logger
from discord_reminder_bot.parsers import calculate, parse_time
from discord_reminder_bot.settings import scheduler
if TYPE_CHECKING:
import datetime
from apscheduler.job import Job
def add_reminder_job(
message: str,
time: str,
channel_id: int,
author_id: int,
user_id: int | None = None,
guild_id: int | None = None,
dm_and_current_channel: bool | None = None,
) -> str:
"""Adds a reminder job to the scheduler based on user input.
Schedules a message to be sent at a specified time either to a specific channel,
a specific user via direct message, or both. It handles permission checks,
time parsing, and job creation using the APScheduler instance.
Args:
message: The content of the reminder message to be sent.
time: A string representing the date and time for the reminder.
This string will be parsed to a datetime object.
channel_id: The ID of the channel where the reminder will be sent.
user_id: The Discord ID of the user to send a DM to. If None, no DM is sent.
guild_id: The ID of the guild (server) where the reminder is set.
author_id: The ID of the user who created the reminder.
dm_and_current_channel: If True and a user is specified, sends the
reminder to both the user's DM and the target channel. If False
and a user is specified, only sends the DM. Defaults to None,
behaving like False if only a user is specified, or sending only
to the channel if no user is specified.
Returns:
The response message indicating the status of the reminder job creation.
"""
dm_message: str = ""
if user_id:
parsed_time: datetime.datetime | None = parse_time(date_to_parse=time)
if not parsed_time:
return f"Failed to parse time: {time}."
user_reminder: Job = scheduler.add_job(
func="discord_reminder_bot.main:send_to_user",
trigger="date",
run_date=parsed_time,
kwargs={
"user_id": user_id,
"guild_id": guild_id,
"message": message,
},
)
logger.info(f"User reminder job created: {user_reminder} for {user_id} at {parsed_time}")
dm_message = f" and a DM to <@{user_id}>"
if not dm_and_current_channel:
return (
f"Hello <@{author_id}>,\n"
f"I will send a DM to <@{user_id}> at:\n"
f"First run in {calculate(user_reminder)} with the message:\n**{message}**."
)
# Create channel reminder job
channel_job: Job = scheduler.add_job(
func="discord_reminder_bot.main:send_to_discord",
trigger="date",
run_date=parse_time(date_to_parse=time),
kwargs={
"channel_id": channel_id,
"message": message,
"author_id": author_id,
},
)
logger.info(f"Channel reminder job created: {channel_job} for {channel_id}")
return (
f"Hello <@{author_id}>,\n"
f"I will notify you in <#{channel_id}>{dm_message}.\n"
f"First run in {calculate(channel_job)} with the message:\n**{message}**."
)

View File

@ -0,0 +1,96 @@
from __future__ import annotations
import datetime
import json
import tempfile
from typing import TYPE_CHECKING
import discord
from loguru import logger
if TYPE_CHECKING:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from discord.interactions import InteractionChannel
async def backup_reminder_job(interaction: discord.Interaction, scheduler: AsyncIOScheduler, all_servers: bool) -> None:
"""Backs up reminder jobs from the scheduler to a JSON file.
Exports jobs from the provided AsyncIOScheduler to a temporary JSON file.
If `all_servers` is False, it filters the jobs to include only those
associated with the guild where the command was invoked. This requires
the invoking user to have administrator permissions in that guild.
The resulting list of jobs (either all or filtered) is then written
to another temporary JSON file and sent as an attachment via the
interaction followup, along with a confirmation message.
Args:
interaction: The discord interaction object that triggered the command.
scheduler: The AsyncIOScheduler instance containing the jobs to back up.
all_servers: If True, backs up jobs from all servers managed by the
scheduler. If False, backs up only jobs associated with the
server (guild) where the command was invoked.
"""
logger.info(f"Backing up reminders for {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.info(f"Arguments: {locals()}")
# Retrieve all jobs
with tempfile.NamedTemporaryFile(mode="r+", delete=False, encoding="utf-8", suffix=".json") as temp_file:
# Export jobs to a temporary file
scheduler.export_jobs(temp_file.name)
# Load the exported jobs
temp_file.seek(0)
jobs_data = json.load(temp_file)
# Amount of jobs before filtering
amount_of_jobs: int = len(jobs_data.get("jobs", []))
if not all_servers:
interaction_channel: InteractionChannel | None = interaction.channel
if not interaction_channel:
await interaction.followup.send(content="Failed to get channel.", ephemeral=True)
return
if not isinstance(interaction.user, discord.Member):
await interaction.followup.send(content="Failed to get user.", ephemeral=True)
return
is_admin: bool = interaction_channel.permissions_for(interaction.user).administrator
if not is_admin:
await interaction.followup.send(content="You must be an administrator to backup all servers.", ephemeral=True)
return
# Can't be 0 because that's the default value for jobs without a guild
guild_id: int = interaction.guild.id if interaction.guild else -1
channels_in_this_guild: list[int] = [c.id for c in interaction.guild.channels] if interaction.guild else []
logger.debug(f"Guild ID: {guild_id}")
for job in jobs_data.get("jobs", []):
# Check if the job is in the current guild
job_guild_id = int(job.get("kwargs", {}).get("guild_id", 0))
if job_guild_id and job_guild_id != guild_id:
logger.debug(f"Skipping job: {job.get('id')} because it's not in the current guild.")
jobs_data["jobs"].remove(job)
# Check if the channel is in the current guild
if job.get("kwargs", {}).get("channel_id") not in channels_in_this_guild:
logger.debug(f"Skipping job: {job.get('id')} because it's not in the current guild.")
jobs_data["jobs"].remove(job)
msg: str = "All reminders in this server have been backed up." if not all_servers else "All reminders have been backed up."
msg += "\nYou can restore them using `/remind restore`."
if not all_servers:
msg += f"\nAmount of jobs on all servers: {amount_of_jobs}, in this server: {len(jobs_data.get('jobs', []))}"
msg += "\nYou can use `/remind backup all_servers:True` to backup all servers."
else:
msg += f"\nAmount of jobs: {amount_of_jobs}"
# Write the data to a new file
with tempfile.NamedTemporaryFile(mode="w", delete=False, encoding="utf-8", suffix=".json") as output_file:
file_name: str = f"reminders-backup-{datetime.datetime.now(tz=scheduler.timezone)}.json"
json.dump(jobs_data, output_file, indent=4)
output_file.seek(0)
await interaction.followup.send(content=msg, file=discord.File(output_file.name, filename=file_name))

View File

@ -0,0 +1,139 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import discord
from discord_reminder_bot.parsers import calculate
if TYPE_CHECKING:
import discord
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
logger: logging.Logger = logging.getLogger("discord_reminder_bot")
async def cron_reminder_job(
interaction: discord.Interaction,
scheduler: AsyncIOScheduler,
message: str,
year: str | None = None,
month: str | None = None,
day: str | None = None,
week: str | None = None,
day_of_week: str | None = None,
hour: str | None = None,
minute: str | None = None,
second: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
timezone: str | None = None,
jitter: int | None = None,
channel: discord.TextChannel | None = None,
user: discord.User | None = None,
dm_and_current_channel: bool | None = None,
) -> None:
"""Create a new cron job.
Args that are None will be defaulted to *.
Args:
interaction (discord.Interaction): The interaction object for the command.
scheduler (AsyncIOScheduler): The scheduler to add the job to.
message (str): The content of the reminder.
year (str): 4-digit year. Defaults to *.
month (str): Month (1-12). Defaults to *.
day (str): Day of the month (1-31). Defaults to *.
week (str): ISO Week of the year (1-53). Defaults to *.
day_of_week (str): Number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun).
hour (str): Hour (0-23). Defaults to *.
minute (str): Minute (0-59). Defaults to *.
second (str): Second (0-59). Defaults to *.
start_date (str): Earliest possible date/time to trigger on (inclusive). Will get parsed.
end_date (str): Latest possible date/time to trigger on (inclusive). Will get parsed.
timezone (str): Time zone to use for the date/time calculations Defaults to scheduler timezone.
jitter (int, optional): Delay the job execution by jitter seconds at most.
channel (discord.TextChannel, optional): The channel to send the reminder to. Defaults to current channel.
user (discord.User, optional): Send reminder as a DM to this user. Defaults to None.
dm_and_current_channel (bool, optional): If user is provided, send reminder as a DM to the user and in this channel. Defaults to only the user.
"""
# Log kwargs
logger.info("New cron job from %s (%s) in %s", interaction.user, interaction.user.id, interaction.channel)
logger.info("Cron job arguments: %s", locals())
# Get the channel ID
channel_id: int | None = channel.id if channel else (interaction.channel.id if interaction.channel else None)
if not channel_id:
await interaction.followup.send(content="Failed to get channel.", ephemeral=True)
return
# Ensure the guild is valid
guild: discord.Guild | None = interaction.guild or None
if not guild:
await interaction.followup.send(content="Failed to get guild.", ephemeral=True)
return
# Create user DM reminder job if user is specified
dm_message: str = ""
if user:
user_reminder: Job = scheduler.add_job(
func="discord_reminder_bot.main:send_to_user",
trigger="cron",
year=year,
month=month,
day=day,
week=week,
day_of_week=day_of_week,
hour=hour,
minute=minute,
second=second,
start_date=start_date,
end_date=end_date,
timezone=timezone,
jitter=jitter,
kwargs={
"user_id": user.id,
"guild_id": guild.id,
"message": message,
},
)
dm_message = f" and a DM to {user.display_name}"
if not dm_and_current_channel:
await interaction.followup.send(
content=f"Hello {interaction.user.display_name},\n"
f"I will send a DM to {user.display_name} at:\n"
f"First run in {calculate(user_reminder)} with the message:\n**{message}**.",
)
return
# Create channel reminder job
channel_job: Job = scheduler.add_job(
func="discord_reminder_bot.main:send_to_discord",
trigger="cron",
year=year,
month=month,
day=day,
week=week,
day_of_week=day_of_week,
hour=hour,
minute=minute,
second=second,
start_date=start_date,
end_date=end_date,
timezone=timezone,
jitter=jitter,
kwargs={
"channel_id": channel_id,
"message": message,
"author_id": interaction.user.id,
},
)
await interaction.followup.send(
content=f"Hello {interaction.user.display_name},\n"
f"I will notify you in <#{channel_id}>{dm_message}.\n"
f"First run in {calculate(channel_job)} with the message:\n**{message}**.",
)

View File

@ -0,0 +1,99 @@
from __future__ import annotations
import datetime
import discord
from loguru import logger
from discord_reminder_bot.parsers import parse_time
async def add_discord_event(
interaction: discord.Interaction,
message: str,
event_start: str,
event_end: str,
location: str,
reason: str | None = None,
) -> None:
"""Creates a new Discord scheduled event based on user input.
This command handles the creation of a scheduled event within the Discord guild
where the interaction originated. It parses the provided start and end times,
validates permissions, and creates the event using the Discord API. If the
specified start time is in the past, it adjusts the start time to be slightly
in the future and starts the event immediately.
Args:
interaction: The discord interaction object representing the command invocation.
message: The name or description for the scheduled event.
event_start: A string representing the desired start time for the event.
This string will be parsed into a datetime object.
event_end: A string representing the desired end time for the event.
This string will be parsed into a datetime object.
location: A string specifying the location for the event (e.g., a URL or physical address).
reason: An optional string providing a reason for the event creation,
visible in the guild's audit log. Defaults to a standard message
indicating the creator if None.
"""
logger.info(f"New event from {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.info(f"Arguments: {locals()}")
# Check if we have a valid guild
guild: discord.Guild | None = interaction.guild
if not guild:
await interaction.followup.send(content="This command can only be used in a server.", ephemeral=True)
return
# Check if we have permission to create events
if not guild.me.guild_permissions.create_events:
await interaction.followup.send(content="I don't have permission to create events in this guild.", ephemeral=True)
return
event_start_time: datetime.datetime | None = parse_time(date_to_parse=event_start)
event_end_time: datetime.datetime | None = parse_time(date_to_parse=event_end)
if not event_start_time or not event_end_time:
await interaction.followup.send(content=f"Failed to parse time: {event_start} or {event_end}.", ephemeral=True)
return
# If event_start_time is in the past, make it now + 5 seconds
start_immediately: bool = False
if event_start_time < datetime.datetime.now(event_start_time.tzinfo):
start_immediately = True
event_start_time = datetime.datetime.now(event_start_time.tzinfo) + datetime.timedelta(seconds=5)
await interaction.followup.send(content="Event start time was in the past. Starting event in 5 seconds instead.")
reason_msg: str = f"Event created by {interaction.user} ({interaction.user.id})."
event: discord.ScheduledEvent = await guild.create_scheduled_event(
name=message,
start_time=event_start_time,
entity_type=discord.EntityType.external,
privacy_level=discord.PrivacyLevel.guild_only,
end_time=event_end_time,
reason=reason or reason_msg,
location=location,
)
if start_immediately:
await event.start()
msg: str = f"Event '{event.name}' created successfully!\n"
if event.start_time:
msg += f"Start Time: <t:{int(event.start_time.timestamp())}:R>\n"
if event.end_time:
msg += f"End Time: <t:{int(event.end_time.timestamp())}:R>\n"
if event.channel_id:
msg += f"Channel: <#{event.channel_id}>\n"
if event.location:
msg += f"Location: {event.location}\n"
if event.creator_id:
msg += f"Created by: <@{event.creator_id}>"
await interaction.followup.send(content=msg)

View File

@ -0,0 +1,135 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from discord_reminder_bot.parsers import calculate
from discord_reminder_bot.settings import scheduler
if TYPE_CHECKING:
import discord
from apscheduler.job import Job
from discord.interactions import InteractionChannel
logger: logging.Logger = logging.getLogger("discord_reminder_bot")
async def interval_reminder_job(
interaction: discord.Interaction,
message: str,
weeks: int = 0,
days: int = 0,
hours: int = 0,
minutes: int = 0,
seconds: int = 0,
start_date: str | None = None,
end_date: str | None = None,
timezone: str | None = None,
jitter: int | None = None,
channel: discord.TextChannel | None = None,
user: discord.User | None = None,
dm_and_current_channel: bool | None = None,
) -> None:
"""Create a new reminder that triggers based on an interval.
Args:
interaction (discord.Interaction): The interaction object for the command.
message (str): The content of the reminder.
weeks (int, optional): Number of weeks between each run. Defaults to 0.
days (int, optional): Number of days between each run. Defaults to 0.
hours (int, optional): Number of hours between each run. Defaults to 0.
minutes (int, optional): Number of minutes between each run. Defaults to 0.
seconds (int, optional): Number of seconds between each run. Defaults to 0.
start_date (str, optional): Earliest possible date/time to trigger on (inclusive). Will get parsed.
end_date (str, optional): Latest possible date/time to trigger on (inclusive). Will get parsed.
timezone (str, optional): Time zone to use for the date/time calculations Defaults to scheduler timezone.
jitter (int, optional): Delay the job execution by jitter seconds at most.
channel (discord.TextChannel, optional): The channel to send the reminder to. Defaults to current channel.
user (discord.User, optional): Send reminder as a DM to this user. Defaults to None.
dm_and_current_channel (bool, optional): If user is provided, send reminder as a DM to the user and in this channel. Defaults to only the user.
"""
logger.info("New interval job from %s (%s) in %s", interaction.user, interaction.user.id, interaction.channel)
logger.info("Arguments: %s", locals())
# Only allow intervals of 30 seconds or more so we don't spam the channel
if weeks == days == hours == minutes == 0 and seconds < 30:
await interaction.followup.send(content="Interval must be at least 30 seconds.", ephemeral=True)
return
# Check if we have access to the specified channel or the current channel
target_channel: InteractionChannel | None = channel or interaction.channel
if target_channel and interaction.guild and not target_channel.permissions_for(interaction.guild.me).send_messages:
await interaction.followup.send(
content=f"I don't have permission to send messages in <#{target_channel.id}>.",
ephemeral=True,
)
# Get the channel ID
channel_id: int | None = channel.id if channel else (interaction.channel.id if interaction.channel else None)
if not channel_id:
await interaction.followup.send(content="Failed to get channel.", ephemeral=True)
return
# Ensure the guild is valid
guild: discord.Guild | None = interaction.guild or None
if not guild:
await interaction.followup.send(content="Failed to get guild.", ephemeral=True)
return
# Create user DM reminder job if user is specified
dm_message: str = ""
if user:
dm_job: Job = scheduler.add_job(
func="discord_reminder_bot.main:send_to_user",
trigger="interval",
weeks=weeks,
days=days,
hours=hours,
minutes=minutes,
seconds=seconds,
start_date=start_date,
end_date=end_date,
timezone=timezone,
jitter=jitter,
kwargs={
"user_id": user.id,
"guild_id": guild.id,
"message": message,
},
)
dm_message = f" and a DM to {user.display_name} "
if not dm_and_current_channel:
await interaction.followup.send(
content=f"Hello {interaction.user.display_name},\n"
f"I will send a DM to {user.display_name} at:\n"
f"First run in {calculate(dm_job)} with the message:\n**{message}**.",
)
# Create channel reminder job
# TODO(TheLovinator): Test that "discord_reminder_bot.main:send_to_discord" is always there # noqa: TD003
channel_job: Job = scheduler.add_job(
func="discord_reminder_bot.main:send_to_discord",
trigger="interval",
weeks=weeks,
days=days,
hours=hours,
minutes=minutes,
seconds=seconds,
start_date=start_date,
end_date=end_date,
timezone=timezone,
jitter=jitter,
kwargs={
"channel_id": channel_id,
"message": message,
"author_id": interaction.user.id,
},
)
await interaction.followup.send(
content=f"Hello {interaction.user.display_name},\n"
f"I will notify you in <#{channel_id}>{dm_message}.\n"
f"First run in {calculate(channel_job)} with the message:\n**{message}**.",
)

View File

@ -0,0 +1,53 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import discord
from loguru import logger
from discord_reminder_bot.markdown_utils import generate_reminder_summary
if TYPE_CHECKING:
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def list_reminder_job(interaction: discord.Interaction, scheduler: AsyncIOScheduler) -> None:
"""List all reminder jobs in the scheduler.
Args:
interaction (discord.Interaction): The interaction object for the command.
scheduler (AsyncIOScheduler): The scheduler to list the jobs from.
"""
user: discord.User | discord.Member = interaction.user
if not isinstance(user, discord.Member):
await interaction.followup.send(content="This command can only be used in a server.", ephemeral=True)
return
channel = interaction.channel
if not isinstance(channel, discord.TextChannel):
await interaction.followup.send(content="This command can only be used in a text channel.", ephemeral=True)
return
logger.info(f"Listing reminders for {user} ({user.id}) in {interaction.channel}")
logger.info(f"Arguments: {locals()}")
jobs: list[Job] = scheduler.get_jobs()
if not jobs:
await interaction.followup.send(content="No scheduled jobs found in the database.", ephemeral=True)
return
guild: discord.Guild | None = interaction.guild
if not guild:
await interaction.followup.send(content="Failed to get guild.", ephemeral=True)
return
message: discord.InteractionMessage = await interaction.original_response()
job_summary: list[str] = generate_reminder_summary(ctx=interaction, bot=interaction.client, scheduler=scheduler)
for i, msg in enumerate(job_summary):
if i == 0:
await message.edit(content=msg)
else:
await interaction.followup.send(content=msg)

View File

@ -0,0 +1,44 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from apscheduler.jobstores.base import JobLookupError
if TYPE_CHECKING:
import discord
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
logger: logging.Logger = logging.getLogger("discord_reminder_bot")
async def pause_reminder_job(interaction: discord.Interaction, job_id: str, scheduler: AsyncIOScheduler) -> None:
"""Pauses a specific reminder job in the APScheduler.
This function attempts to find and pause a job identified by `job_id`
within the provided `scheduler`. It logs the action and sends feedback
to the user via the Discord interaction, indicating success or failure.
Args:
interaction: The Discord interaction object representing the command invocation.
job_id: The unique identifier of the reminder job to pause.
scheduler: The AsyncIOScheduler instance managing the reminder jobs.
"""
logger.info("Pausing reminder with ID %s for %s (%s) in %s", job_id, interaction.user, interaction.user.id, interaction.channel)
logger.debug("Arguments: %s", locals())
logger.debug("Attempting to pause job...")
try:
job: Job | None = scheduler.get_job(job_id)
if not job:
await interaction.followup.send(content=f"Reminder with ID {job_id} not found.", ephemeral=True)
return
scheduler.pause_job(job_id)
logger.info("Paused job %s.", job_id)
await interaction.followup.send(content=f"Reminder with ID {job_id} paused successfully.")
except JobLookupError as e:
logger.exception("Failed to pause job %s", job_id)
await interaction.followup.send(content=f"Failed to pause reminder with ID {job_id}. {e}", ephemeral=True)
logger.info("Job %s paused in the scheduler.", job_id)

View File

@ -0,0 +1,51 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from apscheduler.jobstores.base import JobLookupError
from loguru import logger
from discord_reminder_bot.markdown_utils import generate_markdown_state
if TYPE_CHECKING:
import discord
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def remove_reminder_job(interaction: discord.Interaction, job_id: str, scheduler: AsyncIOScheduler) -> None:
"""Removes a specific reminder job from the scheduler based on its ID.
This function attempts to find a job associated with the given `job_id`
within the provided `scheduler`. If the job exists, it is removed,
and a confirmation message detailing the removed job's state is sent
as a follow-up to the interaction. If the job cannot be found, or if
an error occurs during the removal process (like `JobLookupError`),
an appropriate error message is sent as an ephemeral follow-up.
Args:
interaction (discord.Interaction): The Discord interaction object
that triggered the command. Used for sending follow-up messages.
job_id (str): The unique identifier of the reminder job to be removed.
scheduler (AsyncIOScheduler): The APScheduler instance managing the
scheduled reminder jobs.
"""
logger.debug(f"Removing reminder with ID {job_id} for {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.debug(f"Arguments: {locals()}")
try:
job: Job | None = scheduler.get_job(job_id)
if not job:
await interaction.followup.send(content=f"Reminder with ID {job_id} not found.", ephemeral=True)
return
scheduler.remove_job(job_id)
logger.info(f"Removed job {job_id}. {job.__getstate__()}")
await interaction.followup.send(
content=f"Reminder with ID {job_id} removed successfully.\n{generate_markdown_state(job.__getstate__())}",
)
except JobLookupError as e:
logger.exception(f"Failed to remove job {job_id}")
await interaction.followup.send(content=f"Failed to remove reminder with ID {job_id}. {e}", ephemeral=True)
logger.info(f"Job {job_id} removed from the scheduler.")

View File

@ -0,0 +1,129 @@
from __future__ import annotations
import json
import logging
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
import discord
from discord_reminder_bot.parsers import calculate
if TYPE_CHECKING:
import discord
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
logger: logging.Logger = logging.getLogger("discord_reminder_bot")
async def restore_reminder_job(bot: discord.Client, interaction: discord.Interaction, scheduler: AsyncIOScheduler) -> None: # noqa: PLR0915
"""Restores reminder jobs from a user-provided JSON backup file.
Prompts the user via the interaction followup to reply with a JSON file
containing the backup data. Waits for a message reply from the same user
in the same channel containing a single JSON attachment.
Validates the reply and attachment. Downloads the JSON file to a temporary
location, loads the job data, filters out any jobs that already exist in the
scheduler (based on job ID), and imports the remaining jobs using
`scheduler.import_jobs`.
Finally, sends a confirmation message listing the newly added jobs or
indicates if no new jobs were added.
Args:
bot (discord.Client): The Discord bot client instance, used to wait for
the user's reply message.
interaction (discord.Interaction): The interaction object representing the
command invocation. Used for sending messages back to the user.
scheduler (AsyncIOScheduler): The APScheduler instance where the jobs
will be restored.
"""
logger.info("Restoring reminders from file for %s (%s) in %s", interaction.user, interaction.user.id, interaction.channel)
logger.info("Arguments: %s", locals())
# Tell to reply with the file to this message
await interaction.followup.send(content="Please reply to this message with the backup file.")
# Get the old jobs
old_jobs: list[Job] = scheduler.get_jobs()
# Wait for the reply
while True:
try:
reply: discord.Message | None = await bot.wait_for("message", timeout=60, check=lambda m: m.author == interaction.user)
except TimeoutError:
edit_msg = "~~Please reply to this message with the backup file.~~\nTimed out after 60 seconds."
await interaction.edit_original_response(content=edit_msg)
return
if not reply:
await interaction.followup.send(content="No reply found. Please try again.")
continue
if not reply.channel:
await interaction.followup.send(content="No channel found. Please try again.")
continue
# Fetch the message by its ID to ensure we have the latest data
reply = await reply.channel.fetch_message(reply.id)
if not reply or not reply.attachments:
await interaction.followup.send(content="No file attached. Please try again.")
continue
break
# Get the attachment
attachment: discord.Attachment = reply.attachments[0]
if not attachment or not attachment.filename.endswith(".json"):
await interaction.followup.send(
content=f"Invalid file type. Should be a JSON file not '{attachment.filename}'. Please try again.",
)
return
jobs_already_exist: list[str] = []
# Save the file to a temporary file and import the jobs
with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8", suffix=".json") as temp_file:
logger.info("Saving attachment to %s", temp_file.name)
await attachment.save(Path(temp_file.name))
# Load the jobs data from the file
temp_file.seek(0)
jobs_data: dict = json.load(temp_file)
logger.info("Importing jobs from file")
logger.debug("Jobs data: %s", jobs_data)
with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8", suffix=".json") as temp_import_file:
# We can't import jobs with the same ID so remove them from the JSON
jobs = [job for job in jobs_data.get("jobs", []) if not scheduler.get_job(job.get("id"))]
jobs_already_exist = [job.get("id") for job in jobs_data.get("jobs", []) if scheduler.get_job(job.get("id"))]
jobs_data["jobs"] = jobs
for job_id in jobs_already_exist:
logger.debug("Skipping importing '%s' because it already exists in the db.", job_id)
logger.debug("Jobs data after filtering: %s", jobs_data)
logger.info("Jobs already exist: %s", jobs_already_exist)
# Write the new data to a temporary file
json.dump(jobs_data, temp_import_file)
temp_import_file.seek(0)
# Import the jobs
scheduler.import_jobs(temp_import_file.name)
# Get the new jobs
new_jobs: list[Job] = scheduler.get_jobs()
# Get the difference
added_jobs: list[Job] = [job for job in new_jobs if job not in old_jobs]
if added_jobs:
msg: str = "Reminders restored successfully.\nAdded jobs:\n"
for j in added_jobs:
msg += f"* Message: **{j.kwargs.get('message', 'No message found')}** {calculate(j) or 'N/A'}\n"
await interaction.followup.send(content=msg)
else:
await interaction.followup.send(content="No new reminders were added.")

View File

@ -0,0 +1,40 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from apscheduler.jobstores.base import JobLookupError
from loguru import logger
if TYPE_CHECKING:
import discord
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def unpause_reminder(interaction: discord.Interaction, job_id: str, scheduler: AsyncIOScheduler) -> None:
"""Unpauses a scheduled reminder job.
Finds a job by its ID in the scheduler and resumes it. Sends feedback
to the user via the interaction about the success or failure of the operation.
Args:
interaction: The discord interaction object representing the command invocation.
job_id: The unique identifier of the reminder job to unpause.
scheduler: The scheduler instance to manage the job.
"""
logger.debug(f"Unpausing reminder with ID {job_id} for {interaction.user} ({interaction.user.id}) in {interaction.channel}")
logger.debug(f"Arguments: {locals()}")
try:
job: Job | None = scheduler.get_job(job_id)
if not job:
await interaction.followup.send(content=f"Reminder with ID {job_id} not found.", ephemeral=True)
return
scheduler.resume_job(job_id)
logger.info(f"Unpaused job {job_id}.")
await interaction.followup.send(content=f"Reminder with ID {job_id} unpaused successfully.")
except JobLookupError as e:
logger.exception(f"Failed to unpause job {job_id}")
await interaction.followup.send(content=f"Failed to unpause reminder with ID {job_id}. {e}", ephemeral=True)
logger.info(f"Job {job_id} unpaused in the scheduler.")

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,145 @@
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any
import discord
from apscheduler.job import Job
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger
from discord_reminder_bot.parsers import get_human_readable_time
from interactions.api.models.misc import Snowflake
if TYPE_CHECKING:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from discord.guild import GuildChannel
from discord.types.channel import _BaseChannel
def generate_markdown_state(state: dict[str, Any]) -> str:
"""Format the __getstate__ dictionary for Discord markdown.
Args:
state (dict): The __getstate__ dictionary.
Returns:
str: The formatted string.
"""
if not state:
return "```json\nNo state found.\n```"
# Convert the IntervalTrigger to a string representation
for key, value in state.items():
if isinstance(value, IntervalTrigger):
state[key] = "IntervalTrigger"
elif isinstance(value, DateTrigger):
state[key] = "DateTrigger"
elif isinstance(value, Job):
state[key] = "Job"
elif isinstance(value, Snowflake):
state[key] = str(value)
try:
msg: str = json.dumps(state, indent=4, default=str)
except TypeError as e:
e.add_note("This is likely due to a non-serializable object in the state. Please check the state for any non-serializable objects.")
e.add_note(f"{state=}")
logger.error(f"Failed to serialize state: {e}")
return "```json\nFailed to serialize state.\n```"
return "```json\n" + msg + "\n```"
def generate_reminder_summary(ctx: discord.Interaction, bot: discord.Client, scheduler: AsyncIOScheduler) -> list[str]: # noqa: PLR0912
"""Create a message with all the jobs, splitting messages into chunks of up to 2000 characters.
Args:
ctx (discord.Interaction): The context of the interaction.
bot (discord.Client): The Discord bot client.
scheduler (AsyncIOScheduler): The scheduler to get the jobs from.
Returns:
list[str]: A list of messages with all the jobs.
"""
jobs: list[Job] = scheduler.get_jobs()
msgs: list[str] = []
guild: discord.Guild | None = None
if isinstance(ctx.channel, discord.abc.GuildChannel):
guild = ctx.channel.guild
channels: list[GuildChannel] | list[_BaseChannel] = list(guild.channels) if guild else []
channels_in_this_guild: list[int] = [c.id for c in channels]
jobs_in_guild: list[Job] = []
for job in jobs:
guild_id: int = guild.id if guild else -1
guild_id_from_kwargs = int(job.kwargs.get("guild_id", 0))
if guild_id_from_kwargs and guild_id_from_kwargs != guild_id:
logger.debug(f"Skipping job: {job.id} because it's not in the current guild.")
continue
if job.kwargs.get("channel_id") not in channels_in_this_guild:
logger.debug(f"Skipping job: {job.id} because it's not in the current guild.")
continue
logger.debug(f"Adding job: {job.id} to the list.")
jobs_in_guild.append(job)
if len(jobs) != len(jobs_in_guild):
logger.info(f"Filtered out {len(jobs) - len(jobs_in_guild)} jobs that are not in the current guild.")
jobs = jobs_in_guild
if not jobs:
return ["No scheduled jobs found in the database."]
header = (
"You can use the following commands to manage reminders:\n"
"Only jobs in the current guild are shown.\n"
"`/remind pause <job_id>` - Pause a reminder\n"
"`/remind unpause <job_id>` - Unpause a reminder\n"
"`/remind remove <job_id>` - Remove a reminder\n"
"`/remind modify <job_id>` - Modify the time of a reminder\n"
"List of all reminders:\n"
)
current_msg: str = header
for job in jobs:
# Build job-specific message
job_msg: str = "```md\n"
job_msg += f"# {job.kwargs.get('message', '')}\n"
job_msg += f" * {job.id}\n"
job_msg += f" * {job.trigger} {get_human_readable_time(job)}"
if job.kwargs.get("user_id"):
job_msg += f" <@{job.kwargs.get('user_id')}>"
if job.kwargs.get("channel_id"):
channel = bot.get_channel(job.kwargs.get("channel_id"))
if channel and isinstance(channel, discord.abc.GuildChannel | discord.Thread):
job_msg += f" in #{channel.name}"
if job.kwargs.get("guild_id"):
guild = bot.get_guild(job.kwargs.get("guild_id"))
if guild:
job_msg += f" in {guild.name}"
job_msg += f" {job.kwargs.get('guild_id')}"
job_msg += "```"
# If adding this job exceeds 2000 characters, push the current message and start a new one.
if len(current_msg) + len(job_msg) > 2000:
msgs.append(current_msg)
current_msg = job_msg
else:
current_msg += job_msg
# Append any remaining content in current_msg.
if current_msg:
msgs.append(current_msg)
return msgs

View File

@ -0,0 +1,114 @@
from __future__ import annotations
import datetime
import logging
import os
from typing import TYPE_CHECKING
from zoneinfo import ZoneInfo
import dateparser
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
if TYPE_CHECKING:
from apscheduler.job import Job
logger: logging.Logger = logging.getLogger("discord_reminder_bot")
def parse_time(date_to_parse: str | None, timezone: str | None = os.getenv("TIMEZONE")) -> datetime.datetime | None:
"""Parse a date string into a datetime object.
Args:
date_to_parse(str): The date string to parse.
timezone(str, optional): The timezone to use. Defaults to the TIMEZONE environment variable.
Returns:
datetime.datetime: The parsed datetime object.
"""
if not date_to_parse:
logger.error("No date provided to parse.")
return None
if not timezone:
logger.error("No timezone provided to parse date.")
return None
logger.info("Parsing date: '%s' with timezone: '%s'", date_to_parse, timezone)
try:
parsed_date: datetime.datetime | None = dateparser.parse(
date_string=date_to_parse,
settings={
"PREFER_DATES_FROM": "future",
"TIMEZONE": f"{timezone}",
"RETURN_AS_TIMEZONE_AWARE": True,
"RELATIVE_BASE": datetime.datetime.now(tz=ZoneInfo(str(timezone))),
},
)
except (ValueError, TypeError):
logger.exception("Failed to parse date: '%s'", date_to_parse)
return None
logger.debug("Parsed date: %s with timezone: %s", parsed_date, timezone)
return parsed_date
def calculate(job: Job) -> str:
"""Calculate the time left for a job.
Args:
job: The job to calculate the time for.
Returns:
str: The time left for the job or "Paused" if the job is paused or has no next run time.
"""
trigger_time = None
if isinstance(job.trigger, DateTrigger | IntervalTrigger):
trigger_time = job.next_run_time or None
elif isinstance(job.trigger, CronTrigger):
if not job.next_run_time:
logger.debug("No next run time found for '%s', probably paused?", job.id)
logger.debug("%s", job.__getstate__())
return "Paused"
trigger_time = job.trigger.get_next_fire_time(None, datetime.datetime.now(tz=job._scheduler.timezone)) # noqa: SLF001
logger.debug("Trigger type: %s, Trigger time: %s", type(job.trigger), trigger_time)
if not trigger_time:
logger.debug("No trigger time found")
return "Paused"
return f"<t:{int(trigger_time.timestamp())}:R>"
def get_human_readable_time(job: Job) -> str:
"""Get the human-readable time for a job.
Args:
job: The job to get the time for.
Returns:
str: The human-readable time.
"""
trigger_time = None
if isinstance(job.trigger, DateTrigger | IntervalTrigger):
trigger_time = job.next_run_time or None
elif isinstance(job.trigger, CronTrigger):
if not job.next_run_time:
logger.debug("No next run time found for '%s', probably paused?", job.id)
logger.debug("%s", job.__getstate__())
return "Paused"
trigger_time = job.trigger.get_next_fire_time(None, datetime.datetime.now(tz=job._scheduler.timezone)) # noqa: SLF001
if not trigger_time:
logger.debug("No trigger time found")
return "Paused"
return trigger_time.strftime("%Y-%m-%d %H:%M:%S")

View File

@ -0,0 +1,58 @@
from __future__ import annotations
import logging
import os
import platform
from functools import lru_cache
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
import pytz
import sentry_sdk
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from dotenv import load_dotenv
logger: logging.Logger = logging.getLogger("discord_reminder_bot")
load_dotenv()
default_sentry_dsn: str = "https://c4c61a52838be9b5042144420fba5aaa@o4505228040339456.ingest.us.sentry.io/4508707268984832"
sentry_sdk.init(
dsn=os.getenv("SENTRY_DSN", default_sentry_dsn),
environment=platform.node() or "Unknown",
traces_sample_rate=1.0,
send_default_pii=True,
)
config_timezone: str = os.getenv("TIMEZONE", default="")
if not config_timezone:
msg = "Missing timezone. Please set the TIMEZONE environment variable."
raise ValueError(msg)
# Test if the timezone is valid
try:
ZoneInfo(config_timezone)
except (ZoneInfoNotFoundError, ModuleNotFoundError) as e:
msg: str = f"Invalid timezone: {config_timezone}. Error: {e}"
raise ValueError(msg) from e
sqlite_location: str = os.getenv("SQLITE_LOCATION", default="/jobs.sqlite")
logger.info("Using SQLite database at: %s", sqlite_location)
@lru_cache(maxsize=1)
def get_scheduler() -> AsyncIOScheduler:
"""Return the scheduler instance.
Uses the SQLITE_LOCATION environment variable for the SQLite database location.
Returns:
AsyncIOScheduler: The scheduler instance.
"""
jobstores: dict[str, SQLAlchemyJobStore] = {"default": SQLAlchemyJobStore(url=f"sqlite://{sqlite_location}")}
job_defaults: dict[str, bool] = {"coalesce": True}
timezone = pytz.timezone(config_timezone)
return AsyncIOScheduler(jobstores=jobstores, timezone=timezone, job_defaults=job_defaults)
scheduler: AsyncIOScheduler = get_scheduler()

View File

@ -12,7 +12,7 @@ from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord_reminder_bot import main
from discord_reminder_bot.main import calculate, parse_time
from discord_reminder_bot.parsers import calculate, parse_time
if TYPE_CHECKING:
from apscheduler.job import Job