From 66ea46cf2331470d9e4ac20c134161b55e64e4bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Hells=C3=A9n?= Date: Wed, 8 Apr 2026 03:23:18 +0200 Subject: [PATCH] Use celery tasks instead of systemd timers for periodic work; and add more tests --- .env.example | 28 +- README.md | 48 +- chzzk/tasks.py | 26 + chzzk/tests/test_management_commands.py | 481 +++++++++++++++ chzzk/tests/test_tasks.py | 58 ++ config/settings.py | 60 ++ core/tasks.py | 17 + kick/tasks.py | 17 + pyproject.toml | 2 +- tools/systemd/ttvdrops-backup.service | 10 - tools/systemd/ttvdrops-backup.timer | 9 - tools/systemd/ttvdrops-celery-beat.service | 27 + tools/systemd/ttvdrops-celery-worker.service | 31 + tools/systemd/ttvdrops-import-drops.service | 34 -- .../ttvdrops-import-kick-drops.service | 26 - .../systemd/ttvdrops-import-kick-drops.timer | 9 - twitch/apps.py | 18 + .../commands/cleanup_orphaned_channels.py | 3 +- .../convert_images_to_modern_formats.py | 4 +- twitch/management/commands/watch_imports.py | 11 +- twitch/signals.py | 65 ++ twitch/tasks.py | 257 ++++++++ twitch/tests/test_better_import_drops.py | 271 +++++++++ twitch/tests/test_import_chat_badges.py | 150 +++++ twitch/tests/test_tasks.py | 575 ++++++++++++++++++ 25 files changed, 2133 insertions(+), 104 deletions(-) create mode 100644 chzzk/tasks.py create mode 100644 chzzk/tests/test_management_commands.py create mode 100644 chzzk/tests/test_tasks.py create mode 100644 core/tasks.py create mode 100644 kick/tasks.py delete mode 100644 tools/systemd/ttvdrops-backup.service delete mode 100644 tools/systemd/ttvdrops-backup.timer create mode 100644 tools/systemd/ttvdrops-celery-beat.service create mode 100644 tools/systemd/ttvdrops-celery-worker.service delete mode 100644 tools/systemd/ttvdrops-import-drops.service delete mode 100644 tools/systemd/ttvdrops-import-kick-drops.service delete mode 100644 tools/systemd/ttvdrops-import-kick-drops.timer create mode 100644 twitch/signals.py create mode 100644 twitch/tasks.py create mode 100644 twitch/tests/test_import_chat_badges.py create mode 100644 twitch/tests/test_tasks.py diff --git a/.env.example b/.env.example index 7ccfd56..380538a 100644 --- a/.env.example +++ b/.env.example @@ -1,7 +1,13 @@ -# Django Configuration -# Set to False in production +# Environment variables for ttvdrops Django application +# Copy this file to `.env` and fill in the appropriate values before running the application. +# For local development, you can use the default values provided here, but make sure to change the secret key and Twitch API credentials for production use. + +# Debug mode (set to False in production) DEBUG=True +# Base URL used for absolute URL generation +BASE_URL=https://ttvdrops.lovinator.space + # Django Secret Key # Generate a new secret key for production: python -c 'from django.core.management.utils import get_random_secret_key; print(get_random_secret_key())' DJANGO_SECRET_KEY=your-secret-key-here @@ -11,6 +17,8 @@ DJANGO_SECRET_KEY=your-secret-key-here # You can use either an app access token or user access token TWITCH_CLIENT_ID=your-twitch-client-id TWITCH_CLIENT_SECRET=your-twitch-client-secret +# Optional: if omitted, some commands can fetch an app access token using the client credentials +TWITCH_ACCESS_TOKEN= # Email Configuration # SMTP Host (examples below) @@ -42,10 +50,24 @@ POSTGRES_HOST=/run/postgresql # Note: Changed from 5432 to 6432 to use PgBouncer POSTGRES_PORT=5432 + +# Optional database connection tuning +# CONN_MAX_AGE=60 +# CONN_HEALTH_CHECKS=True +# DB_CONNECT_TIMEOUT=10 + +# Use SQLite instead of PostgreSQL (useful for local development) +USE_SQLITE=False + # Where to store Twitch API responses TTVDROPS_IMPORTED_DIR=/mnt/fourteen/Data/Responses/imported + +# Where to store Twitch API responses that failed processing (e.g. due to missing fields or unrecognized formats) TTVDROPS_BROKEN_DIR=/mnt/fourteen/Data/Responses/broken -# Redis Configuration +# Where to store Twitch API responses that are pending processing (e.g. waiting for a periodic task to process them) +TTVDROPS_PENDING_DIR=/mnt/fourteen/Data/Responses/pending + +# Redis Configuration for caching and Celery REDIS_URL_CACHE=unix:///var/run/redis/redis.sock?db=0 REDIS_URL_CELERY=unix:///var/run/redis/redis.sock?db=1 diff --git a/README.md b/README.md index b7273b3..d6cc782 100644 --- a/README.md +++ b/README.md @@ -61,13 +61,20 @@ sudo systemctl enable --now ttvdrops.service curl --unix-socket /run/ttvdrops/ttvdrops.sock https://ttvdrops.lovinator.space ``` -Install and enable timers: +Enable Celery worker and Beat services: ```bash -sudo install -m 0644 tools/systemd/ttvdrops-backup.{service,timer} /etc/systemd/system/ -sudo install -m 0644 tools/systemd/ttvdrops-import-drops.{service,timer} /etc/systemd/system/ +sudo install -m 0644 tools/systemd/ttvdrops-celery-worker.service /etc/systemd/system/ +sudo install -m 0644 tools/systemd/ttvdrops-celery-beat.service /etc/systemd/system/ sudo systemctl daemon-reload -sudo systemctl enable --now ttvdrops-backup.timer ttvdrops-import-drops.timer +sudo systemctl enable --now ttvdrops-celery-worker.service +sudo systemctl enable --now ttvdrops-celery-beat.service +``` + +Set `TTVDROPS_PENDING_DIR` in `.env` to the directory where Twitch JSON drop files are dropped: + +```env +TTVDROPS_PENDING_DIR=/mnt/fourteen/Data/Responses/pending ``` ## Development @@ -83,13 +90,40 @@ uv run pytest ## Celery -Start a worker: +Celery powers all periodic and background work. Three services are required: + +| Service file | Queues | Purpose | +|---|---|---| +| `ttvdrops-celery-worker.service` | `imports`, `api-fetches`, `default` | Twitch/Kick/Chzzk imports, backup | +| `ttvdrops-celery-beat.service` | — | Periodic task scheduler (Beat) | + +Start workers manually during development: ```bash -uv run celery -A config worker --loglevel=info -uv run celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler +# All-in-one worker (development) +uv run celery -A config worker --queues imports,api-fetches,image-downloads,default --loglevel=info + +# Beat scheduler (requires database Beat tables — run migrate first) +uv run celery -A config beat --scheduler django_celery_beat.schedulers:DatabaseScheduler --loglevel=info + +# Monitor tasks in the browser (optional) +uv run celery -A config flower ``` +**Periodic tasks configured via `CELERY_BEAT_SCHEDULE`:** + +| Task | Schedule | Queue | +|---|---|---| +| `twitch.tasks.scan_pending_twitch_files` | every 10 s | `imports` | +| `kick.tasks.import_kick_drops` | :01/:16/:31/:46 | `api-fetches` | +| `chzzk.tasks.discover_chzzk_campaigns` | every 2 h | `api-fetches` | +| `twitch.tasks.backup_database` | daily 02:15 UTC | `default` | +| `twitch.tasks.download_all_images` | Sunday 04:00 UTC | `image-downloads` | +| `twitch.tasks.import_chat_badges` | Sunday 03:00 UTC | `api-fetches` | + +Image downloads also run **immediately** on record creation via `post_save` signals +(`Game`, `DropCampaign`, `DropBenefit`, `RewardCampaign`). + ## Import Drops ```bash diff --git a/chzzk/tasks.py b/chzzk/tasks.py new file mode 100644 index 0000000..5707beb --- /dev/null +++ b/chzzk/tasks.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import logging + +from celery import shared_task +from django.core.management import call_command + +logger = logging.getLogger("ttvdrops.tasks") + + +@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60) +def import_chzzk_campaign_task(self, campaign_no: int) -> None: # noqa: ANN001 + """Import a single Chzzk campaign by its campaign number.""" + try: + call_command("import_chzzk_campaign", str(campaign_no)) + except Exception as exc: + raise self.retry(exc=exc) from exc + + +@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120) +def discover_chzzk_campaigns(self) -> None: # noqa: ANN001 + """Discover and import the latest Chzzk campaigns (equivalent to --latest flag).""" + try: + call_command("import_chzzk_campaign", latest=True) + except Exception as exc: + raise self.retry(exc=exc) from exc diff --git a/chzzk/tests/test_management_commands.py b/chzzk/tests/test_management_commands.py new file mode 100644 index 0000000..ec41d22 --- /dev/null +++ b/chzzk/tests/test_management_commands.py @@ -0,0 +1,481 @@ +from datetime import timedelta +from io import StringIO +from unittest.mock import Mock +from unittest.mock import call +from unittest.mock import patch + +import pytest +import requests +from django.core.management import CommandError +from django.core.management import call_command +from django.test import TestCase +from django.utils import timezone + +from chzzk.management.commands.import_chzzk_campaign import ( + Command as ImportChzzkCampaignCommand, +) +from chzzk.models import ChzzkCampaign +from chzzk.models import ChzzkReward +from chzzk.schemas import ChzzkCampaignV2 +from chzzk.schemas import ChzzkRewardV2 + + +class ImportChzzkCampaignRangeCommandTest(TestCase): + """Tests for the import_chzzk_campaign_range management command.""" + + def test_imports_campaigns_in_range_descending_by_default(self) -> None: + """Test that campaigns are imported in descending order when start > end.""" + stdout = StringIO() + with patch( + "chzzk.management.commands.import_chzzk_campaign_range.call_command", + ) as call_command_mock: + call_command( + "import_chzzk_campaign_range", + "5", + "3", + stdout=stdout, + ) + + # Verify call_command was called for each campaign in descending order + expected_calls = [ + call("import_chzzk_campaign", "5"), + call("import_chzzk_campaign", "4"), + call("import_chzzk_campaign", "3"), + ] + call_command_mock.assert_has_calls(expected_calls) + assert call_command_mock.call_count == 3 + + def test_imports_campaigns_in_range_ascending_by_default(self) -> None: + """Test that campaigns are imported in ascending order when start < end.""" + stdout = StringIO() + with patch( + "chzzk.management.commands.import_chzzk_campaign_range.call_command", + ) as call_command_mock: + call_command( + "import_chzzk_campaign_range", + "3", + "5", + stdout=stdout, + ) + + expected_calls = [ + call("import_chzzk_campaign", "3"), + call("import_chzzk_campaign", "4"), + call("import_chzzk_campaign", "5"), + ] + call_command_mock.assert_has_calls(expected_calls) + assert call_command_mock.call_count == 3 + + def test_imports_single_campaign_when_start_equals_end(self) -> None: + """Test that a single campaign is imported when start equals end.""" + stdout = StringIO() + with patch( + "chzzk.management.commands.import_chzzk_campaign_range.call_command", + ) as call_command_mock: + call_command( + "import_chzzk_campaign_range", + "5", + "5", + stdout=stdout, + ) + + call_command_mock.assert_called_once_with("import_chzzk_campaign", "5") + + def test_respects_custom_step_parameter(self) -> None: + """Test that custom step parameter is respected.""" + stdout = StringIO() + with patch( + "chzzk.management.commands.import_chzzk_campaign_range.call_command", + ) as call_command_mock: + call_command( + "import_chzzk_campaign_range", + "1", + "10", + "--step", + "2", + stdout=stdout, + ) + + expected_calls = [ + call("import_chzzk_campaign", "1"), + call("import_chzzk_campaign", "3"), + call("import_chzzk_campaign", "5"), + call("import_chzzk_campaign", "7"), + call("import_chzzk_campaign", "9"), + ] + call_command_mock.assert_has_calls(expected_calls) + assert call_command_mock.call_count == 5 + + def test_respects_custom_negative_step(self) -> None: + """Test that custom negative step parameter works correctly.""" + stdout = StringIO() + with patch( + "chzzk.management.commands.import_chzzk_campaign_range.call_command", + ) as call_command_mock: + call_command( + "import_chzzk_campaign_range", + "10", + "1", + "--step", + "-2", + stdout=stdout, + ) + + expected_calls = [ + call("import_chzzk_campaign", "10"), + call("import_chzzk_campaign", "8"), + call("import_chzzk_campaign", "6"), + call("import_chzzk_campaign", "4"), + call("import_chzzk_campaign", "2"), + ] + call_command_mock.assert_has_calls(expected_calls) + assert call_command_mock.call_count == 5 + + def test_handles_command_error_gracefully(self) -> None: + """Test that CommandError from import_chzzk_campaign is caught and reported.""" + stdout = StringIO() + stderr = StringIO() + + def side_effect(command: str, *args: str, **kwargs: object) -> None: + if "4" in args: + msg = "Campaign 4 not found" + raise CommandError(msg) + + with patch( + "chzzk.management.commands.import_chzzk_campaign_range.call_command", + side_effect=side_effect, + ): + call_command( + "import_chzzk_campaign_range", + "3", + "5", + stdout=stdout, + stderr=stderr, + ) + + output = stdout.getvalue() + assert "Importing campaign 3" in output + assert "Importing campaign 4" in output + assert "Importing campaign 5" in output + assert "Failed campaign 4" in output + assert "Campaign 4 not found" in output + assert "Batch import complete" in output + + def test_continues_after_command_error(self) -> None: + """Test that import continues after encountering a CommandError.""" + call_count = 0 + + def side_effect(command: str, campaign_no: str) -> None: + nonlocal call_count + call_count += 1 + if campaign_no == "4": + msg = "Campaign 4 error" + raise CommandError(msg) + + with patch( + "chzzk.management.commands.import_chzzk_campaign_range.call_command", + side_effect=side_effect, + ): + call_command( + "import_chzzk_campaign_range", + "3", + "5", + stdout=StringIO(), + ) + + # Verify all campaigns were attempted + assert call_count == 3 + + def test_outputs_success_messages(self) -> None: + """Test that success messages are written to stdout.""" + stdout = StringIO() + with patch( + "chzzk.management.commands.import_chzzk_campaign_range.call_command", + ): + call_command( + "import_chzzk_campaign_range", + "1", + "2", + stdout=stdout, + ) + + output: str = stdout.getvalue() + assert "Importing campaigns from 1 to 2 with step 1" in output + assert "Batch import complete" in output + + def test_raises_error_when_step_is_zero(self) -> None: + """Test that ValueError is raised when step is 0.""" + with pytest.raises(ValueError, match="Step cannot be 0"): + call_command( + "import_chzzk_campaign_range", + "1", + "5", + "--step", + "0", + stdout=StringIO(), + ) + + def test_handles_large_range(self) -> None: + """Test that large ranges are handled correctly.""" + stdout = StringIO() + with patch( + "chzzk.management.commands.import_chzzk_campaign_range.call_command", + ) as call_command_mock: + call_command( + "import_chzzk_campaign_range", + "1", + "100", + "--step", + "10", + stdout=stdout, + ) + + assert call_command_mock.call_count == 10 + first_call = call_command_mock.call_args_list[0] + assert first_call == call("import_chzzk_campaign", "1") + last_call = call_command_mock.call_args_list[-1] + assert last_call == call("import_chzzk_campaign", "91") + + +class ImportChzzkCampaignCommandTest(TestCase): + """Tests for the import_chzzk_campaign management command.""" + + def _create_campaign(self, campaign_no: int) -> ChzzkCampaign: + now = timezone.now() + return ChzzkCampaign.objects.create( + campaign_no=campaign_no, + title=f"Campaign {campaign_no}", + description="Campaign description", + category_type="game", + category_id="1", + category_value="Game", + service_id="chzzk", + state="ACTIVE", + start_date=now - timedelta(days=1), + end_date=now + timedelta(days=1), + has_ios_based_reward=False, + drops_campaign_not_started=False, + source_api="unit-test", + ) + + def test_requires_campaign_no_when_latest_not_used(self) -> None: + """Command should fail without campaign_no unless --latest is provided.""" + with pytest.raises( + CommandError, + match="campaign_no is required unless --latest is used", + ): + call_command("import_chzzk_campaign", stdout=StringIO()) + + def test_imports_single_campaign_no(self) -> None: + """Command should import the provided campaign number.""" + with patch( + "chzzk.management.commands.import_chzzk_campaign.Command._import_campaign", + autospec=True, + ) as import_campaign_mock: + call_command("import_chzzk_campaign", "42", stdout=StringIO()) + + assert import_campaign_mock.call_count == 1 + assert import_campaign_mock.call_args.args[1] == 42 + + def test_latest_imports_candidates(self) -> None: + """Command should import all candidates returned by get_campaign_import_candidates.""" + with ( + patch( + "chzzk.management.commands.import_chzzk_campaign.Command.get_campaign_import_candidates", + autospec=True, + return_value=[9, 10, 11], + ), + patch( + "chzzk.management.commands.import_chzzk_campaign.Command._import_campaign", + autospec=True, + ) as import_campaign_mock, + ): + call_command("import_chzzk_campaign", "--latest", stdout=StringIO()) + + called_campaigns = [ + call_.args[1] for call_ in import_campaign_mock.call_args_list + ] + assert called_campaigns == [9, 10, 11] + + def test_latest_with_no_candidates_exits_cleanly(self) -> None: + """Command should not import anything when --latest has no candidates.""" + stdout = StringIO() + + with ( + patch( + "chzzk.management.commands.import_chzzk_campaign.Command.get_campaign_import_candidates", + autospec=True, + return_value=[], + ), + patch( + "chzzk.management.commands.import_chzzk_campaign.Command._import_campaign", + autospec=True, + ) as import_campaign_mock, + ): + call_command("import_chzzk_campaign", "--latest", stdout=stdout) + + assert import_campaign_mock.call_count == 0 + assert "Nothing to import with --latest at this time." in stdout.getvalue() + + def test_get_campaign_import_candidates_uses_initial_range_on_empty_db( + self, + ) -> None: + """When there are no campaigns, candidates should be 1..5.""" + command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand() + candidates: list[int] = command.get_campaign_import_candidates() + assert candidates == [1, 2, 3, 4, 5] + + def test_get_campaign_import_candidates_adds_backfill_and_new_candidates( + self, + ) -> None: + """Candidates should include missing IDs from latest-5..latest-1 plus latest+1..latest+5.""" + self._create_campaign(10) + self._create_campaign(8) + self._create_campaign(6) + + command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand() + candidates: list[int] = command.get_campaign_import_candidates() + + assert candidates == [5, 7, 9, 11, 12, 13, 14, 15] + + def test_get_campaign_import_candidates_ignores_outlier_max_campaign_no( + self, + ) -> None: + """If max campaign_no is an outlier, the second max should be used.""" + self._create_campaign(250) + self._create_campaign(100_002_000) + + command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand() + candidates: list[int] = command.get_campaign_import_candidates() + + assert candidates == [245, 246, 247, 248, 249, 251, 252, 253, 254, 255] + + def test_import_campaign_handles_http_error_with_json_message(self) -> None: + """Import should fail gracefully on HTTP errors and include JSON message when present.""" + command = ImportChzzkCampaignCommand() + + response = Mock() + response.raise_for_status.side_effect = requests.HTTPError("404 Client Error") + response.headers = {"Content-Type": "application/json; charset=utf-8"} + response.json.return_value = {"message": "Campaign not found"} + + with ( + patch( + "chzzk.management.commands.import_chzzk_campaign.requests.get", + return_value=response, + ), + patch.object(command.stdout, "write") as stdout_write, + patch.object(command, "import_campaign_data") as import_campaign_data_mock, + ): + command._import_campaign(12345) + + assert import_campaign_data_mock.call_count == 0 + assert stdout_write.call_count == 1 + msg = stdout_write.call_args.args[0] + assert "Failed to fetch campaign 12345" in msg + assert "Campaign not found" in msg + + def test_update_or_create_reward_updates_existing_reward(self) -> None: + """Existing rewards should be updated when incoming reward data differs.""" + campaign = self._create_campaign(500) + existing_reward = ChzzkReward.objects.create( + campaign=campaign, + reward_no=1, + title="Old title", + image_url="https://example.com/old.png", + reward_type="OLD", + campaign_reward_type="", + condition_type="TIME", + condition_for_minutes=10, + ios_based_reward=False, + code_remaining_count=5, + ) + + reward_data = ChzzkRewardV2.model_validate( + { + "title": "New title", + "rewardNo": 1, + "imageUrl": "https://example.com/new.png", + "rewardType": "NEW", + "conditionType": "WATCH", + "conditionForMinutes": 20, + "iosBasedReward": True, + "codeRemainingCount": 99, + }, + ) + + command = ImportChzzkCampaignCommand() + command.update_or_create_reward(500, campaign, reward_data) + + existing_reward.refresh_from_db() + assert existing_reward.title == "New title" + assert existing_reward.image_url == "https://example.com/new.png" + assert existing_reward.reward_type == "NEW" + assert not existing_reward.campaign_reward_type + assert existing_reward.condition_type == "WATCH" + assert existing_reward.condition_for_minutes == 20 + assert existing_reward.ios_based_reward is True + assert existing_reward.code_remaining_count == 99 + + def test_import_campaign_data_updates_existing_campaign(self) -> None: + """Existing campaigns should be updated when imported fields have changed.""" + campaign = self._create_campaign(600) + original_scraped_at = campaign.scraped_at + + command = ImportChzzkCampaignCommand() + campaign_data = ChzzkCampaignV2.model_validate( + { + "campaignNo": 600, + "title": "Updated title", + "imageUrl": "https://example.com/new-campaign.png", + "description": "Updated description", + "categoryType": "game", + "categoryId": "2", + "categoryValue": "Game 2", + "pcLinkUrl": "https://example.com/pc", + "mobileLinkUrl": "https://example.com/mobile", + "serviceId": "chzzk", + "state": "ACTIVE", + "startDate": campaign.start_date.isoformat(), + "endDate": campaign.end_date.isoformat(), + "rewardList": [], + "hasIosBasedReward": True, + "dropsCampaignNotStarted": False, + "rewardType": "DROP", + "accountLinkUrl": "https://example.com/account", + }, + ) + + updated_campaign = command.import_campaign_data( + campaign_no=600, + api_version="v2", + data={"key": "value"}, + cd=campaign_data, + ) + + updated_campaign.refresh_from_db() + assert updated_campaign.title == "Updated title" + assert updated_campaign.description == "Updated description" + assert updated_campaign.category_id == "2" + assert updated_campaign.has_ios_based_reward is True + assert not updated_campaign.campaign_reward_type + assert updated_campaign.reward_type == "DROP" + assert updated_campaign.account_link_url == "https://example.com/account" + assert updated_campaign.raw_json_v2 == {"key": "value"} + assert updated_campaign.scrape_status == "success" + assert updated_campaign.scraped_at >= original_scraped_at + + def test_apply_updates_if_changed_returns_false_on_noop(self) -> None: + """Helper should return False when values are unchanged.""" + campaign = self._create_campaign(700) + command = ImportChzzkCampaignCommand() + + changed = command._apply_updates_if_changed( + campaign, + { + "title": campaign.title, + "description": campaign.description, + }, + ) + + assert changed is False diff --git a/chzzk/tests/test_tasks.py b/chzzk/tests/test_tasks.py new file mode 100644 index 0000000..1111aa5 --- /dev/null +++ b/chzzk/tests/test_tasks.py @@ -0,0 +1,58 @@ +from unittest.mock import patch + +import pytest + +from chzzk.tasks import discover_chzzk_campaigns +from chzzk.tasks import import_chzzk_campaign_task + + +def test_import_chzzk_campaign_task_calls_command_with_campaign_no() -> None: + """It should invoke import_chzzk_campaign for the given campaign number.""" + with patch("chzzk.tasks.call_command") as call_command_mock: + import_chzzk_campaign_task.run(905) + + call_command_mock.assert_called_once_with("import_chzzk_campaign", "905") + + +def test_import_chzzk_campaign_task_retries_on_command_error() -> None: + """It should retry when import_chzzk_campaign raises an exception.""" + error = RuntimeError("boom") + + with ( + patch("chzzk.tasks.call_command", side_effect=error), + patch.object( + import_chzzk_campaign_task, + "retry", + side_effect=RuntimeError("retried"), + ) as retry_mock, + pytest.raises(RuntimeError, match="retried"), + ): + import_chzzk_campaign_task.run(905) + + retry_mock.assert_called_once_with(exc=error) + + +def test_discover_chzzk_campaigns_calls_command_with_latest_flag() -> None: + """It should invoke import_chzzk_campaign with latest=True.""" + with patch("chzzk.tasks.call_command") as call_command_mock: + discover_chzzk_campaigns.run() + + call_command_mock.assert_called_once_with("import_chzzk_campaign", latest=True) + + +def test_discover_chzzk_campaigns_retries_on_command_error() -> None: + """It should retry when import_chzzk_campaign raises an exception.""" + error = RuntimeError("boom") + + with ( + patch("chzzk.tasks.call_command", side_effect=error), + patch.object( + discover_chzzk_campaigns, + "retry", + side_effect=RuntimeError("retried"), + ) as retry_mock, + pytest.raises(RuntimeError, match="retried"), + ): + discover_chzzk_campaigns.run() + + retry_mock.assert_called_once_with(exc=error) diff --git a/config/settings.py b/config/settings.py index a084435..7fb2c0f 100644 --- a/config/settings.py +++ b/config/settings.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Any import sentry_sdk +from celery.schedules import crontab from dotenv import load_dotenv from platformdirs import user_data_dir @@ -109,6 +110,8 @@ STATICFILES_DIRS: list[Path] = [BASE_DIR / "static"] TIME_ZONE = "UTC" WSGI_APPLICATION = "config.wsgi.application" +TTVDROPS_PENDING_DIR: str = os.getenv("TTVDROPS_PENDING_DIR", "") + INTERNAL_IPS: list[str] = [] if DEBUG: INTERNAL_IPS = ["127.0.0.1", "localhost"] # pyright: ignore[reportConstantRedefinition] @@ -259,6 +262,63 @@ CELERY_BROKER_URL: str = REDIS_URL_CELERY CELERY_RESULT_BACKEND = "django-db" CELERY_RESULT_EXTENDED = True CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler" +CELERY_TASK_SOFT_TIME_LIMIT: int = 3600 # warn at 1 h +CELERY_TASK_TIME_LIMIT: int = 3900 # hard-kill at 1 h 5 min + +CELERY_TASK_ROUTES: dict[str, dict[str, str]] = { + "twitch.tasks.scan_pending_twitch_files": {"queue": "imports"}, + "twitch.tasks.import_twitch_file": {"queue": "imports"}, + "twitch.tasks.download_game_image": {"queue": "image-downloads"}, + "twitch.tasks.download_campaign_image": {"queue": "image-downloads"}, + "twitch.tasks.download_benefit_image": {"queue": "image-downloads"}, + "twitch.tasks.download_reward_campaign_image": {"queue": "image-downloads"}, + "twitch.tasks.download_all_images": {"queue": "image-downloads"}, + "twitch.tasks.import_chat_badges": {"queue": "api-fetches"}, + "twitch.tasks.backup_database": {"queue": "default"}, + "kick.tasks.import_kick_drops": {"queue": "api-fetches"}, + "chzzk.tasks.discover_chzzk_campaigns": {"queue": "api-fetches"}, + "chzzk.tasks.import_chzzk_campaign_task": {"queue": "imports"}, + "core.tasks.submit_indexnow_task": {"queue": "default"}, +} + +CELERY_BEAT_SCHEDULE: dict[str, Any] = { + # Scan for new Twitch JSON drops every 10 seconds. + "scan-pending-twitch-files": { + "task": "twitch.tasks.scan_pending_twitch_files", + "schedule": 10.0, + "options": {"queue": "imports"}, + }, + # Import Kick drops from the API (:01, :16, :31, :46 each hour). + "import-kick-drops": { + "task": "kick.tasks.import_kick_drops", + "schedule": crontab(minute="1,16,31,46"), + "options": {"queue": "api-fetches"}, + }, + # Backup database nightly at 02:15. + "backup-database": { + "task": "twitch.tasks.backup_database", + "schedule": crontab(hour=2, minute=15), + "options": {"queue": "default"}, + }, + # Discover new Chzzk campaigns every 2 hours at minute 0. + "discover-chzzk-campaigns": { + "task": "chzzk.tasks.discover_chzzk_campaigns", + "schedule": crontab(minute=0, hour="*/2"), + "options": {"queue": "api-fetches"}, + }, + # Weekly full image refresh (Sunday 04:00 UTC). + "download-all-images-weekly": { + "task": "twitch.tasks.download_all_images", + "schedule": crontab(hour=4, minute=0, day_of_week=0), + "options": {"queue": "image-downloads"}, + }, + # Weekly chat badge refresh (Sunday 03:00 UTC). + "import-chat-badges-weekly": { + "task": "twitch.tasks.import_chat_badges", + "schedule": crontab(hour=3, minute=0, day_of_week=0), + "options": {"queue": "api-fetches"}, + }, +} # Define BASE_URL for dynamic URL generation BASE_URL: str = "https://ttvdrops.lovinator.space" diff --git a/core/tasks.py b/core/tasks.py new file mode 100644 index 0000000..5b20599 --- /dev/null +++ b/core/tasks.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +import logging + +from celery import shared_task +from django.core.management import call_command + +logger = logging.getLogger("ttvdrops.tasks") + + +@shared_task(bind=True, queue="default", max_retries=3, default_retry_delay=300) +def submit_indexnow_task(self) -> None: # noqa: ANN001 + """Submit all site URLs to the IndexNow search index.""" + try: + call_command("submit_indexnow") + except Exception as exc: + raise self.retry(exc=exc) from exc diff --git a/kick/tasks.py b/kick/tasks.py new file mode 100644 index 0000000..9c50a9e --- /dev/null +++ b/kick/tasks.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +import logging + +from celery import shared_task +from django.core.management import call_command + +logger = logging.getLogger("ttvdrops.tasks") + + +@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120) +def import_kick_drops(self) -> None: # noqa: ANN001 + """Fetch and upsert Kick drop campaigns from the public API.""" + try: + call_command("import_kick_drops") + except Exception as exc: + raise self.retry(exc=exc) from exc diff --git a/pyproject.toml b/pyproject.toml index d681dc7..836228a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ dev = [ [tool.pytest.ini_options] DJANGO_SETTINGS_MODULE = "config.settings" python_files = ["test_*.py", "*_test.py"] -addopts = "" +addopts = "--tb=short -n auto --cov" filterwarnings = [ "ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning", ] diff --git a/tools/systemd/ttvdrops-backup.service b/tools/systemd/ttvdrops-backup.service deleted file mode 100644 index c4f73c8..0000000 --- a/tools/systemd/ttvdrops-backup.service +++ /dev/null @@ -1,10 +0,0 @@ -[Unit] -Description=TTVDrops database backup - -[Service] -Type=oneshot -User=ttvdrops -Group=ttvdrops -WorkingDirectory=/home/ttvdrops/ttvdrops -EnvironmentFile=/home/ttvdrops/ttvdrops/.env -ExecStart=/usr/bin/uv run python manage.py backup_db diff --git a/tools/systemd/ttvdrops-backup.timer b/tools/systemd/ttvdrops-backup.timer deleted file mode 100644 index 1546d5e..0000000 --- a/tools/systemd/ttvdrops-backup.timer +++ /dev/null @@ -1,9 +0,0 @@ -[Unit] -Description=Nightly TTVDrops database backup - -[Timer] -OnCalendar=*-*-* 02:15:00 -Persistent=true - -[Install] -WantedBy=timers.target diff --git a/tools/systemd/ttvdrops-celery-beat.service b/tools/systemd/ttvdrops-celery-beat.service new file mode 100644 index 0000000..c09db14 --- /dev/null +++ b/tools/systemd/ttvdrops-celery-beat.service @@ -0,0 +1,27 @@ +[Unit] +Description=TTVDrops Celery Beat scheduler +After=network-online.target valkey.service ttvdrops-celery-worker.service +Wants=network-online.target valkey.service + +[Service] +Type=simple +User=ttvdrops +Group=ttvdrops +SupplementaryGroups=http +UMask=0002 +WorkingDirectory=/home/ttvdrops/ttvdrops +EnvironmentFile=/home/ttvdrops/ttvdrops/.env +ExecStart=/usr/bin/uv run celery -A config beat \ + --scheduler django_celery_beat.schedulers:DatabaseScheduler \ + --loglevel INFO +ExecStop=/bin/kill -s TERM $MAINPID + +StandardOutput=journal +StandardError=journal +SyslogIdentifier=ttvdrops-celery-beat + +Restart=on-failure +RestartSec=10s + +[Install] +WantedBy=multi-user.target diff --git a/tools/systemd/ttvdrops-celery-worker.service b/tools/systemd/ttvdrops-celery-worker.service new file mode 100644 index 0000000..2095273 --- /dev/null +++ b/tools/systemd/ttvdrops-celery-worker.service @@ -0,0 +1,31 @@ +[Unit] +Description=TTVDrops Celery worker +After=network-online.target valkey.service +Wants=network-online.target valkey.service + +[Service] +Type=simple +User=ttvdrops +Group=ttvdrops +SupplementaryGroups=http +UMask=0002 +WorkingDirectory=/home/ttvdrops/ttvdrops +EnvironmentFile=/home/ttvdrops/ttvdrops/.env +ExecStart=/usr/bin/uv run celery -A config worker \ + --queues imports,api-fetches,default \ + --concurrency 4 \ + --loglevel INFO +ExecStop=/bin/kill -s TERM $MAINPID +ExecReload=/bin/kill -s HUP $MAINPID + +MemoryLimit=512M +CPUQuota=75% +StandardOutput=journal +StandardError=journal +SyslogIdentifier=ttvdrops-celery-worker + +Restart=on-failure +RestartSec=10s + +[Install] +WantedBy=multi-user.target diff --git a/tools/systemd/ttvdrops-import-drops.service b/tools/systemd/ttvdrops-import-drops.service deleted file mode 100644 index d4e204b..0000000 --- a/tools/systemd/ttvdrops-import-drops.service +++ /dev/null @@ -1,34 +0,0 @@ -[Unit] -Description=TTVDrops watch and import drops from pending directory -After=network-online.target -Wants=network-online.target - -[Service] -Type=simple -User=ttvdrops -Group=ttvdrops -SupplementaryGroups=http -UMask=0002 -WorkingDirectory=/home/ttvdrops/ttvdrops -EnvironmentFile=/home/ttvdrops/ttvdrops/.env -ExecStart=/usr/bin/uv run python manage.py watch_imports /mnt/fourteen/Data/Responses/pending --verbose - -# Restart policy -Restart=on-failure -RestartSec=5s - -# Process management -KillMode=mixed -KillSignal=SIGTERM - -# Resource limits -MemoryLimit=512M -CPUQuota=50% - -# Logging -StandardOutput=journal -StandardError=journal -SyslogIdentifier=ttvdrops-watch - -[Install] -WantedBy=multi-user.target diff --git a/tools/systemd/ttvdrops-import-kick-drops.service b/tools/systemd/ttvdrops-import-kick-drops.service deleted file mode 100644 index 2932aab..0000000 --- a/tools/systemd/ttvdrops-import-kick-drops.service +++ /dev/null @@ -1,26 +0,0 @@ -[Unit] -Description=TTVDrops import Kick drops -After=network-online.target -Wants=network-online.target - -[Service] -Type=oneshot -User=ttvdrops -Group=ttvdrops -SupplementaryGroups=http -UMask=0002 -WorkingDirectory=/home/ttvdrops/ttvdrops -EnvironmentFile=/home/ttvdrops/ttvdrops/.env -ExecStart=/usr/bin/uv run python manage.py import_kick_drops - -# Logging -StandardOutput=journal -StandardError=journal -SyslogIdentifier=ttvdrops-import-kick - -# Resource limits -MemoryLimit=512M -CPUQuota=50% - -[Install] -WantedBy=multi-user.target diff --git a/tools/systemd/ttvdrops-import-kick-drops.timer b/tools/systemd/ttvdrops-import-kick-drops.timer deleted file mode 100644 index af98bce..0000000 --- a/tools/systemd/ttvdrops-import-kick-drops.timer +++ /dev/null @@ -1,9 +0,0 @@ -[Unit] -Description=TTVDrops import Kick drops at :01, :16, :31, and :46 - -[Timer] -OnCalendar=*-*-* *:01,16,31,46:00 -Persistent=true - -[Install] -WantedBy=timers.target diff --git a/twitch/apps.py b/twitch/apps.py index 8f557e4..07e8fe2 100644 --- a/twitch/apps.py +++ b/twitch/apps.py @@ -36,3 +36,21 @@ class TwitchConfig(AppConfig): FieldFile.open = _safe_open except (AttributeError, TypeError) as exc: logger.debug("Failed to patch FieldFile.open: %s", exc) + + # Register post_save signal handlers that dispatch image download tasks + # when new Twitch records are created. + from django.db.models.signals import post_save # noqa: PLC0415 + + from twitch.models import DropBenefit # noqa: PLC0415 + from twitch.models import DropCampaign # noqa: PLC0415 + from twitch.models import Game # noqa: PLC0415 + from twitch.models import RewardCampaign # noqa: PLC0415 + from twitch.signals import on_drop_benefit_saved # noqa: PLC0415 + from twitch.signals import on_drop_campaign_saved # noqa: PLC0415 + from twitch.signals import on_game_saved # noqa: PLC0415 + from twitch.signals import on_reward_campaign_saved # noqa: PLC0415 + + post_save.connect(on_game_saved, sender=Game) + post_save.connect(on_drop_campaign_saved, sender=DropCampaign) + post_save.connect(on_drop_benefit_saved, sender=DropBenefit) + post_save.connect(on_reward_campaign_saved, sender=RewardCampaign) diff --git a/twitch/management/commands/cleanup_orphaned_channels.py b/twitch/management/commands/cleanup_orphaned_channels.py index a99aa90..5d13863 100644 --- a/twitch/management/commands/cleanup_orphaned_channels.py +++ b/twitch/management/commands/cleanup_orphaned_channels.py @@ -8,7 +8,8 @@ from twitch.models import Channel if TYPE_CHECKING: from argparse import ArgumentParser - from debug_toolbar.panels.templates.panel import QuerySet + from django.db.models import QuerySet + SAMPLE_PREVIEW_COUNT = 10 diff --git a/twitch/management/commands/convert_images_to_modern_formats.py b/twitch/management/commands/convert_images_to_modern_formats.py index 3b516f2..d5bced5 100644 --- a/twitch/management/commands/convert_images_to_modern_formats.py +++ b/twitch/management/commands/convert_images_to_modern_formats.py @@ -193,8 +193,8 @@ class Command(BaseCommand): img.mode == "P" and "transparency" in img.info ): # Create white background for transparency - background = Image.new("RGB", img.size, (255, 255, 255)) - rgba_img = img.convert("RGBA") if img.mode == "P" else img + background: Image.Image = Image.new("RGB", img.size, (255, 255, 255)) + rgba_img: Image.Image = img.convert("RGBA") if img.mode == "P" else img background.paste( rgba_img, mask=rgba_img.split()[-1] if rgba_img.mode in {"RGBA", "LA"} else None, diff --git a/twitch/management/commands/watch_imports.py b/twitch/management/commands/watch_imports.py index a95bb20..d1f6ca3 100644 --- a/twitch/management/commands/watch_imports.py +++ b/twitch/management/commands/watch_imports.py @@ -17,9 +17,16 @@ logger: logging.Logger = logging.getLogger("ttvdrops.watch_imports") class Command(BaseCommand): - """Watch for JSON files in a directory and import them automatically.""" + """Watch for JSON files in a directory and import them automatically. - help = "Watch a directory for JSON files and import them automatically" + .. deprecated:: + This command is superseded by the Celery Beat task + ``twitch.tasks.scan_pending_twitch_files`` (runs every 10 s via + ``ttvdrops-celery-beat.service``). Keep this command for ad-hoc use + or in environments that run without a Celery worker. + """ + + help = "Watch a directory for JSON files and import them automatically (superseded by Celery Beat)" requires_migrations_checks = True def add_arguments(self, parser: CommandParser) -> None: diff --git a/twitch/signals.py b/twitch/signals.py new file mode 100644 index 0000000..f8f66af --- /dev/null +++ b/twitch/signals.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import logging +from typing import Any + +logger = logging.getLogger("ttvdrops.signals") + + +def _dispatch(task_fn: Any, pk: int) -> None: # noqa: ANN401 + """Dispatch a Celery task, logging rather than raising when the broker is unavailable.""" + try: + task_fn.delay(pk) + except Exception: # noqa: BLE001 + logger.debug( + "Could not dispatch %s(%d) — broker may be unavailable.", + task_fn.name, + pk, + ) + + +def on_game_saved(sender: Any, instance: Any, created: bool, **kwargs: Any) -> None: # noqa: ANN401, FBT001 + """Dispatch a box-art download task when a new Game is created.""" + if created: + from twitch.tasks import download_game_image # noqa: PLC0415 + + _dispatch(download_game_image, instance.pk) + + +def on_drop_campaign_saved( + sender: Any, # noqa: ANN401 + instance: Any, # noqa: ANN401 + created: bool, # noqa: FBT001 + **kwargs: Any, # noqa: ANN401 +) -> None: + """Dispatch an image download task when a new DropCampaign is created.""" + if created: + from twitch.tasks import download_campaign_image # noqa: PLC0415 + + _dispatch(download_campaign_image, instance.pk) + + +def on_drop_benefit_saved( + sender: Any, # noqa: ANN401 + instance: Any, # noqa: ANN401 + created: bool, # noqa: FBT001 + **kwargs: Any, # noqa: ANN401 +) -> None: + """Dispatch an image download task when a new DropBenefit is created.""" + if created: + from twitch.tasks import download_benefit_image # noqa: PLC0415 + + _dispatch(download_benefit_image, instance.pk) + + +def on_reward_campaign_saved( + sender: Any, # noqa: ANN401 + instance: Any, # noqa: ANN401 + created: bool, # noqa: FBT001 + **kwargs: Any, # noqa: ANN401 +) -> None: + """Dispatch an image download task when a new RewardCampaign is created.""" + if created: + from twitch.tasks import download_reward_campaign_image # noqa: PLC0415 + + _dispatch(download_reward_campaign_image, instance.pk) diff --git a/twitch/tasks.py b/twitch/tasks.py new file mode 100644 index 0000000..32e5100 --- /dev/null +++ b/twitch/tasks.py @@ -0,0 +1,257 @@ +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +import httpx +from celery import shared_task +from django.core.files.base import ContentFile +from django.core.management import call_command +from PIL.Image import Image + +if TYPE_CHECKING: + from urllib.parse import ParseResult + + from django.db.models.fields.files import ImageFieldFile + from PIL.Image import Image + from PIL.ImageFile import ImageFile + +logger: logging.Logger = logging.getLogger("ttvdrops.tasks") + + +@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60) +def scan_pending_twitch_files(self) -> None: # noqa: ANN001 + """Scan TTVDROPS_PENDING_DIR for JSON files and dispatch an import task for each.""" + pending_dir: str = os.getenv("TTVDROPS_PENDING_DIR", "") + if not pending_dir: + logger.debug("TTVDROPS_PENDING_DIR not configured; skipping scan.") + return + + path = Path(pending_dir) + if not path.is_dir(): + logger.warning("TTVDROPS_PENDING_DIR %r is not a directory.", pending_dir) + return + + json_files: list[Path] = [ + f for f in path.iterdir() if f.suffix == ".json" and f.is_file() + ] + for json_file in json_files: + import_twitch_file.delay(str(json_file)) + + if json_files: + logger.info("Dispatched %d Twitch file import task(s).", len(json_files)) + + +@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60) +def import_twitch_file(self, file_path: str) -> None: # noqa: ANN001 + """Import a single Twitch JSON drop file via BetterImportDrops logic.""" + from twitch.management.commands.better_import_drops import Command as Importer # noqa: I001, PLC0415 + + path = Path(file_path) + if not path.is_file(): + # Already moved to imported/ or broken/ by a prior run. + logger.debug("File %s no longer exists; skipping.", file_path) + return + + try: + Importer().handle(path=path) + except Exception as exc: + logger.exception("Failed to import %s.", file_path) + raise self.retry(exc=exc) from exc + + +def _download_and_save(url: str, name: str, file_field: ImageFieldFile) -> bool: + """Download url and save the content to file_field (Django ImageField). + + Files that are already cached (non-empty .name) are skipped. + + Returns: + True when the image was saved, False when skipped or on error. + """ + if not url or file_field is None: + return False + + if getattr(file_field, "name", None): + return False # already cached + + parsed: ParseResult = urlparse(url) + suffix: str = Path(parsed.path).suffix or ".jpg" + file_name: str = f"{name}{suffix}" + + try: + with httpx.Client(timeout=20, follow_redirects=True) as client: + response = client.get(url) + response.raise_for_status() + except httpx.HTTPError: + logger.warning("HTTP error downloading image for %r.", name) + return False + + file_field.save(file_name, ContentFile(response.content), save=True) # type: ignore[union-attr] + + image_path: str | None = getattr(file_field, "path", None) + if image_path: + _convert_to_modern_formats(Path(image_path)) + + return True + + +def _convert_to_modern_formats(source: Path) -> None: + """Convert *source* image to WebP and AVIF alongside the original.""" + if not source.exists() or source.suffix.lower() not in {".jpg", ".jpeg", ".png"}: + return + + try: + from PIL import Image # noqa: PLC0415 + except ImportError: + return + + try: + with Image.open(source) as raw: + if raw.mode in {"RGBA", "LA"} or ( + raw.mode == "P" and "transparency" in raw.info + ): + background: Image = Image.new("RGB", raw.size, (255, 255, 255)) + rgba: Image | ImageFile = ( + raw.convert("RGBA") if raw.mode == "P" else raw + ) + mask: Image | None = ( + rgba.split()[-1] if rgba.mode in {"RGBA", "LA"} else None + ) + background.paste(rgba, mask=mask) + img: Image = background + elif raw.mode != "RGB": + img = raw.convert("RGB") + else: + img: Image = raw.copy() + + for fmt, ext in (("WEBP", ".webp"), ("AVIF", ".avif")): + out: Path = source.with_suffix(ext) + try: + img.save(out, fmt, quality=80) + except Exception: # noqa: BLE001 + logger.debug("Could not convert %s to %s.", source, fmt) + except Exception: # noqa: BLE001 + logger.debug("Format conversion failed for %s.", source) + + +# --------------------------------------------------------------------------- +# Per-model image tasks — triggered by post_save signals on new records +# --------------------------------------------------------------------------- + + +@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300) +def download_game_image(self, game_pk: int) -> None: # noqa: ANN001 + """Download and cache the box art image for a single Game.""" + from twitch.models import Game # noqa: PLC0415 + from twitch.utils import is_twitch_box_art_url # noqa: PLC0415 + from twitch.utils import normalize_twitch_box_art_url # noqa: PLC0415 + + try: + game = Game.objects.get(pk=game_pk) + except Game.DoesNotExist: + return + + if not game.box_art or not is_twitch_box_art_url(game.box_art): + return + + url = normalize_twitch_box_art_url(game.box_art) + try: + _download_and_save(url, game.twitch_id, game.box_art_file) + except Exception as exc: + raise self.retry(exc=exc) from exc + + +@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300) +def download_campaign_image(self, campaign_pk: int) -> None: # noqa: ANN001 + """Download and cache the image for a single DropCampaign.""" + from twitch.models import DropCampaign # noqa: PLC0415 + + try: + campaign = DropCampaign.objects.get(pk=campaign_pk) + except DropCampaign.DoesNotExist: + return + + if not campaign.image_url: + return + + try: + _download_and_save(campaign.image_url, campaign.twitch_id, campaign.image_file) + except Exception as exc: + raise self.retry(exc=exc) from exc + + +@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300) +def download_benefit_image(self, benefit_pk: int) -> None: # noqa: ANN001 + """Download and cache the image for a single DropBenefit.""" + from twitch.models import DropBenefit # noqa: PLC0415 + + try: + benefit = DropBenefit.objects.get(pk=benefit_pk) + except DropBenefit.DoesNotExist: + return + + if not benefit.image_asset_url: + return + + try: + _download_and_save( + url=benefit.image_asset_url, + name=benefit.twitch_id, + file_field=benefit.image_file, + ) + except Exception as exc: + raise self.retry(exc=exc) from exc + + +@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300) +def download_reward_campaign_image(self, reward_pk: int) -> None: # noqa: ANN001 + """Download and cache the image for a single RewardCampaign.""" + from twitch.models import RewardCampaign # noqa: PLC0415 + + try: + reward = RewardCampaign.objects.get(pk=reward_pk) + except RewardCampaign.DoesNotExist: + return + + if not reward.image_url: + return + + try: + _download_and_save(reward.image_url, reward.twitch_id, reward.image_file) + except Exception as exc: + raise self.retry(exc=exc) from exc + + +@shared_task(queue="image-downloads") +def download_all_images() -> None: + """Weekly full-refresh: download images for all Twitch models.""" + call_command("download_box_art") + call_command("download_campaign_images", model="all") + + +# --------------------------------------------------------------------------- +# Twitch API tasks +# --------------------------------------------------------------------------- + + +@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120) +def import_chat_badges(self) -> None: # noqa: ANN001 + """Fetch and upsert Twitch global chat badges via the Helix API.""" + try: + call_command("import_chat_badges") + except Exception as exc: + raise self.retry(exc=exc) from exc + + +# --------------------------------------------------------------------------- +# Maintenance +# --------------------------------------------------------------------------- + + +@shared_task(queue="default") +def backup_database() -> None: + """Create a zstd-compressed SQL backup of the dataset tables.""" + call_command("backup_db") diff --git a/twitch/tests/test_better_import_drops.py b/twitch/tests/test_better_import_drops.py index 92effc5..64fda50 100644 --- a/twitch/tests/test_better_import_drops.py +++ b/twitch/tests/test_better_import_drops.py @@ -1,13 +1,22 @@ import json from pathlib import Path +from tempfile import TemporaryDirectory from typing import TYPE_CHECKING from unittest import skipIf +from unittest.mock import patch from django.db import connection from django.test import TestCase from twitch.management.commands.better_import_drops import Command from twitch.management.commands.better_import_drops import detect_error_only_response +from twitch.management.commands.better_import_drops import detect_non_campaign_keyword +from twitch.management.commands.better_import_drops import ( + extract_operation_name_from_parsed, +) +from twitch.management.commands.better_import_drops import move_completed_file +from twitch.management.commands.better_import_drops import move_file_to_broken_subdir +from twitch.management.commands.better_import_drops import repair_partially_broken_json from twitch.models import DropBenefit from twitch.models import DropCampaign from twitch.models import Game @@ -426,6 +435,92 @@ class CampaignStructureDetectionTests(TestCase): structure: str | None = command._detect_campaign_structure(response) assert structure == "current_user_drop_campaigns" + def test_detects_user_drop_campaign_structure(self) -> None: + """Ensure user.dropCampaign structure is correctly detected.""" + command = Command() + + response: dict[str, object] = { + "data": { + "user": { + "id": "123", + "dropCampaign": {"id": "c1", "name": "Test Campaign"}, + "__typename": "User", + }, + }, + } + + structure: str | None = command._detect_campaign_structure(response) + assert structure == "user_drop_campaign" + + def test_detects_channel_viewer_campaigns_structure(self) -> None: + """Ensure channel.viewerDropCampaigns structure is correctly detected.""" + command = Command() + + response: dict[str, object] = { + "data": { + "channel": { + "id": "123", + "viewerDropCampaigns": [{"id": "c1", "name": "Test Campaign"}], + "__typename": "Channel", + }, + }, + } + + structure: str | None = command._detect_campaign_structure(response) + assert structure == "channel_viewer_campaigns" + + +class FileMoveUtilityTests(TestCase): + """Tests for imported/broken file move utility helpers.""" + + def test_move_completed_file_sanitizes_operation_directory_name(self) -> None: + """Ensure operation names are sanitized and campaign structure subdir is respected.""" + with TemporaryDirectory() as tmp_dir: + root_path = Path(tmp_dir) + imported_root = root_path / "imported" + source_file = root_path / "payload.json" + source_file.write_text("{}", encoding="utf-8") + + with patch( + "twitch.management.commands.better_import_drops.get_imported_directory_root", + return_value=imported_root, + ): + target_dir = move_completed_file( + file_path=source_file, + operation_name="My Op/Name\\v1", + campaign_structure="inventory_campaigns", + ) + + expected_dir = imported_root / "My_Op_Name_v1" / "inventory_campaigns" + assert target_dir == expected_dir + assert not source_file.exists() + assert (expected_dir / "payload.json").exists() + + def test_move_file_to_broken_subdir_avoids_duplicate_operation_segment( + self, + ) -> None: + """Ensure matching reason and operation names do not create duplicate directories.""" + with TemporaryDirectory() as tmp_dir: + root_path = Path(tmp_dir) + broken_root = root_path / "broken" + source_file = root_path / "broken_payload.json" + source_file.write_text("{}", encoding="utf-8") + + with patch( + "twitch.management.commands.better_import_drops.get_broken_directory_root", + return_value=broken_root, + ): + broken_dir = move_file_to_broken_subdir( + file_path=source_file, + subdir="validation_failed", + operation_name="validation_failed", + ) + + path_segments = broken_dir.as_posix().split("/") + assert path_segments.count("validation_failed") == 1 + assert not source_file.exists() + assert (broken_dir / "broken_payload.json").exists() + class OperationNameFilteringTests(TestCase): """Tests for filtering campaigns by operation_name field.""" @@ -864,3 +959,179 @@ class ErrorOnlyResponseDetectionTests(TestCase): result = detect_error_only_response(parsed_json) assert result == "error_only: unknown error" + + +class NonCampaignKeywordDetectionTests(TestCase): + """Tests for non-campaign operation keyword detection.""" + + def test_detects_known_non_campaign_operation(self) -> None: + """Ensure known operationName values are detected as non-campaign payloads.""" + raw_text = json.dumps({"extensions": {"operationName": "PlaybackAccessToken"}}) + + result = detect_non_campaign_keyword(raw_text) + assert result == "PlaybackAccessToken" + + def test_returns_none_for_unknown_operation(self) -> None: + """Ensure unrelated operation names are not flagged.""" + raw_text = json.dumps({"extensions": {"operationName": "DropCampaignDetails"}}) + + result = detect_non_campaign_keyword(raw_text) + assert result is None + + +class OperationNameExtractionTests(TestCase): + """Tests for operation name extraction across supported payload shapes.""" + + def test_extracts_operation_name_from_json_repair_tuple(self) -> None: + """Ensure extraction supports tuple payloads returned by json_repair.""" + payload = ( + {"extensions": {"operationName": "ViewerDropsDashboard"}}, + [{"json_repair": "log"}], + ) + + result = extract_operation_name_from_parsed(payload) + assert result == "ViewerDropsDashboard" + + def test_extracts_operation_name_from_list_payload(self) -> None: + """Ensure extraction inspects the first response in list payloads.""" + payload = [ + {"extensions": {"operationName": "Inventory"}}, + {"extensions": {"operationName": "IgnoredSecondItem"}}, + ] + + result = extract_operation_name_from_parsed(payload) + assert result == "Inventory" + + def test_returns_none_for_empty_list_payload(self) -> None: + """Ensure extraction returns None for empty list payloads.""" + result = extract_operation_name_from_parsed([]) + assert result is None + + +class JsonRepairTests(TestCase): + """Tests for partial JSON repair fallback behavior.""" + + def test_repair_filters_non_graphql_items_from_list(self) -> None: + """Ensure repaired list output only keeps GraphQL-like response objects.""" + raw_text = '[{"foo": 1}, {"data": {"currentUser": {"id": "1"}}}]' + + repaired = repair_partially_broken_json(raw_text) + parsed = json.loads(repaired) + + assert parsed == [{"data": {"currentUser": {"id": "1"}}}] + + +class ProcessFileWorkerTests(TestCase): + """Tests for process_file_worker early-return behaviors.""" + + def test_returns_reason_when_drop_campaign_key_missing(self) -> None: + """Ensure files without dropCampaign are marked failed with clear reason.""" + command = Command() + + repo_root: Path = Path(__file__).resolve().parents[2] + temp_path: Path = repo_root / "twitch" / "tests" / "tmp_no_drop_campaign.json" + temp_path.write_text( + json.dumps({"data": {"currentUser": {"id": "123"}}}), + encoding="utf-8", + ) + + try: + result = command.process_file_worker( + file_path=temp_path, + options={"crash_on_error": False, "skip_broken_moves": True}, + ) + finally: + if temp_path.exists(): + temp_path.unlink() + + assert result["success"] is False + assert result["broken_dir"] == "(skipped)" + assert result["reason"] == "no dropCampaign present" + + def test_returns_reason_for_error_only_response(self) -> None: + """Ensure error-only responses are marked failed with extracted reason.""" + command = Command() + + repo_root: Path = Path(__file__).resolve().parents[2] + temp_path: Path = repo_root / "twitch" / "tests" / "tmp_error_only.json" + temp_path.write_text( + json.dumps( + { + "errors": [{"message": "service timeout"}], + "data": None, + }, + ), + encoding="utf-8", + ) + + try: + result = command.process_file_worker( + file_path=temp_path, + options={"crash_on_error": False, "skip_broken_moves": True}, + ) + finally: + if temp_path.exists(): + temp_path.unlink() + + assert result["success"] is False + assert result["broken_dir"] == "(skipped)" + assert result["reason"] == "error_only: service timeout" + + def test_returns_reason_for_known_non_campaign_keyword(self) -> None: + """Ensure known non-campaign operation payloads are rejected with reason.""" + command = Command() + + repo_root: Path = Path(__file__).resolve().parents[2] + temp_path: Path = ( + repo_root / "twitch" / "tests" / "tmp_non_campaign_keyword.json" + ) + temp_path.write_text( + json.dumps( + { + "data": {"currentUser": {"id": "123"}}, + "extensions": {"operationName": "PlaybackAccessToken"}, + }, + ), + encoding="utf-8", + ) + + try: + result = command.process_file_worker( + file_path=temp_path, + options={"crash_on_error": False, "skip_broken_moves": True}, + ) + finally: + if temp_path.exists(): + temp_path.unlink() + + assert result["success"] is False + assert result["broken_dir"] == "(skipped)" + assert result["reason"] == "matched 'PlaybackAccessToken'" + + +class NormalizeResponsesTests(TestCase): + """Tests for response normalization across supported payload formats.""" + + def test_normalizes_batched_responses_wrapper(self) -> None: + """Ensure batched payloads under responses key are unwrapped and filtered.""" + command = Command() + + parsed_json: dict[str, object] = { + "responses": [ + {"data": {"currentUser": {"id": "1"}}}, + "invalid-item", + {"extensions": {"operationName": "Inventory"}}, + ], + } + + normalized = command._normalize_responses(parsed_json) + assert len(normalized) == 2 + assert normalized[0]["data"]["currentUser"]["id"] == "1" + assert normalized[1]["extensions"]["operationName"] == "Inventory" + + def test_returns_empty_list_for_empty_tuple_payload(self) -> None: + """Ensure empty tuple payloads from json_repair produce no responses.""" + command = Command() + + normalized = command._normalize_responses(()) # type: ignore[arg-type] + assert normalized == [] diff --git a/twitch/tests/test_import_chat_badges.py b/twitch/tests/test_import_chat_badges.py new file mode 100644 index 0000000..bea6e90 --- /dev/null +++ b/twitch/tests/test_import_chat_badges.py @@ -0,0 +1,150 @@ +from io import StringIO +from unittest.mock import patch + +import httpx +import pytest +from django.core.management import CommandError +from django.core.management import call_command + +from twitch.models import ChatBadge +from twitch.models import ChatBadgeSet +from twitch.schemas import GlobalChatBadgesResponse + +pytestmark: pytest.MarkDecorator = pytest.mark.django_db + + +def _build_response(title: str = "VIP") -> GlobalChatBadgesResponse: + """Build a valid GlobalChatBadgesResponse payload for tests. + + Returns: + A validated Twitch global chat badges response object. + """ + return GlobalChatBadgesResponse.model_validate({ + "data": [ + { + "set_id": "vip", + "versions": [ + { + "id": "1", + "image_url_1x": "https://example.com/vip-1x.png", + "image_url_2x": "https://example.com/vip-2x.png", + "image_url_4x": "https://example.com/vip-4x.png", + "title": title, + "description": "VIP Badge", + "click_action": "visit_url", + "click_url": "https://help.twitch.tv", + }, + ], + }, + ], + }) + + +def test_raises_when_client_id_missing(monkeypatch: pytest.MonkeyPatch) -> None: + """Command should fail when client ID is not provided.""" + monkeypatch.delenv("TWITCH_CLIENT_ID", raising=False) + monkeypatch.delenv("TWITCH_CLIENT_SECRET", raising=False) + monkeypatch.delenv("TWITCH_ACCESS_TOKEN", raising=False) + + with pytest.raises(CommandError, match="Twitch Client ID is required"): + call_command("import_chat_badges", stdout=StringIO()) + + +def test_raises_when_token_and_secret_missing(monkeypatch: pytest.MonkeyPatch) -> None: + """Command should fail when no token and no client secret are available.""" + monkeypatch.delenv("TWITCH_CLIENT_SECRET", raising=False) + monkeypatch.delenv("TWITCH_ACCESS_TOKEN", raising=False) + + with pytest.raises(CommandError, match="Either --access-token or --client-secret"): + call_command( + "import_chat_badges", + "--client-id", + "client-id", + stdout=StringIO(), + ) + + +@pytest.mark.django_db +def test_import_creates_then_updates_existing_badge() -> None: + """Running import twice should update existing badges rather than duplicate them.""" + with patch( + "twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges", + side_effect=[_build_response("VIP"), _build_response("VIP Updated")], + ) as fetch_mock: + call_command( + "import_chat_badges", + "--client-id", + "client-id", + "--access-token", + "access-token", + stdout=StringIO(), + ) + call_command( + "import_chat_badges", + "--client-id", + "client-id", + "--access-token", + "access-token", + stdout=StringIO(), + ) + + assert fetch_mock.call_count == 2 + assert ChatBadgeSet.objects.count() == 1 + assert ChatBadge.objects.count() == 1 + + badge_set = ChatBadgeSet.objects.get(set_id="vip") + badge = ChatBadge.objects.get(badge_set=badge_set, badge_id="1") + assert badge.title == "VIP Updated" + assert badge.click_action == "visit_url" + assert badge.click_url == "https://help.twitch.tv" + + +@pytest.mark.django_db +def test_uses_client_credentials_when_access_token_missing() -> None: + """Command should obtain token from Twitch when no access token is provided.""" + with ( + patch( + "twitch.management.commands.import_chat_badges.Command._get_app_access_token", + return_value="generated-token", + ) as token_mock, + patch( + "twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges", + return_value=GlobalChatBadgesResponse.model_validate({"data": []}), + ) as fetch_mock, + ): + call_command( + "import_chat_badges", + "--client-id", + "client-id", + "--client-secret", + "client-secret", + stdout=StringIO(), + ) + + token_mock.assert_called_once_with("client-id", "client-secret") + fetch_mock.assert_called_once_with( + client_id="client-id", + access_token="generated-token", + ) + + +def test_wraps_http_errors_from_badges_fetch() -> None: + """Command should convert HTTP client errors to CommandError.""" + with ( + patch( + "twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges", + side_effect=httpx.HTTPError("boom"), + ), + pytest.raises( + CommandError, + match="Failed to fetch chat badges from Twitch API", + ), + ): + call_command( + "import_chat_badges", + "--client-id", + "client-id", + "--access-token", + "access-token", + stdout=StringIO(), + ) diff --git a/twitch/tests/test_tasks.py b/twitch/tests/test_tasks.py new file mode 100644 index 0000000..191b352 --- /dev/null +++ b/twitch/tests/test_tasks.py @@ -0,0 +1,575 @@ +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock +from unittest.mock import call +from unittest.mock import patch + +import httpx +import pytest + +from twitch.models import DropBenefit +from twitch.models import DropCampaign +from twitch.models import RewardCampaign +from twitch.tasks import _convert_to_modern_formats +from twitch.tasks import _download_and_save +from twitch.tasks import backup_database +from twitch.tasks import download_all_images +from twitch.tasks import download_benefit_image +from twitch.tasks import download_campaign_image +from twitch.tasks import download_game_image +from twitch.tasks import download_reward_campaign_image +from twitch.tasks import import_chat_badges +from twitch.tasks import import_twitch_file +from twitch.tasks import scan_pending_twitch_files + + +def test_scan_pending_twitch_files_dispatches_json_files( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + """It should dispatch an import task for each JSON file in the pending directory.""" + first = tmp_path / "one.json" + second = tmp_path / "two.json" + ignored = tmp_path / "notes.txt" + first.write_text("{}", encoding="utf-8") + second.write_text("{}", encoding="utf-8") + ignored.write_text("ignored", encoding="utf-8") + monkeypatch.setenv("TTVDROPS_PENDING_DIR", str(tmp_path)) + + with patch("twitch.tasks.import_twitch_file.delay") as delay_mock: + scan_pending_twitch_files.run() + + delay_mock.assert_has_calls([call(str(first)), call(str(second))], any_order=True) + assert delay_mock.call_count == 2 + + +def test_scan_pending_twitch_files_skips_when_env_missing( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """It should do nothing when TTVDROPS_PENDING_DIR is not configured.""" + monkeypatch.delenv("TTVDROPS_PENDING_DIR", raising=False) + + with patch("twitch.tasks.import_twitch_file.delay") as delay_mock: + scan_pending_twitch_files.run() + + delay_mock.assert_not_called() + + +def test_import_twitch_file_calls_importer_for_existing_file(tmp_path: Path) -> None: + """It should run BetterImportDrops with the provided path when the file exists.""" + source = tmp_path / "drops.json" + source.write_text("{}", encoding="utf-8") + + with patch( + "twitch.management.commands.better_import_drops.Command.handle", + ) as handle_mock: + import_twitch_file.run(str(source)) + + assert handle_mock.call_count == 1 + assert handle_mock.call_args.kwargs["path"] == source + + +def test_import_twitch_file_retries_on_import_error(tmp_path: Path) -> None: + """It should retry when the importer raises an exception.""" + source = tmp_path / "drops.json" + source.write_text("{}", encoding="utf-8") + error = RuntimeError("boom") + + with ( + patch( + "twitch.management.commands.better_import_drops.Command.handle", + side_effect=error, + ), + patch.object( + import_twitch_file, + "retry", + side_effect=RuntimeError("retried"), + ) as retry_mock, + pytest.raises(RuntimeError, match="retried"), + ): + import_twitch_file.run(str(source)) + + retry_mock.assert_called_once_with(exc=error) + + +def test_download_and_save_skips_when_image_already_cached() -> None: + """It should not download when the target ImageField already has a file name.""" + file_field = MagicMock() + file_field.name = "already-there.jpg" + + with patch("twitch.tasks.httpx.Client") as client_mock: + result = _download_and_save( + url="https://example.com/test.jpg", + name="game-1", + file_field=file_field, + ) + + assert result is False + client_mock.assert_not_called() + + +def test_download_and_save_saves_downloaded_content() -> None: + """It should save downloaded bytes and trigger modern format conversion.""" + file_field = MagicMock() + file_field.name = "" + file_field.path = "C:/cache/game-1.png" + + response = MagicMock() + response.content = b"img-bytes" + response.raise_for_status.return_value = None + + client = MagicMock() + client.get.return_value = response + client_cm = MagicMock() + client_cm.__enter__.return_value = client + + with ( + patch("twitch.tasks.httpx.Client", return_value=client_cm), + patch("twitch.tasks._convert_to_modern_formats") as convert_mock, + ): + result = _download_and_save( + url="https://example.com/path/picture.png", + name="game-1", + file_field=file_field, + ) + + assert result is True + file_field.save.assert_called_once() + assert file_field.save.call_args.args[0] == "game-1.png" + assert file_field.save.call_args.kwargs["save"] is True + convert_mock.assert_called_once_with(Path("C:/cache/game-1.png")) + + +def test_download_and_save_returns_false_on_http_error() -> None: + """It should return False when HTTP requests fail.""" + file_field = MagicMock() + file_field.name = "" + + client = MagicMock() + client.get.side_effect = httpx.HTTPError("network down") + client_cm = MagicMock() + client_cm.__enter__.return_value = client + + with patch("twitch.tasks.httpx.Client", return_value=client_cm): + result = _download_and_save( + url="https://example.com/path/picture.png", + name="game-1", + file_field=file_field, + ) + + assert result is False + file_field.save.assert_not_called() + + +def test_download_game_image_downloads_normalized_twitch_url() -> None: + """It should normalize Twitch box art URLs before downloading.""" + game = SimpleNamespace( + box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg", + twitch_id="123", + box_art_file=MagicMock(), + ) + + with ( + patch("twitch.models.Game.objects.get", return_value=game), + patch( + "twitch.utils.is_twitch_box_art_url", + return_value=True, + ) as is_twitch_mock, + patch( + "twitch.utils.normalize_twitch_box_art_url", + return_value="https://cdn.example.com/box.jpg", + ) as normalize_mock, + patch("twitch.tasks._download_and_save") as save_mock, + ): + download_game_image.run(1) + + is_twitch_mock.assert_called_once_with(game.box_art) + normalize_mock.assert_called_once_with(game.box_art) + save_mock.assert_called_once_with( + "https://cdn.example.com/box.jpg", + "123", + game.box_art_file, + ) + + +def test_download_game_image_retries_on_download_error() -> None: + """It should retry when the image download helper fails unexpectedly.""" + game = SimpleNamespace( + box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg", + twitch_id="123", + box_art_file=MagicMock(), + ) + error = RuntimeError("boom") + + with ( + patch("twitch.models.Game.objects.get", return_value=game), + patch("twitch.utils.is_twitch_box_art_url", return_value=True), + patch( + "twitch.utils.normalize_twitch_box_art_url", + return_value="https://cdn.example.com/box.jpg", + ), + patch("twitch.tasks._download_and_save", side_effect=error), + patch.object( + download_game_image, + "retry", + side_effect=RuntimeError("retried"), + ) as retry_mock, + pytest.raises(RuntimeError, match="retried"), + ): + download_game_image.run(1) + + retry_mock.assert_called_once_with(exc=error) + + +def test_download_all_images_calls_expected_commands() -> None: + """It should invoke both image import management commands.""" + with patch("twitch.tasks.call_command") as call_command_mock: + download_all_images.run() + + call_command_mock.assert_has_calls([ + call("download_box_art"), + call("download_campaign_images", model="all"), + ]) + + +def test_import_chat_badges_retries_on_error() -> None: + """It should retry when import_chat_badges command execution fails.""" + error = RuntimeError("boom") + + with ( + patch("twitch.tasks.call_command", side_effect=error), + patch.object( + import_chat_badges, + "retry", + side_effect=RuntimeError("retried"), + ) as retry_mock, + pytest.raises(RuntimeError, match="retried"), + ): + import_chat_badges.run() + + retry_mock.assert_called_once_with(exc=error) + + +def test_backup_database_calls_backup_command() -> None: + """It should invoke the backup_db management command.""" + with patch("twitch.tasks.call_command") as call_command_mock: + backup_database.run() + + call_command_mock.assert_called_once_with("backup_db") + + +def test_convert_to_modern_formats_skips_non_image_files(tmp_path: Path) -> None: + """It should skip files that are not jpg, jpeg, or png.""" + text_file = tmp_path / "document.txt" + text_file.write_text("Not an image", encoding="utf-8") + + with patch("PIL.Image.open") as open_mock: + _convert_to_modern_formats(text_file) + + open_mock.assert_not_called() + + +def test_convert_to_modern_formats_skips_missing_files(tmp_path: Path) -> None: + """It should skip files that do not exist.""" + missing_file = tmp_path / "nonexistent.jpg" + + with patch("PIL.Image.open") as open_mock: + _convert_to_modern_formats(missing_file) + + open_mock.assert_not_called() + + +def test_convert_to_modern_formats_converts_rgb_image(tmp_path: Path) -> None: + """It should save image in WebP and AVIF formats for RGB images.""" + original = tmp_path / "image.jpg" + original.write_bytes(b"fake-jpeg") + + mock_image = MagicMock() + mock_image.mode = "RGB" + mock_copy = MagicMock() + mock_image.copy.return_value = mock_copy + + with ( + patch("PIL.Image.open") as open_mock, + ): + open_mock.return_value.__enter__.return_value = mock_image + _convert_to_modern_formats(original) + + open_mock.assert_called_once_with(original) + assert mock_copy.save.call_count == 2 + webp_call = [c for c in mock_copy.save.call_args_list if "webp" in str(c)] + avif_call = [c for c in mock_copy.save.call_args_list if "avif" in str(c)] + assert len(webp_call) == 1 + assert len(avif_call) == 1 + + +def test_convert_to_modern_formats_converts_rgba_image_with_background( + tmp_path: Path, +) -> None: + """It should create RGB background and paste RGBA image onto it.""" + original = tmp_path / "image.png" + original.write_bytes(b"fake-png") + + mock_rgba = MagicMock() + mock_rgba.mode = "RGBA" + mock_rgba.split.return_value = [MagicMock(), MagicMock(), MagicMock(), MagicMock()] + mock_rgba.size = (100, 100) + mock_rgba.convert.return_value = mock_rgba + + mock_bg = MagicMock() + + with ( + patch("PIL.Image.open") as open_mock, + patch("PIL.Image.new", return_value=mock_bg) as new_mock, + ): + open_mock.return_value.__enter__.return_value = mock_rgba + _convert_to_modern_formats(original) + + new_mock.assert_called_once_with("RGB", (100, 100), (255, 255, 255)) + mock_bg.paste.assert_called_once() + assert mock_bg.save.call_count == 2 + + +def test_convert_to_modern_formats_handles_conversion_error(tmp_path: Path) -> None: + """It should gracefully handle format conversion failures.""" + original = tmp_path / "image.jpg" + original.write_bytes(b"fake-jpeg") + + mock_image = MagicMock() + mock_image.mode = "RGB" + mock_image.copy.return_value = MagicMock() + mock_image.copy.return_value.save.side_effect = Exception("PIL error") + + with ( + patch("PIL.Image.open") as open_mock, + ): + open_mock.return_value.__enter__.return_value = mock_image + _convert_to_modern_formats(original) # Should not raise + + +def test_download_campaign_image_downloads_when_url_exists() -> None: + """It should download and save campaign image when image_url is present.""" + campaign = SimpleNamespace( + image_url="https://example.com/campaign.jpg", + twitch_id="camp123", + image_file=MagicMock(), + ) + + with ( + patch("twitch.models.DropCampaign.objects.get", return_value=campaign), + patch("twitch.tasks._download_and_save", return_value=True) as save_mock, + ): + download_campaign_image.run(1) + + save_mock.assert_called_once_with( + "https://example.com/campaign.jpg", + "camp123", + campaign.image_file, + ) + + +def test_download_campaign_image_skips_when_no_url() -> None: + """It should skip when campaign has no image_url.""" + campaign = SimpleNamespace( + image_url=None, + twitch_id="camp123", + image_file=MagicMock(), + ) + + with ( + patch("twitch.models.DropCampaign.objects.get", return_value=campaign), + patch("twitch.tasks._download_and_save") as save_mock, + ): + download_campaign_image.run(1) + + save_mock.assert_not_called() + + +def test_download_campaign_image_skips_when_not_found() -> None: + """It should skip when campaign does not exist.""" + with ( + patch( + "twitch.models.DropCampaign.objects.get", + side_effect=DropCampaign.DoesNotExist(), + ), + patch("twitch.tasks._download_and_save") as save_mock, + ): + download_campaign_image.run(1) + + save_mock.assert_not_called() + + +def test_download_campaign_image_retries_on_error() -> None: + """It should retry when image download fails unexpectedly.""" + campaign = SimpleNamespace( + image_url="https://example.com/campaign.jpg", + twitch_id="camp123", + image_file=MagicMock(), + ) + error = RuntimeError("boom") + + with ( + patch("twitch.models.DropCampaign.objects.get", return_value=campaign), + patch("twitch.tasks._download_and_save", side_effect=error), + patch.object( + download_campaign_image, + "retry", + side_effect=RuntimeError("retried"), + ) as retry_mock, + pytest.raises(RuntimeError, match="retried"), + ): + download_campaign_image.run(1) + + retry_mock.assert_called_once_with(exc=error) + + +def test_download_benefit_image_downloads_when_url_exists() -> None: + """It should download and save benefit image when image_asset_url is present.""" + benefit = SimpleNamespace( + image_asset_url="https://example.com/benefit.png", + twitch_id="benefit123", + image_file=MagicMock(), + ) + + with ( + patch("twitch.models.DropBenefit.objects.get", return_value=benefit), + patch("twitch.tasks._download_and_save", return_value=True) as save_mock, + ): + download_benefit_image.run(1) + + save_mock.assert_called_once_with( + url="https://example.com/benefit.png", + name="benefit123", + file_field=benefit.image_file, + ) + + +def test_download_benefit_image_skips_when_no_url() -> None: + """It should skip when benefit has no image_asset_url.""" + benefit = SimpleNamespace( + image_asset_url=None, + twitch_id="benefit123", + image_file=MagicMock(), + ) + + with ( + patch("twitch.models.DropBenefit.objects.get", return_value=benefit), + patch("twitch.tasks._download_and_save") as save_mock, + ): + download_benefit_image.run(1) + + save_mock.assert_not_called() + + +def test_download_benefit_image_skips_when_not_found() -> None: + """It should skip when benefit does not exist.""" + with ( + patch( + "twitch.models.DropBenefit.objects.get", + side_effect=DropBenefit.DoesNotExist(), + ), + patch("twitch.tasks._download_and_save") as save_mock, + ): + download_benefit_image.run(1) + + save_mock.assert_not_called() + + +def test_download_benefit_image_retries_on_error() -> None: + """It should retry when image download fails unexpectedly.""" + benefit = SimpleNamespace( + image_asset_url="https://example.com/benefit.png", + twitch_id="benefit123", + image_file=MagicMock(), + ) + error = RuntimeError("boom") + + with ( + patch("twitch.models.DropBenefit.objects.get", return_value=benefit), + patch("twitch.tasks._download_and_save", side_effect=error), + patch.object( + download_benefit_image, + "retry", + side_effect=RuntimeError("retried"), + ) as retry_mock, + pytest.raises(RuntimeError, match="retried"), + ): + download_benefit_image.run(1) + + retry_mock.assert_called_once_with(exc=error) + + +def test_download_reward_campaign_image_downloads_when_url_exists() -> None: + """It should download and save reward campaign image when image_url is present.""" + reward = SimpleNamespace( + image_url="https://example.com/reward.jpg", + twitch_id="reward123", + image_file=MagicMock(), + ) + + with ( + patch("twitch.models.RewardCampaign.objects.get", return_value=reward), + patch("twitch.tasks._download_and_save", return_value=True) as save_mock, + ): + download_reward_campaign_image.run(1) + + save_mock.assert_called_once_with( + "https://example.com/reward.jpg", + "reward123", + reward.image_file, + ) + + +def test_download_reward_campaign_image_skips_when_no_url() -> None: + """It should skip when reward has no image_url.""" + reward = SimpleNamespace( + image_url=None, + twitch_id="reward123", + image_file=MagicMock(), + ) + + with ( + patch("twitch.models.RewardCampaign.objects.get", return_value=reward), + patch("twitch.tasks._download_and_save") as save_mock, + ): + download_reward_campaign_image.run(1) + + save_mock.assert_not_called() + + +def test_download_reward_campaign_image_skips_when_not_found() -> None: + """It should skip when reward does not exist.""" + with ( + patch( + "twitch.models.RewardCampaign.objects.get", + side_effect=RewardCampaign.DoesNotExist(), + ), + patch("twitch.tasks._download_and_save") as save_mock, + ): + download_reward_campaign_image.run(1) + + save_mock.assert_not_called() + + +def test_download_reward_campaign_image_retries_on_error() -> None: + """It should retry when image download fails unexpectedly.""" + reward = SimpleNamespace( + image_url="https://example.com/reward.jpg", + twitch_id="reward123", + image_file=MagicMock(), + ) + error = RuntimeError("boom") + + with ( + patch("twitch.models.RewardCampaign.objects.get", return_value=reward), + patch("twitch.tasks._download_and_save", side_effect=error), + patch.object( + download_reward_campaign_image, + "retry", + side_effect=RuntimeError("retried"), + ) as retry_mock, + pytest.raises(RuntimeError, match="retried"), + ): + download_reward_campaign_image.run(1) + + retry_mock.assert_called_once_with(exc=error)