Use celery tasks instead of systemd timers for periodic work; and add more tests
All checks were successful
Deploy to Server / deploy (push) Successful in 26s

This commit is contained in:
Joakim Hellsén 2026-04-08 03:23:18 +02:00
commit 66ea46cf23
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
25 changed files with 2133 additions and 104 deletions

View file

@ -1,13 +1,22 @@
import json
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING
from unittest import skipIf
from unittest.mock import patch
from django.db import connection
from django.test import TestCase
from twitch.management.commands.better_import_drops import Command
from twitch.management.commands.better_import_drops import detect_error_only_response
from twitch.management.commands.better_import_drops import detect_non_campaign_keyword
from twitch.management.commands.better_import_drops import (
extract_operation_name_from_parsed,
)
from twitch.management.commands.better_import_drops import move_completed_file
from twitch.management.commands.better_import_drops import move_file_to_broken_subdir
from twitch.management.commands.better_import_drops import repair_partially_broken_json
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import Game
@ -426,6 +435,92 @@ class CampaignStructureDetectionTests(TestCase):
structure: str | None = command._detect_campaign_structure(response)
assert structure == "current_user_drop_campaigns"
def test_detects_user_drop_campaign_structure(self) -> None:
"""Ensure user.dropCampaign structure is correctly detected."""
command = Command()
response: dict[str, object] = {
"data": {
"user": {
"id": "123",
"dropCampaign": {"id": "c1", "name": "Test Campaign"},
"__typename": "User",
},
},
}
structure: str | None = command._detect_campaign_structure(response)
assert structure == "user_drop_campaign"
def test_detects_channel_viewer_campaigns_structure(self) -> None:
"""Ensure channel.viewerDropCampaigns structure is correctly detected."""
command = Command()
response: dict[str, object] = {
"data": {
"channel": {
"id": "123",
"viewerDropCampaigns": [{"id": "c1", "name": "Test Campaign"}],
"__typename": "Channel",
},
},
}
structure: str | None = command._detect_campaign_structure(response)
assert structure == "channel_viewer_campaigns"
class FileMoveUtilityTests(TestCase):
"""Tests for imported/broken file move utility helpers."""
def test_move_completed_file_sanitizes_operation_directory_name(self) -> None:
"""Ensure operation names are sanitized and campaign structure subdir is respected."""
with TemporaryDirectory() as tmp_dir:
root_path = Path(tmp_dir)
imported_root = root_path / "imported"
source_file = root_path / "payload.json"
source_file.write_text("{}", encoding="utf-8")
with patch(
"twitch.management.commands.better_import_drops.get_imported_directory_root",
return_value=imported_root,
):
target_dir = move_completed_file(
file_path=source_file,
operation_name="My Op/Name\\v1",
campaign_structure="inventory_campaigns",
)
expected_dir = imported_root / "My_Op_Name_v1" / "inventory_campaigns"
assert target_dir == expected_dir
assert not source_file.exists()
assert (expected_dir / "payload.json").exists()
def test_move_file_to_broken_subdir_avoids_duplicate_operation_segment(
self,
) -> None:
"""Ensure matching reason and operation names do not create duplicate directories."""
with TemporaryDirectory() as tmp_dir:
root_path = Path(tmp_dir)
broken_root = root_path / "broken"
source_file = root_path / "broken_payload.json"
source_file.write_text("{}", encoding="utf-8")
with patch(
"twitch.management.commands.better_import_drops.get_broken_directory_root",
return_value=broken_root,
):
broken_dir = move_file_to_broken_subdir(
file_path=source_file,
subdir="validation_failed",
operation_name="validation_failed",
)
path_segments = broken_dir.as_posix().split("/")
assert path_segments.count("validation_failed") == 1
assert not source_file.exists()
assert (broken_dir / "broken_payload.json").exists()
class OperationNameFilteringTests(TestCase):
"""Tests for filtering campaigns by operation_name field."""
@ -864,3 +959,179 @@ class ErrorOnlyResponseDetectionTests(TestCase):
result = detect_error_only_response(parsed_json)
assert result == "error_only: unknown error"
class NonCampaignKeywordDetectionTests(TestCase):
"""Tests for non-campaign operation keyword detection."""
def test_detects_known_non_campaign_operation(self) -> None:
"""Ensure known operationName values are detected as non-campaign payloads."""
raw_text = json.dumps({"extensions": {"operationName": "PlaybackAccessToken"}})
result = detect_non_campaign_keyword(raw_text)
assert result == "PlaybackAccessToken"
def test_returns_none_for_unknown_operation(self) -> None:
"""Ensure unrelated operation names are not flagged."""
raw_text = json.dumps({"extensions": {"operationName": "DropCampaignDetails"}})
result = detect_non_campaign_keyword(raw_text)
assert result is None
class OperationNameExtractionTests(TestCase):
"""Tests for operation name extraction across supported payload shapes."""
def test_extracts_operation_name_from_json_repair_tuple(self) -> None:
"""Ensure extraction supports tuple payloads returned by json_repair."""
payload = (
{"extensions": {"operationName": "ViewerDropsDashboard"}},
[{"json_repair": "log"}],
)
result = extract_operation_name_from_parsed(payload)
assert result == "ViewerDropsDashboard"
def test_extracts_operation_name_from_list_payload(self) -> None:
"""Ensure extraction inspects the first response in list payloads."""
payload = [
{"extensions": {"operationName": "Inventory"}},
{"extensions": {"operationName": "IgnoredSecondItem"}},
]
result = extract_operation_name_from_parsed(payload)
assert result == "Inventory"
def test_returns_none_for_empty_list_payload(self) -> None:
"""Ensure extraction returns None for empty list payloads."""
result = extract_operation_name_from_parsed([])
assert result is None
class JsonRepairTests(TestCase):
"""Tests for partial JSON repair fallback behavior."""
def test_repair_filters_non_graphql_items_from_list(self) -> None:
"""Ensure repaired list output only keeps GraphQL-like response objects."""
raw_text = '[{"foo": 1}, {"data": {"currentUser": {"id": "1"}}}]'
repaired = repair_partially_broken_json(raw_text)
parsed = json.loads(repaired)
assert parsed == [{"data": {"currentUser": {"id": "1"}}}]
class ProcessFileWorkerTests(TestCase):
"""Tests for process_file_worker early-return behaviors."""
def test_returns_reason_when_drop_campaign_key_missing(self) -> None:
"""Ensure files without dropCampaign are marked failed with clear reason."""
command = Command()
repo_root: Path = Path(__file__).resolve().parents[2]
temp_path: Path = repo_root / "twitch" / "tests" / "tmp_no_drop_campaign.json"
temp_path.write_text(
json.dumps({"data": {"currentUser": {"id": "123"}}}),
encoding="utf-8",
)
try:
result = command.process_file_worker(
file_path=temp_path,
options={"crash_on_error": False, "skip_broken_moves": True},
)
finally:
if temp_path.exists():
temp_path.unlink()
assert result["success"] is False
assert result["broken_dir"] == "(skipped)"
assert result["reason"] == "no dropCampaign present"
def test_returns_reason_for_error_only_response(self) -> None:
"""Ensure error-only responses are marked failed with extracted reason."""
command = Command()
repo_root: Path = Path(__file__).resolve().parents[2]
temp_path: Path = repo_root / "twitch" / "tests" / "tmp_error_only.json"
temp_path.write_text(
json.dumps(
{
"errors": [{"message": "service timeout"}],
"data": None,
},
),
encoding="utf-8",
)
try:
result = command.process_file_worker(
file_path=temp_path,
options={"crash_on_error": False, "skip_broken_moves": True},
)
finally:
if temp_path.exists():
temp_path.unlink()
assert result["success"] is False
assert result["broken_dir"] == "(skipped)"
assert result["reason"] == "error_only: service timeout"
def test_returns_reason_for_known_non_campaign_keyword(self) -> None:
"""Ensure known non-campaign operation payloads are rejected with reason."""
command = Command()
repo_root: Path = Path(__file__).resolve().parents[2]
temp_path: Path = (
repo_root / "twitch" / "tests" / "tmp_non_campaign_keyword.json"
)
temp_path.write_text(
json.dumps(
{
"data": {"currentUser": {"id": "123"}},
"extensions": {"operationName": "PlaybackAccessToken"},
},
),
encoding="utf-8",
)
try:
result = command.process_file_worker(
file_path=temp_path,
options={"crash_on_error": False, "skip_broken_moves": True},
)
finally:
if temp_path.exists():
temp_path.unlink()
assert result["success"] is False
assert result["broken_dir"] == "(skipped)"
assert result["reason"] == "matched 'PlaybackAccessToken'"
class NormalizeResponsesTests(TestCase):
"""Tests for response normalization across supported payload formats."""
def test_normalizes_batched_responses_wrapper(self) -> None:
"""Ensure batched payloads under responses key are unwrapped and filtered."""
command = Command()
parsed_json: dict[str, object] = {
"responses": [
{"data": {"currentUser": {"id": "1"}}},
"invalid-item",
{"extensions": {"operationName": "Inventory"}},
],
}
normalized = command._normalize_responses(parsed_json)
assert len(normalized) == 2
assert normalized[0]["data"]["currentUser"]["id"] == "1"
assert normalized[1]["extensions"]["operationName"] == "Inventory"
def test_returns_empty_list_for_empty_tuple_payload(self) -> None:
"""Ensure empty tuple payloads from json_repair produce no responses."""
command = Command()
normalized = command._normalize_responses(()) # type: ignore[arg-type]
assert normalized == []

View file

@ -0,0 +1,150 @@
from io import StringIO
from unittest.mock import patch
import httpx
import pytest
from django.core.management import CommandError
from django.core.management import call_command
from twitch.models import ChatBadge
from twitch.models import ChatBadgeSet
from twitch.schemas import GlobalChatBadgesResponse
pytestmark: pytest.MarkDecorator = pytest.mark.django_db
def _build_response(title: str = "VIP") -> GlobalChatBadgesResponse:
"""Build a valid GlobalChatBadgesResponse payload for tests.
Returns:
A validated Twitch global chat badges response object.
"""
return GlobalChatBadgesResponse.model_validate({
"data": [
{
"set_id": "vip",
"versions": [
{
"id": "1",
"image_url_1x": "https://example.com/vip-1x.png",
"image_url_2x": "https://example.com/vip-2x.png",
"image_url_4x": "https://example.com/vip-4x.png",
"title": title,
"description": "VIP Badge",
"click_action": "visit_url",
"click_url": "https://help.twitch.tv",
},
],
},
],
})
def test_raises_when_client_id_missing(monkeypatch: pytest.MonkeyPatch) -> None:
"""Command should fail when client ID is not provided."""
monkeypatch.delenv("TWITCH_CLIENT_ID", raising=False)
monkeypatch.delenv("TWITCH_CLIENT_SECRET", raising=False)
monkeypatch.delenv("TWITCH_ACCESS_TOKEN", raising=False)
with pytest.raises(CommandError, match="Twitch Client ID is required"):
call_command("import_chat_badges", stdout=StringIO())
def test_raises_when_token_and_secret_missing(monkeypatch: pytest.MonkeyPatch) -> None:
"""Command should fail when no token and no client secret are available."""
monkeypatch.delenv("TWITCH_CLIENT_SECRET", raising=False)
monkeypatch.delenv("TWITCH_ACCESS_TOKEN", raising=False)
with pytest.raises(CommandError, match="Either --access-token or --client-secret"):
call_command(
"import_chat_badges",
"--client-id",
"client-id",
stdout=StringIO(),
)
@pytest.mark.django_db
def test_import_creates_then_updates_existing_badge() -> None:
"""Running import twice should update existing badges rather than duplicate them."""
with patch(
"twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges",
side_effect=[_build_response("VIP"), _build_response("VIP Updated")],
) as fetch_mock:
call_command(
"import_chat_badges",
"--client-id",
"client-id",
"--access-token",
"access-token",
stdout=StringIO(),
)
call_command(
"import_chat_badges",
"--client-id",
"client-id",
"--access-token",
"access-token",
stdout=StringIO(),
)
assert fetch_mock.call_count == 2
assert ChatBadgeSet.objects.count() == 1
assert ChatBadge.objects.count() == 1
badge_set = ChatBadgeSet.objects.get(set_id="vip")
badge = ChatBadge.objects.get(badge_set=badge_set, badge_id="1")
assert badge.title == "VIP Updated"
assert badge.click_action == "visit_url"
assert badge.click_url == "https://help.twitch.tv"
@pytest.mark.django_db
def test_uses_client_credentials_when_access_token_missing() -> None:
"""Command should obtain token from Twitch when no access token is provided."""
with (
patch(
"twitch.management.commands.import_chat_badges.Command._get_app_access_token",
return_value="generated-token",
) as token_mock,
patch(
"twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges",
return_value=GlobalChatBadgesResponse.model_validate({"data": []}),
) as fetch_mock,
):
call_command(
"import_chat_badges",
"--client-id",
"client-id",
"--client-secret",
"client-secret",
stdout=StringIO(),
)
token_mock.assert_called_once_with("client-id", "client-secret")
fetch_mock.assert_called_once_with(
client_id="client-id",
access_token="generated-token",
)
def test_wraps_http_errors_from_badges_fetch() -> None:
"""Command should convert HTTP client errors to CommandError."""
with (
patch(
"twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges",
side_effect=httpx.HTTPError("boom"),
),
pytest.raises(
CommandError,
match="Failed to fetch chat badges from Twitch API",
),
):
call_command(
"import_chat_badges",
"--client-id",
"client-id",
"--access-token",
"access-token",
stdout=StringIO(),
)

575
twitch/tests/test_tasks.py Normal file
View file

@ -0,0 +1,575 @@
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock
from unittest.mock import call
from unittest.mock import patch
import httpx
import pytest
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import RewardCampaign
from twitch.tasks import _convert_to_modern_formats
from twitch.tasks import _download_and_save
from twitch.tasks import backup_database
from twitch.tasks import download_all_images
from twitch.tasks import download_benefit_image
from twitch.tasks import download_campaign_image
from twitch.tasks import download_game_image
from twitch.tasks import download_reward_campaign_image
from twitch.tasks import import_chat_badges
from twitch.tasks import import_twitch_file
from twitch.tasks import scan_pending_twitch_files
def test_scan_pending_twitch_files_dispatches_json_files(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""It should dispatch an import task for each JSON file in the pending directory."""
first = tmp_path / "one.json"
second = tmp_path / "two.json"
ignored = tmp_path / "notes.txt"
first.write_text("{}", encoding="utf-8")
second.write_text("{}", encoding="utf-8")
ignored.write_text("ignored", encoding="utf-8")
monkeypatch.setenv("TTVDROPS_PENDING_DIR", str(tmp_path))
with patch("twitch.tasks.import_twitch_file.delay") as delay_mock:
scan_pending_twitch_files.run()
delay_mock.assert_has_calls([call(str(first)), call(str(second))], any_order=True)
assert delay_mock.call_count == 2
def test_scan_pending_twitch_files_skips_when_env_missing(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""It should do nothing when TTVDROPS_PENDING_DIR is not configured."""
monkeypatch.delenv("TTVDROPS_PENDING_DIR", raising=False)
with patch("twitch.tasks.import_twitch_file.delay") as delay_mock:
scan_pending_twitch_files.run()
delay_mock.assert_not_called()
def test_import_twitch_file_calls_importer_for_existing_file(tmp_path: Path) -> None:
"""It should run BetterImportDrops with the provided path when the file exists."""
source = tmp_path / "drops.json"
source.write_text("{}", encoding="utf-8")
with patch(
"twitch.management.commands.better_import_drops.Command.handle",
) as handle_mock:
import_twitch_file.run(str(source))
assert handle_mock.call_count == 1
assert handle_mock.call_args.kwargs["path"] == source
def test_import_twitch_file_retries_on_import_error(tmp_path: Path) -> None:
"""It should retry when the importer raises an exception."""
source = tmp_path / "drops.json"
source.write_text("{}", encoding="utf-8")
error = RuntimeError("boom")
with (
patch(
"twitch.management.commands.better_import_drops.Command.handle",
side_effect=error,
),
patch.object(
import_twitch_file,
"retry",
side_effect=RuntimeError("retried"),
) as retry_mock,
pytest.raises(RuntimeError, match="retried"),
):
import_twitch_file.run(str(source))
retry_mock.assert_called_once_with(exc=error)
def test_download_and_save_skips_when_image_already_cached() -> None:
"""It should not download when the target ImageField already has a file name."""
file_field = MagicMock()
file_field.name = "already-there.jpg"
with patch("twitch.tasks.httpx.Client") as client_mock:
result = _download_and_save(
url="https://example.com/test.jpg",
name="game-1",
file_field=file_field,
)
assert result is False
client_mock.assert_not_called()
def test_download_and_save_saves_downloaded_content() -> None:
"""It should save downloaded bytes and trigger modern format conversion."""
file_field = MagicMock()
file_field.name = ""
file_field.path = "C:/cache/game-1.png"
response = MagicMock()
response.content = b"img-bytes"
response.raise_for_status.return_value = None
client = MagicMock()
client.get.return_value = response
client_cm = MagicMock()
client_cm.__enter__.return_value = client
with (
patch("twitch.tasks.httpx.Client", return_value=client_cm),
patch("twitch.tasks._convert_to_modern_formats") as convert_mock,
):
result = _download_and_save(
url="https://example.com/path/picture.png",
name="game-1",
file_field=file_field,
)
assert result is True
file_field.save.assert_called_once()
assert file_field.save.call_args.args[0] == "game-1.png"
assert file_field.save.call_args.kwargs["save"] is True
convert_mock.assert_called_once_with(Path("C:/cache/game-1.png"))
def test_download_and_save_returns_false_on_http_error() -> None:
"""It should return False when HTTP requests fail."""
file_field = MagicMock()
file_field.name = ""
client = MagicMock()
client.get.side_effect = httpx.HTTPError("network down")
client_cm = MagicMock()
client_cm.__enter__.return_value = client
with patch("twitch.tasks.httpx.Client", return_value=client_cm):
result = _download_and_save(
url="https://example.com/path/picture.png",
name="game-1",
file_field=file_field,
)
assert result is False
file_field.save.assert_not_called()
def test_download_game_image_downloads_normalized_twitch_url() -> None:
"""It should normalize Twitch box art URLs before downloading."""
game = SimpleNamespace(
box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg",
twitch_id="123",
box_art_file=MagicMock(),
)
with (
patch("twitch.models.Game.objects.get", return_value=game),
patch(
"twitch.utils.is_twitch_box_art_url",
return_value=True,
) as is_twitch_mock,
patch(
"twitch.utils.normalize_twitch_box_art_url",
return_value="https://cdn.example.com/box.jpg",
) as normalize_mock,
patch("twitch.tasks._download_and_save") as save_mock,
):
download_game_image.run(1)
is_twitch_mock.assert_called_once_with(game.box_art)
normalize_mock.assert_called_once_with(game.box_art)
save_mock.assert_called_once_with(
"https://cdn.example.com/box.jpg",
"123",
game.box_art_file,
)
def test_download_game_image_retries_on_download_error() -> None:
"""It should retry when the image download helper fails unexpectedly."""
game = SimpleNamespace(
box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg",
twitch_id="123",
box_art_file=MagicMock(),
)
error = RuntimeError("boom")
with (
patch("twitch.models.Game.objects.get", return_value=game),
patch("twitch.utils.is_twitch_box_art_url", return_value=True),
patch(
"twitch.utils.normalize_twitch_box_art_url",
return_value="https://cdn.example.com/box.jpg",
),
patch("twitch.tasks._download_and_save", side_effect=error),
patch.object(
download_game_image,
"retry",
side_effect=RuntimeError("retried"),
) as retry_mock,
pytest.raises(RuntimeError, match="retried"),
):
download_game_image.run(1)
retry_mock.assert_called_once_with(exc=error)
def test_download_all_images_calls_expected_commands() -> None:
"""It should invoke both image import management commands."""
with patch("twitch.tasks.call_command") as call_command_mock:
download_all_images.run()
call_command_mock.assert_has_calls([
call("download_box_art"),
call("download_campaign_images", model="all"),
])
def test_import_chat_badges_retries_on_error() -> None:
"""It should retry when import_chat_badges command execution fails."""
error = RuntimeError("boom")
with (
patch("twitch.tasks.call_command", side_effect=error),
patch.object(
import_chat_badges,
"retry",
side_effect=RuntimeError("retried"),
) as retry_mock,
pytest.raises(RuntimeError, match="retried"),
):
import_chat_badges.run()
retry_mock.assert_called_once_with(exc=error)
def test_backup_database_calls_backup_command() -> None:
"""It should invoke the backup_db management command."""
with patch("twitch.tasks.call_command") as call_command_mock:
backup_database.run()
call_command_mock.assert_called_once_with("backup_db")
def test_convert_to_modern_formats_skips_non_image_files(tmp_path: Path) -> None:
"""It should skip files that are not jpg, jpeg, or png."""
text_file = tmp_path / "document.txt"
text_file.write_text("Not an image", encoding="utf-8")
with patch("PIL.Image.open") as open_mock:
_convert_to_modern_formats(text_file)
open_mock.assert_not_called()
def test_convert_to_modern_formats_skips_missing_files(tmp_path: Path) -> None:
"""It should skip files that do not exist."""
missing_file = tmp_path / "nonexistent.jpg"
with patch("PIL.Image.open") as open_mock:
_convert_to_modern_formats(missing_file)
open_mock.assert_not_called()
def test_convert_to_modern_formats_converts_rgb_image(tmp_path: Path) -> None:
"""It should save image in WebP and AVIF formats for RGB images."""
original = tmp_path / "image.jpg"
original.write_bytes(b"fake-jpeg")
mock_image = MagicMock()
mock_image.mode = "RGB"
mock_copy = MagicMock()
mock_image.copy.return_value = mock_copy
with (
patch("PIL.Image.open") as open_mock,
):
open_mock.return_value.__enter__.return_value = mock_image
_convert_to_modern_formats(original)
open_mock.assert_called_once_with(original)
assert mock_copy.save.call_count == 2
webp_call = [c for c in mock_copy.save.call_args_list if "webp" in str(c)]
avif_call = [c for c in mock_copy.save.call_args_list if "avif" in str(c)]
assert len(webp_call) == 1
assert len(avif_call) == 1
def test_convert_to_modern_formats_converts_rgba_image_with_background(
tmp_path: Path,
) -> None:
"""It should create RGB background and paste RGBA image onto it."""
original = tmp_path / "image.png"
original.write_bytes(b"fake-png")
mock_rgba = MagicMock()
mock_rgba.mode = "RGBA"
mock_rgba.split.return_value = [MagicMock(), MagicMock(), MagicMock(), MagicMock()]
mock_rgba.size = (100, 100)
mock_rgba.convert.return_value = mock_rgba
mock_bg = MagicMock()
with (
patch("PIL.Image.open") as open_mock,
patch("PIL.Image.new", return_value=mock_bg) as new_mock,
):
open_mock.return_value.__enter__.return_value = mock_rgba
_convert_to_modern_formats(original)
new_mock.assert_called_once_with("RGB", (100, 100), (255, 255, 255))
mock_bg.paste.assert_called_once()
assert mock_bg.save.call_count == 2
def test_convert_to_modern_formats_handles_conversion_error(tmp_path: Path) -> None:
"""It should gracefully handle format conversion failures."""
original = tmp_path / "image.jpg"
original.write_bytes(b"fake-jpeg")
mock_image = MagicMock()
mock_image.mode = "RGB"
mock_image.copy.return_value = MagicMock()
mock_image.copy.return_value.save.side_effect = Exception("PIL error")
with (
patch("PIL.Image.open") as open_mock,
):
open_mock.return_value.__enter__.return_value = mock_image
_convert_to_modern_formats(original) # Should not raise
def test_download_campaign_image_downloads_when_url_exists() -> None:
"""It should download and save campaign image when image_url is present."""
campaign = SimpleNamespace(
image_url="https://example.com/campaign.jpg",
twitch_id="camp123",
image_file=MagicMock(),
)
with (
patch("twitch.models.DropCampaign.objects.get", return_value=campaign),
patch("twitch.tasks._download_and_save", return_value=True) as save_mock,
):
download_campaign_image.run(1)
save_mock.assert_called_once_with(
"https://example.com/campaign.jpg",
"camp123",
campaign.image_file,
)
def test_download_campaign_image_skips_when_no_url() -> None:
"""It should skip when campaign has no image_url."""
campaign = SimpleNamespace(
image_url=None,
twitch_id="camp123",
image_file=MagicMock(),
)
with (
patch("twitch.models.DropCampaign.objects.get", return_value=campaign),
patch("twitch.tasks._download_and_save") as save_mock,
):
download_campaign_image.run(1)
save_mock.assert_not_called()
def test_download_campaign_image_skips_when_not_found() -> None:
"""It should skip when campaign does not exist."""
with (
patch(
"twitch.models.DropCampaign.objects.get",
side_effect=DropCampaign.DoesNotExist(),
),
patch("twitch.tasks._download_and_save") as save_mock,
):
download_campaign_image.run(1)
save_mock.assert_not_called()
def test_download_campaign_image_retries_on_error() -> None:
"""It should retry when image download fails unexpectedly."""
campaign = SimpleNamespace(
image_url="https://example.com/campaign.jpg",
twitch_id="camp123",
image_file=MagicMock(),
)
error = RuntimeError("boom")
with (
patch("twitch.models.DropCampaign.objects.get", return_value=campaign),
patch("twitch.tasks._download_and_save", side_effect=error),
patch.object(
download_campaign_image,
"retry",
side_effect=RuntimeError("retried"),
) as retry_mock,
pytest.raises(RuntimeError, match="retried"),
):
download_campaign_image.run(1)
retry_mock.assert_called_once_with(exc=error)
def test_download_benefit_image_downloads_when_url_exists() -> None:
"""It should download and save benefit image when image_asset_url is present."""
benefit = SimpleNamespace(
image_asset_url="https://example.com/benefit.png",
twitch_id="benefit123",
image_file=MagicMock(),
)
with (
patch("twitch.models.DropBenefit.objects.get", return_value=benefit),
patch("twitch.tasks._download_and_save", return_value=True) as save_mock,
):
download_benefit_image.run(1)
save_mock.assert_called_once_with(
url="https://example.com/benefit.png",
name="benefit123",
file_field=benefit.image_file,
)
def test_download_benefit_image_skips_when_no_url() -> None:
"""It should skip when benefit has no image_asset_url."""
benefit = SimpleNamespace(
image_asset_url=None,
twitch_id="benefit123",
image_file=MagicMock(),
)
with (
patch("twitch.models.DropBenefit.objects.get", return_value=benefit),
patch("twitch.tasks._download_and_save") as save_mock,
):
download_benefit_image.run(1)
save_mock.assert_not_called()
def test_download_benefit_image_skips_when_not_found() -> None:
"""It should skip when benefit does not exist."""
with (
patch(
"twitch.models.DropBenefit.objects.get",
side_effect=DropBenefit.DoesNotExist(),
),
patch("twitch.tasks._download_and_save") as save_mock,
):
download_benefit_image.run(1)
save_mock.assert_not_called()
def test_download_benefit_image_retries_on_error() -> None:
"""It should retry when image download fails unexpectedly."""
benefit = SimpleNamespace(
image_asset_url="https://example.com/benefit.png",
twitch_id="benefit123",
image_file=MagicMock(),
)
error = RuntimeError("boom")
with (
patch("twitch.models.DropBenefit.objects.get", return_value=benefit),
patch("twitch.tasks._download_and_save", side_effect=error),
patch.object(
download_benefit_image,
"retry",
side_effect=RuntimeError("retried"),
) as retry_mock,
pytest.raises(RuntimeError, match="retried"),
):
download_benefit_image.run(1)
retry_mock.assert_called_once_with(exc=error)
def test_download_reward_campaign_image_downloads_when_url_exists() -> None:
"""It should download and save reward campaign image when image_url is present."""
reward = SimpleNamespace(
image_url="https://example.com/reward.jpg",
twitch_id="reward123",
image_file=MagicMock(),
)
with (
patch("twitch.models.RewardCampaign.objects.get", return_value=reward),
patch("twitch.tasks._download_and_save", return_value=True) as save_mock,
):
download_reward_campaign_image.run(1)
save_mock.assert_called_once_with(
"https://example.com/reward.jpg",
"reward123",
reward.image_file,
)
def test_download_reward_campaign_image_skips_when_no_url() -> None:
"""It should skip when reward has no image_url."""
reward = SimpleNamespace(
image_url=None,
twitch_id="reward123",
image_file=MagicMock(),
)
with (
patch("twitch.models.RewardCampaign.objects.get", return_value=reward),
patch("twitch.tasks._download_and_save") as save_mock,
):
download_reward_campaign_image.run(1)
save_mock.assert_not_called()
def test_download_reward_campaign_image_skips_when_not_found() -> None:
"""It should skip when reward does not exist."""
with (
patch(
"twitch.models.RewardCampaign.objects.get",
side_effect=RewardCampaign.DoesNotExist(),
),
patch("twitch.tasks._download_and_save") as save_mock,
):
download_reward_campaign_image.run(1)
save_mock.assert_not_called()
def test_download_reward_campaign_image_retries_on_error() -> None:
"""It should retry when image download fails unexpectedly."""
reward = SimpleNamespace(
image_url="https://example.com/reward.jpg",
twitch_id="reward123",
image_file=MagicMock(),
)
error = RuntimeError("boom")
with (
patch("twitch.models.RewardCampaign.objects.get", return_value=reward),
patch("twitch.tasks._download_and_save", side_effect=error),
patch.object(
download_reward_campaign_image,
"retry",
side_effect=RuntimeError("retried"),
) as retry_mock,
pytest.raises(RuntimeError, match="retried"),
):
download_reward_campaign_image.run(1)
retry_mock.assert_called_once_with(exc=error)