Refactor calculate function and move it to main.py; remove misc.py and update tests

This commit is contained in:
2025-02-25 16:19:21 +01:00
parent b61ea5af19
commit cc293e800d
4 changed files with 123 additions and 189 deletions

View File

@ -18,12 +18,14 @@ from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, JobExecutionEv
from apscheduler.job import Job from apscheduler.job import Job
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord.abc import PrivateChannel from discord.abc import PrivateChannel
from discord_webhook import DiscordWebhook from discord_webhook import DiscordWebhook
from dotenv import load_dotenv from dotenv import load_dotenv
from loguru import logger from loguru import logger
from discord_reminder_bot.misc import calculate
from discord_reminder_bot.parser import parse_time from discord_reminder_bot.parser import parse_time
if TYPE_CHECKING: if TYPE_CHECKING:
@ -44,6 +46,35 @@ sentry_sdk.init(
) )
def calculate(job: Job) -> str:
"""Calculate the time left for a job.
Args:
job: The job to calculate the time for.
Returns:
str: The time left for the job.
"""
trigger_time = None
if isinstance(job.trigger, DateTrigger | IntervalTrigger):
trigger_time = job.next_run_time or None
elif isinstance(job.trigger, CronTrigger):
if not job.next_run_time:
logger.debug(f"No next run time found for '{job.id}', probably paused? {job.__getstate__()}")
return "Paused"
trigger_time = job.trigger.get_next_fire_time(None, datetime.datetime.now(tz=job._scheduler.timezone)) # noqa: SLF001
logger.debug(f"{type(job.trigger)=}, {trigger_time=}")
if not trigger_time:
logger.debug("No trigger time found")
return "Paused"
return f"<t:{int(trigger_time.timestamp())}:R>"
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def get_scheduler() -> AsyncIOScheduler: def get_scheduler() -> AsyncIOScheduler:
"""Return the scheduler instance. """Return the scheduler instance.

View File

@ -1,67 +0,0 @@
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger
if TYPE_CHECKING:
from apscheduler.job import Job
def calculate(job: Job) -> str:
"""Calculate the time left for a job.
Args:
job: The job to calculate the time for.
Returns:
str: The time left for the job.
"""
trigger_time = None
if isinstance(job.trigger, DateTrigger | IntervalTrigger):
trigger_time = job.next_run_time or None
elif isinstance(job.trigger, CronTrigger):
if not job.next_run_time:
logger.debug("No next run time found so probably paused?")
return "Paused"
trigger_time = job.trigger.get_next_fire_time(None, datetime.datetime.now(tz=job._scheduler.timezone)) # noqa: SLF001
logger.debug(f"{type(job.trigger)=}, {trigger_time=}")
if not trigger_time:
logger.debug("No trigger time found")
return "Paused"
return f"<t:{int(trigger_time.timestamp())}:R>"
def get_human_time(time: datetime.timedelta) -> str:
"""Convert timedelta to human-readable format.
Args:
time: The timedelta to convert.
Returns:
str: The human-readable time.
"""
days, seconds = divmod(time.total_seconds(), 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
time_str: str = ""
if days:
time_str += f"{int(days)}d"
if hours:
time_str += f"{int(hours)}h"
if minutes:
time_str += f"{int(minutes)}m"
if seconds:
time_str += f"{int(seconds)}s"
return time_str

View File

@ -1,6 +1,97 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from apscheduler.job import Job
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord_reminder_bot import main from discord_reminder_bot import main
from discord_reminder_bot.main import calculate
if TYPE_CHECKING:
from apscheduler.job import Job
def dummy_job() -> None:
"""Dummy job function for testing."""
def test_calculate() -> None:
"""Test the calculate function with various job inputs."""
scheduler = BackgroundScheduler()
scheduler.timezone = timezone.utc
scheduler.start()
# Create a job with a DateTrigger
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=scheduler.timezone)
job: Job = scheduler.add_job(dummy_job, trigger=DateTrigger(run_date=run_date), id="test_job", name="Test Job")
expected_output = "<t:9490737600:R>"
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
assert calculate(job) == expected_output, assert_msg
# Modify the job to have a next_run_time
job.modify(next_run_time=run_date)
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
assert calculate(job) == expected_output, assert_msg
# Paused job should return "Paused"
job.pause()
assert_msg: str = f"Expected 'Paused', got {calculate(job)}\nState:{job.__getstate__()}"
assert calculate(job) == "Paused", assert_msg
scheduler.shutdown()
def test_calculate_cronjob() -> None:
"""Test the calculate function with a CronTrigger job."""
scheduler = BackgroundScheduler()
scheduler.start()
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=scheduler.timezone)
job: Job = scheduler.add_job(
dummy_job,
trigger=CronTrigger(
second=run_date.second,
minute=run_date.minute,
hour=run_date.hour,
day=run_date.day,
month=run_date.month,
year=run_date.year,
),
)
# Force next_run_time to expected value for testing
job.modify(next_run_time=run_date)
expected_output: str = f"<t:{int(run_date.timestamp())}:R>"
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
job.pause()
assert calculate(job) == "Paused", f"Expected Paused, got {calculate(job)}\nState:{job.__getstate__()}"
scheduler.shutdown()
def test_calculate_intervaljob() -> None:
"""Test the calculate function with an IntervalTrigger job."""
scheduler = BackgroundScheduler()
scheduler.start()
run_date = datetime(2270, 12, 31, 23, 59, 59, tzinfo=scheduler.timezone)
job = scheduler.add_job(dummy_job, trigger=IntervalTrigger(seconds=3600), id="test_interval_job", name="Test Interval Job")
# Force next_run_time to expected value for testing
job.modify(next_run_time=run_date)
expected_output = f"<t:{int(run_date.timestamp())}:R>"
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
# Paused job should return "Paused"
job.pause()
assert calculate(job) == "Paused", f"Expected Paused, got {calculate(job)}\nState:{job.__getstate__()}"
scheduler.shutdown()
def test_if_send_to_discord_is_in_main() -> None: def test_if_send_to_discord_is_in_main() -> None:

View File

@ -1,121 +0,0 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING
from apscheduler.job import Job
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord_reminder_bot.misc import calculate, get_human_time
if TYPE_CHECKING:
from apscheduler.job import Job
def test_get_human_time() -> None:
"""Test the get_human_time function with various timedelta inputs."""
test_timedelta = timedelta(days=1, hours=2, minutes=3, seconds=4)
expected_output: str = "1d2h3m4s"
assert_msg: str = f"Expected {expected_output}, got {get_human_time(test_timedelta)}"
assert get_human_time(test_timedelta) == expected_output, assert_msg
test_timedelta = timedelta(hours=5, minutes=6, seconds=7)
expected_output: str = "5h6m7s"
assert_msg = f"Expected {expected_output}, got {get_human_time(test_timedelta)}"
assert get_human_time(test_timedelta) == expected_output, assert_msg
test_timedelta = timedelta(minutes=8, seconds=9)
expected_output: str = "8m9s"
assert_msg = f"Expected {expected_output}, got {get_human_time(test_timedelta)}"
assert get_human_time(test_timedelta) == expected_output, assert_msg
test_timedelta = timedelta(seconds=10)
expected_output: str = "10s"
assert_msg = f"Expected {expected_output}, got {get_human_time(test_timedelta)}"
assert get_human_time(test_timedelta) == expected_output, assert_msg
test_timedelta = timedelta(days=0, hours=0, minutes=0, seconds=0)
expected_output: str = ""
assert_msg = f"Expected {expected_output}, got {get_human_time(test_timedelta)}"
assert get_human_time(test_timedelta) == expected_output, assert_msg
def dummy_job() -> None:
"""Dummy job function for testing."""
def test_calculate() -> None:
"""Test the calculate function with various job inputs."""
scheduler = BackgroundScheduler()
scheduler.timezone = timezone.utc
scheduler.start()
# Create a job with a DateTrigger
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=scheduler.timezone)
job: Job = scheduler.add_job(dummy_job, trigger=DateTrigger(run_date=run_date), id="test_job", name="Test Job")
expected_output = "<t:9490737600:R>"
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
assert calculate(job) == expected_output, assert_msg
# Modify the job to have a next_run_time
job.modify(next_run_time=run_date)
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
assert calculate(job) == expected_output, assert_msg
# Paused job should return "Paused"
job.pause()
assert_msg: str = f"Expected 'Paused', got {calculate(job)}\nState:{job.__getstate__()}"
assert calculate(job) == "Paused", assert_msg
scheduler.shutdown()
def test_calculate_cronjob() -> None:
"""Test the calculate function with a CronTrigger job."""
scheduler = BackgroundScheduler()
scheduler.start()
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=scheduler.timezone)
job: Job = scheduler.add_job(
dummy_job,
trigger=CronTrigger(
second=run_date.second,
minute=run_date.minute,
hour=run_date.hour,
day=run_date.day,
month=run_date.month,
year=run_date.year,
),
)
# Force next_run_time to expected value for testing
job.modify(next_run_time=run_date)
expected_output: str = f"<t:{int(run_date.timestamp())}:R>"
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
job.pause()
assert calculate(job) == "Paused", f"Expected Paused, got {calculate(job)}\nState:{job.__getstate__()}"
scheduler.shutdown()
def test_calculate_intervaljob() -> None:
"""Test the calculate function with an IntervalTrigger job."""
scheduler = BackgroundScheduler()
scheduler.start()
run_date = datetime(2270, 12, 31, 23, 59, 59, tzinfo=scheduler.timezone)
job = scheduler.add_job(dummy_job, trigger=IntervalTrigger(seconds=3600), id="test_interval_job", name="Test Interval Job")
# Force next_run_time to expected value for testing
job.modify(next_run_time=run_date)
expected_output = f"<t:{int(run_date.timestamp())}:R>"
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
# Paused job should return "Paused"
job.pause()
assert calculate(job) == "Paused", f"Expected Paused, got {calculate(job)}\nState:{job.__getstate__()}"
scheduler.shutdown()