Refactor calculate() to properly handle paused jobs and fix tests
This commit is contained in:
@ -73,25 +73,26 @@ def dummy_job() -> None:
|
||||
def test_calculate() -> None:
|
||||
"""Test the calculate function with various job inputs."""
|
||||
scheduler = BackgroundScheduler()
|
||||
scheduler.timezone = timezone.utc
|
||||
scheduler.start()
|
||||
|
||||
# Create a job with a DateTrigger
|
||||
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=timezone.utc)
|
||||
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=scheduler.timezone)
|
||||
job: Job = scheduler.add_job(dummy_job, trigger=DateTrigger(run_date=run_date), id="test_job", name="Test Job")
|
||||
|
||||
expected_output = "<t:9490737600:R>"
|
||||
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}"
|
||||
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
|
||||
assert calculate(job) == expected_output, assert_msg
|
||||
|
||||
# Modify the job to have a next_run_time
|
||||
job.modify(next_run_time=run_date)
|
||||
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}"
|
||||
assert_msg: str = f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
|
||||
assert calculate(job) == expected_output, assert_msg
|
||||
|
||||
# Paused job should still return the same output
|
||||
# Paused job should return "Paused"
|
||||
job.pause()
|
||||
assert_msg: str = f"Expected None, got {calculate(job)}"
|
||||
assert not calculate(job), assert_msg
|
||||
assert_msg: str = f"Expected 'Paused', got {calculate(job)}\nState:{job.__getstate__()}"
|
||||
assert calculate(job) == "Paused", assert_msg
|
||||
|
||||
scheduler.shutdown()
|
||||
|
||||
@ -101,7 +102,7 @@ def test_calculate_cronjob() -> None:
|
||||
scheduler = BackgroundScheduler()
|
||||
scheduler.start()
|
||||
|
||||
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=timezone.utc)
|
||||
run_date = datetime(2270, 10, 1, 12, 0, 0, tzinfo=scheduler.timezone)
|
||||
job: Job = scheduler.add_job(
|
||||
dummy_job,
|
||||
trigger=CronTrigger(
|
||||
@ -117,11 +118,10 @@ def test_calculate_cronjob() -> None:
|
||||
job.modify(next_run_time=run_date)
|
||||
|
||||
expected_output: str = f"<t:{int(run_date.timestamp())}:R>"
|
||||
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}"
|
||||
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
|
||||
|
||||
# You can't pause a CronTrigger job so this should return the same output
|
||||
job.pause()
|
||||
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}"
|
||||
assert calculate(job) == "Paused", f"Expected Paused, got {calculate(job)}\nState:{job.__getstate__()}"
|
||||
scheduler.shutdown()
|
||||
|
||||
|
||||
@ -130,15 +130,15 @@ def test_calculate_intervaljob() -> None:
|
||||
scheduler = BackgroundScheduler()
|
||||
scheduler.start()
|
||||
|
||||
run_date = datetime(2270, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
|
||||
run_date = datetime(2270, 12, 31, 23, 59, 59, tzinfo=scheduler.timezone)
|
||||
job = scheduler.add_job(dummy_job, trigger=IntervalTrigger(seconds=3600), id="test_interval_job", name="Test Interval Job")
|
||||
# Force next_run_time to expected value for testing
|
||||
job.modify(next_run_time=run_date)
|
||||
|
||||
expected_output = f"<t:{int(run_date.timestamp())}:R>"
|
||||
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}"
|
||||
assert calculate(job) == expected_output, f"Expected {expected_output}, got {calculate(job)}\nState:{job.__getstate__()}"
|
||||
|
||||
# Paused job should return False
|
||||
# Paused job should return "Paused"
|
||||
job.pause()
|
||||
assert not calculate(job), f"Expected None, got {calculate(job)}"
|
||||
assert calculate(job) == "Paused", f"Expected Paused, got {calculate(job)}\nState:{job.__getstate__()}"
|
||||
scheduler.shutdown()
|
||||
|
Reference in New Issue
Block a user