diff --git a/.env.example b/.env.example index 380538a..7ccfd56 100644 --- a/.env.example +++ b/.env.example @@ -1,13 +1,7 @@ -# Environment variables for ttvdrops Django application -# Copy this file to `.env` and fill in the appropriate values before running the application. -# For local development, you can use the default values provided here, but make sure to change the secret key and Twitch API credentials for production use. - -# Debug mode (set to False in production) +# Django Configuration +# Set to False in production DEBUG=True -# Base URL used for absolute URL generation -BASE_URL=https://ttvdrops.lovinator.space - # Django Secret Key # Generate a new secret key for production: python -c 'from django.core.management.utils import get_random_secret_key; print(get_random_secret_key())' DJANGO_SECRET_KEY=your-secret-key-here @@ -17,8 +11,6 @@ DJANGO_SECRET_KEY=your-secret-key-here # You can use either an app access token or user access token TWITCH_CLIENT_ID=your-twitch-client-id TWITCH_CLIENT_SECRET=your-twitch-client-secret -# Optional: if omitted, some commands can fetch an app access token using the client credentials -TWITCH_ACCESS_TOKEN= # Email Configuration # SMTP Host (examples below) @@ -50,24 +42,10 @@ POSTGRES_HOST=/run/postgresql # Note: Changed from 5432 to 6432 to use PgBouncer POSTGRES_PORT=5432 - -# Optional database connection tuning -# CONN_MAX_AGE=60 -# CONN_HEALTH_CHECKS=True -# DB_CONNECT_TIMEOUT=10 - -# Use SQLite instead of PostgreSQL (useful for local development) -USE_SQLITE=False - # Where to store Twitch API responses TTVDROPS_IMPORTED_DIR=/mnt/fourteen/Data/Responses/imported - -# Where to store Twitch API responses that failed processing (e.g. due to missing fields or unrecognized formats) TTVDROPS_BROKEN_DIR=/mnt/fourteen/Data/Responses/broken -# Where to store Twitch API responses that are pending processing (e.g. waiting for a periodic task to process them) -TTVDROPS_PENDING_DIR=/mnt/fourteen/Data/Responses/pending - -# Redis Configuration for caching and Celery +# Redis Configuration REDIS_URL_CACHE=unix:///var/run/redis/redis.sock?db=0 REDIS_URL_CELERY=unix:///var/run/redis/redis.sock?db=1 diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 9b4ffb0..182ab46 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -1,40 +1,78 @@ -# Project Guidelines +# Django Guidelines -## Code Style -- Follow PEP 8 with a 120 character line limit. -- Use double quotes for Python strings. -- Keep comments focused on why, not what. +## Python Best Practices +- Follow PEP 8 with 120 character line limit +- Use double quotes for Python strings -## Build and Test -- Install/update dependencies with `uv sync -U`. -- Run tests with `uv run pytest`. -- Run Django commands with `uv run python manage.py `. -- Typical local sequence: `uv run python manage.py makemigrations`, `uv run python manage.py migrate`, `uv run python manage.py runserver`. -- Before finishing model changes, verify migrations with `uv run python manage.py makemigrations --check --dry-run`. +## Django Best Practices +- Follow Django's "batteries included" philosophy - use built-in features before third-party packages +- Prioritize security and follow Django's security best practices +- Use Django's ORM effectively -## Architecture -- This is a Django multi-app project: - - `twitch/`, `kick/`, `chzzk/`, and `youtube/` hold platform-specific domain logic. - - `core/` holds shared cross-app utilities (base URL, SEO, shared views/helpers). - - `config/` holds settings, URLs, WSGI, and Celery app wiring. -- Favor Django's batteries-included approach before adding new third-party dependencies. -- Follow MTV with fat models and thin views; keep template logic simple. +## Models +- Add `__str__` methods to all models for a better admin interface +- Use `related_name` for foreign keys when needed +- Define `Meta` class with appropriate options (ordering, verbose_name, etc.) +- Use `blank=True` for optional form fields, `null=True` for optional database fields -## Conventions -- Models should include `__str__` and explicit `Meta` options (ordering/indexes/verbose names) where relevant. -- Use `blank=True` for optional form fields and `null=True` for optional database fields. -- Prefer `get_object_or_404` over manual DoesNotExist handling in views. -- URL patterns should use descriptive names and trailing slashes. -- For schema models that parse external APIs, prefer Pydantic `extra="forbid"` and normalize variant payloads with validators. -- Add tests for both positive and negative scenarios, especially when handling multiple upstream payload formats. +## Views +- Always validate and sanitize user input +- Handle exceptions gracefully with try/except blocks +- Use `get_object_or_404` instead of manual exception handling +- Implement proper pagination for list views -## Environment and Pitfalls -- `DJANGO_SECRET_KEY` must be set; the app exits if it is missing. -- Database defaults to PostgreSQL unless `USE_SQLITE=true`; tests use in-memory SQLite. -- Redis is used for cache and Celery. Keep `REDIS_URL_CACHE` and `REDIS_URL_CELERY` configured in environments that run background tasks. -- Static and media files are stored under the platform data directory configured in `config/settings.py`, not in the repo tree. +## URLs +- Use descriptive URL names for reverse URL lookups +- Always end URL patterns with a trailing slash -## Documentation -- Link to existing docs instead of duplicating them. -- Use `README.md` as the source of truth for system setup, deployment, Celery, and management command examples. -- Only create new documentation files when explicitly requested. +## Templates +- Use template inheritance with base templates +- Use template tags and filters for common operations +- Avoid complex logic in templates - move it to views or template tags +- Use static files properly with `{% load static %}` +- Avoid hiding controls with `
` or other collapse elements unless explicitly needed +- Prioritize accessibility and discoverability of features + +## Settings +- Use environment variables in a single `settings.py` file +- Never commit secrets to version control + +## Database +- Use migrations for all database changes +- Optimize queries with `select_related` and `prefetch_related` +- Use database indexes for frequently queried fields +- Avoid N+1 query problems + +## Testing +- Always write unit tests and check that they pass for new features +- Test both positive and negative scenarios + +## Pydantic Schemas +- Use `extra="forbid"` in model_config to catch API changes and new fields from external APIs +- Explicitly document all optional fields from different API operation formats +- This ensures validation failures alert you to API changes rather than silently ignoring new data +- When supporting multiple API formats (e.g., ViewerDropsDashboard vs Inventory operations): + - Make fields optional that differ between formats + - Use field validators or model validators to normalize data between formats + - Write tests covering both operation formats to ensure backward compatibility + +## Architectural Patterns +- Follow Django's MTV (Model-Template-View) paradigm; keep business logic in models/services and presentation in templates +- Favor fat models and thin views; extract reusable business logic into services/helpers when complexity grows +- Keep forms and serializers (if added) responsible for validation; avoid duplicating validation in views +- Avoid circular dependencies; keep modules cohesive and decoupled +- Use settings modules and environment variables to configure behavior, not hardcoded constants + +## Technology Stack +- Python 3, Django, PostgreSQL, Redis (Valkey), Celery for background tasks +- HTML templates with Django templating; static assets served from `static/` and collected to `staticfiles/` +- Management commands in `twitch/management/commands/` for data import and maintenance tasks +- Use `pyproject.toml` + uv for dependency and environment management +- Use `uv run python manage.py ` to run Django management commands +- Use `uv run pytest` to run tests + +## Documentation & Project Organization +- Only create documentation files when explicitly requested by the user +- Do not generate markdown files summarizing work or changes unless asked +- Keep code comments focused on "why" not "what"; the code itself should be clear +- Update existing documentation rather than creating new files diff --git a/README.md b/README.md index d6cc782..b7273b3 100644 --- a/README.md +++ b/README.md @@ -61,20 +61,13 @@ sudo systemctl enable --now ttvdrops.service curl --unix-socket /run/ttvdrops/ttvdrops.sock https://ttvdrops.lovinator.space ``` -Enable Celery worker and Beat services: +Install and enable timers: ```bash -sudo install -m 0644 tools/systemd/ttvdrops-celery-worker.service /etc/systemd/system/ -sudo install -m 0644 tools/systemd/ttvdrops-celery-beat.service /etc/systemd/system/ +sudo install -m 0644 tools/systemd/ttvdrops-backup.{service,timer} /etc/systemd/system/ +sudo install -m 0644 tools/systemd/ttvdrops-import-drops.{service,timer} /etc/systemd/system/ sudo systemctl daemon-reload -sudo systemctl enable --now ttvdrops-celery-worker.service -sudo systemctl enable --now ttvdrops-celery-beat.service -``` - -Set `TTVDROPS_PENDING_DIR` in `.env` to the directory where Twitch JSON drop files are dropped: - -```env -TTVDROPS_PENDING_DIR=/mnt/fourteen/Data/Responses/pending +sudo systemctl enable --now ttvdrops-backup.timer ttvdrops-import-drops.timer ``` ## Development @@ -90,40 +83,13 @@ uv run pytest ## Celery -Celery powers all periodic and background work. Three services are required: - -| Service file | Queues | Purpose | -|---|---|---| -| `ttvdrops-celery-worker.service` | `imports`, `api-fetches`, `default` | Twitch/Kick/Chzzk imports, backup | -| `ttvdrops-celery-beat.service` | — | Periodic task scheduler (Beat) | - -Start workers manually during development: +Start a worker: ```bash -# All-in-one worker (development) -uv run celery -A config worker --queues imports,api-fetches,image-downloads,default --loglevel=info - -# Beat scheduler (requires database Beat tables — run migrate first) -uv run celery -A config beat --scheduler django_celery_beat.schedulers:DatabaseScheduler --loglevel=info - -# Monitor tasks in the browser (optional) -uv run celery -A config flower +uv run celery -A config worker --loglevel=info +uv run celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler ``` -**Periodic tasks configured via `CELERY_BEAT_SCHEDULE`:** - -| Task | Schedule | Queue | -|---|---|---| -| `twitch.tasks.scan_pending_twitch_files` | every 10 s | `imports` | -| `kick.tasks.import_kick_drops` | :01/:16/:31/:46 | `api-fetches` | -| `chzzk.tasks.discover_chzzk_campaigns` | every 2 h | `api-fetches` | -| `twitch.tasks.backup_database` | daily 02:15 UTC | `default` | -| `twitch.tasks.download_all_images` | Sunday 04:00 UTC | `image-downloads` | -| `twitch.tasks.import_chat_badges` | Sunday 03:00 UTC | `api-fetches` | - -Image downloads also run **immediately** on record creation via `post_save` signals -(`Game`, `DropCampaign`, `DropBenefit`, `RewardCampaign`). - ## Import Drops ```bash diff --git a/chzzk/tasks.py b/chzzk/tasks.py deleted file mode 100644 index 5707beb..0000000 --- a/chzzk/tasks.py +++ /dev/null @@ -1,26 +0,0 @@ -from __future__ import annotations - -import logging - -from celery import shared_task -from django.core.management import call_command - -logger = logging.getLogger("ttvdrops.tasks") - - -@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60) -def import_chzzk_campaign_task(self, campaign_no: int) -> None: # noqa: ANN001 - """Import a single Chzzk campaign by its campaign number.""" - try: - call_command("import_chzzk_campaign", str(campaign_no)) - except Exception as exc: - raise self.retry(exc=exc) from exc - - -@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120) -def discover_chzzk_campaigns(self) -> None: # noqa: ANN001 - """Discover and import the latest Chzzk campaigns (equivalent to --latest flag).""" - try: - call_command("import_chzzk_campaign", latest=True) - except Exception as exc: - raise self.retry(exc=exc) from exc diff --git a/chzzk/tests/test_management_commands.py b/chzzk/tests/test_management_commands.py deleted file mode 100644 index ec41d22..0000000 --- a/chzzk/tests/test_management_commands.py +++ /dev/null @@ -1,481 +0,0 @@ -from datetime import timedelta -from io import StringIO -from unittest.mock import Mock -from unittest.mock import call -from unittest.mock import patch - -import pytest -import requests -from django.core.management import CommandError -from django.core.management import call_command -from django.test import TestCase -from django.utils import timezone - -from chzzk.management.commands.import_chzzk_campaign import ( - Command as ImportChzzkCampaignCommand, -) -from chzzk.models import ChzzkCampaign -from chzzk.models import ChzzkReward -from chzzk.schemas import ChzzkCampaignV2 -from chzzk.schemas import ChzzkRewardV2 - - -class ImportChzzkCampaignRangeCommandTest(TestCase): - """Tests for the import_chzzk_campaign_range management command.""" - - def test_imports_campaigns_in_range_descending_by_default(self) -> None: - """Test that campaigns are imported in descending order when start > end.""" - stdout = StringIO() - with patch( - "chzzk.management.commands.import_chzzk_campaign_range.call_command", - ) as call_command_mock: - call_command( - "import_chzzk_campaign_range", - "5", - "3", - stdout=stdout, - ) - - # Verify call_command was called for each campaign in descending order - expected_calls = [ - call("import_chzzk_campaign", "5"), - call("import_chzzk_campaign", "4"), - call("import_chzzk_campaign", "3"), - ] - call_command_mock.assert_has_calls(expected_calls) - assert call_command_mock.call_count == 3 - - def test_imports_campaigns_in_range_ascending_by_default(self) -> None: - """Test that campaigns are imported in ascending order when start < end.""" - stdout = StringIO() - with patch( - "chzzk.management.commands.import_chzzk_campaign_range.call_command", - ) as call_command_mock: - call_command( - "import_chzzk_campaign_range", - "3", - "5", - stdout=stdout, - ) - - expected_calls = [ - call("import_chzzk_campaign", "3"), - call("import_chzzk_campaign", "4"), - call("import_chzzk_campaign", "5"), - ] - call_command_mock.assert_has_calls(expected_calls) - assert call_command_mock.call_count == 3 - - def test_imports_single_campaign_when_start_equals_end(self) -> None: - """Test that a single campaign is imported when start equals end.""" - stdout = StringIO() - with patch( - "chzzk.management.commands.import_chzzk_campaign_range.call_command", - ) as call_command_mock: - call_command( - "import_chzzk_campaign_range", - "5", - "5", - stdout=stdout, - ) - - call_command_mock.assert_called_once_with("import_chzzk_campaign", "5") - - def test_respects_custom_step_parameter(self) -> None: - """Test that custom step parameter is respected.""" - stdout = StringIO() - with patch( - "chzzk.management.commands.import_chzzk_campaign_range.call_command", - ) as call_command_mock: - call_command( - "import_chzzk_campaign_range", - "1", - "10", - "--step", - "2", - stdout=stdout, - ) - - expected_calls = [ - call("import_chzzk_campaign", "1"), - call("import_chzzk_campaign", "3"), - call("import_chzzk_campaign", "5"), - call("import_chzzk_campaign", "7"), - call("import_chzzk_campaign", "9"), - ] - call_command_mock.assert_has_calls(expected_calls) - assert call_command_mock.call_count == 5 - - def test_respects_custom_negative_step(self) -> None: - """Test that custom negative step parameter works correctly.""" - stdout = StringIO() - with patch( - "chzzk.management.commands.import_chzzk_campaign_range.call_command", - ) as call_command_mock: - call_command( - "import_chzzk_campaign_range", - "10", - "1", - "--step", - "-2", - stdout=stdout, - ) - - expected_calls = [ - call("import_chzzk_campaign", "10"), - call("import_chzzk_campaign", "8"), - call("import_chzzk_campaign", "6"), - call("import_chzzk_campaign", "4"), - call("import_chzzk_campaign", "2"), - ] - call_command_mock.assert_has_calls(expected_calls) - assert call_command_mock.call_count == 5 - - def test_handles_command_error_gracefully(self) -> None: - """Test that CommandError from import_chzzk_campaign is caught and reported.""" - stdout = StringIO() - stderr = StringIO() - - def side_effect(command: str, *args: str, **kwargs: object) -> None: - if "4" in args: - msg = "Campaign 4 not found" - raise CommandError(msg) - - with patch( - "chzzk.management.commands.import_chzzk_campaign_range.call_command", - side_effect=side_effect, - ): - call_command( - "import_chzzk_campaign_range", - "3", - "5", - stdout=stdout, - stderr=stderr, - ) - - output = stdout.getvalue() - assert "Importing campaign 3" in output - assert "Importing campaign 4" in output - assert "Importing campaign 5" in output - assert "Failed campaign 4" in output - assert "Campaign 4 not found" in output - assert "Batch import complete" in output - - def test_continues_after_command_error(self) -> None: - """Test that import continues after encountering a CommandError.""" - call_count = 0 - - def side_effect(command: str, campaign_no: str) -> None: - nonlocal call_count - call_count += 1 - if campaign_no == "4": - msg = "Campaign 4 error" - raise CommandError(msg) - - with patch( - "chzzk.management.commands.import_chzzk_campaign_range.call_command", - side_effect=side_effect, - ): - call_command( - "import_chzzk_campaign_range", - "3", - "5", - stdout=StringIO(), - ) - - # Verify all campaigns were attempted - assert call_count == 3 - - def test_outputs_success_messages(self) -> None: - """Test that success messages are written to stdout.""" - stdout = StringIO() - with patch( - "chzzk.management.commands.import_chzzk_campaign_range.call_command", - ): - call_command( - "import_chzzk_campaign_range", - "1", - "2", - stdout=stdout, - ) - - output: str = stdout.getvalue() - assert "Importing campaigns from 1 to 2 with step 1" in output - assert "Batch import complete" in output - - def test_raises_error_when_step_is_zero(self) -> None: - """Test that ValueError is raised when step is 0.""" - with pytest.raises(ValueError, match="Step cannot be 0"): - call_command( - "import_chzzk_campaign_range", - "1", - "5", - "--step", - "0", - stdout=StringIO(), - ) - - def test_handles_large_range(self) -> None: - """Test that large ranges are handled correctly.""" - stdout = StringIO() - with patch( - "chzzk.management.commands.import_chzzk_campaign_range.call_command", - ) as call_command_mock: - call_command( - "import_chzzk_campaign_range", - "1", - "100", - "--step", - "10", - stdout=stdout, - ) - - assert call_command_mock.call_count == 10 - first_call = call_command_mock.call_args_list[0] - assert first_call == call("import_chzzk_campaign", "1") - last_call = call_command_mock.call_args_list[-1] - assert last_call == call("import_chzzk_campaign", "91") - - -class ImportChzzkCampaignCommandTest(TestCase): - """Tests for the import_chzzk_campaign management command.""" - - def _create_campaign(self, campaign_no: int) -> ChzzkCampaign: - now = timezone.now() - return ChzzkCampaign.objects.create( - campaign_no=campaign_no, - title=f"Campaign {campaign_no}", - description="Campaign description", - category_type="game", - category_id="1", - category_value="Game", - service_id="chzzk", - state="ACTIVE", - start_date=now - timedelta(days=1), - end_date=now + timedelta(days=1), - has_ios_based_reward=False, - drops_campaign_not_started=False, - source_api="unit-test", - ) - - def test_requires_campaign_no_when_latest_not_used(self) -> None: - """Command should fail without campaign_no unless --latest is provided.""" - with pytest.raises( - CommandError, - match="campaign_no is required unless --latest is used", - ): - call_command("import_chzzk_campaign", stdout=StringIO()) - - def test_imports_single_campaign_no(self) -> None: - """Command should import the provided campaign number.""" - with patch( - "chzzk.management.commands.import_chzzk_campaign.Command._import_campaign", - autospec=True, - ) as import_campaign_mock: - call_command("import_chzzk_campaign", "42", stdout=StringIO()) - - assert import_campaign_mock.call_count == 1 - assert import_campaign_mock.call_args.args[1] == 42 - - def test_latest_imports_candidates(self) -> None: - """Command should import all candidates returned by get_campaign_import_candidates.""" - with ( - patch( - "chzzk.management.commands.import_chzzk_campaign.Command.get_campaign_import_candidates", - autospec=True, - return_value=[9, 10, 11], - ), - patch( - "chzzk.management.commands.import_chzzk_campaign.Command._import_campaign", - autospec=True, - ) as import_campaign_mock, - ): - call_command("import_chzzk_campaign", "--latest", stdout=StringIO()) - - called_campaigns = [ - call_.args[1] for call_ in import_campaign_mock.call_args_list - ] - assert called_campaigns == [9, 10, 11] - - def test_latest_with_no_candidates_exits_cleanly(self) -> None: - """Command should not import anything when --latest has no candidates.""" - stdout = StringIO() - - with ( - patch( - "chzzk.management.commands.import_chzzk_campaign.Command.get_campaign_import_candidates", - autospec=True, - return_value=[], - ), - patch( - "chzzk.management.commands.import_chzzk_campaign.Command._import_campaign", - autospec=True, - ) as import_campaign_mock, - ): - call_command("import_chzzk_campaign", "--latest", stdout=stdout) - - assert import_campaign_mock.call_count == 0 - assert "Nothing to import with --latest at this time." in stdout.getvalue() - - def test_get_campaign_import_candidates_uses_initial_range_on_empty_db( - self, - ) -> None: - """When there are no campaigns, candidates should be 1..5.""" - command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand() - candidates: list[int] = command.get_campaign_import_candidates() - assert candidates == [1, 2, 3, 4, 5] - - def test_get_campaign_import_candidates_adds_backfill_and_new_candidates( - self, - ) -> None: - """Candidates should include missing IDs from latest-5..latest-1 plus latest+1..latest+5.""" - self._create_campaign(10) - self._create_campaign(8) - self._create_campaign(6) - - command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand() - candidates: list[int] = command.get_campaign_import_candidates() - - assert candidates == [5, 7, 9, 11, 12, 13, 14, 15] - - def test_get_campaign_import_candidates_ignores_outlier_max_campaign_no( - self, - ) -> None: - """If max campaign_no is an outlier, the second max should be used.""" - self._create_campaign(250) - self._create_campaign(100_002_000) - - command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand() - candidates: list[int] = command.get_campaign_import_candidates() - - assert candidates == [245, 246, 247, 248, 249, 251, 252, 253, 254, 255] - - def test_import_campaign_handles_http_error_with_json_message(self) -> None: - """Import should fail gracefully on HTTP errors and include JSON message when present.""" - command = ImportChzzkCampaignCommand() - - response = Mock() - response.raise_for_status.side_effect = requests.HTTPError("404 Client Error") - response.headers = {"Content-Type": "application/json; charset=utf-8"} - response.json.return_value = {"message": "Campaign not found"} - - with ( - patch( - "chzzk.management.commands.import_chzzk_campaign.requests.get", - return_value=response, - ), - patch.object(command.stdout, "write") as stdout_write, - patch.object(command, "import_campaign_data") as import_campaign_data_mock, - ): - command._import_campaign(12345) - - assert import_campaign_data_mock.call_count == 0 - assert stdout_write.call_count == 1 - msg = stdout_write.call_args.args[0] - assert "Failed to fetch campaign 12345" in msg - assert "Campaign not found" in msg - - def test_update_or_create_reward_updates_existing_reward(self) -> None: - """Existing rewards should be updated when incoming reward data differs.""" - campaign = self._create_campaign(500) - existing_reward = ChzzkReward.objects.create( - campaign=campaign, - reward_no=1, - title="Old title", - image_url="https://example.com/old.png", - reward_type="OLD", - campaign_reward_type="", - condition_type="TIME", - condition_for_minutes=10, - ios_based_reward=False, - code_remaining_count=5, - ) - - reward_data = ChzzkRewardV2.model_validate( - { - "title": "New title", - "rewardNo": 1, - "imageUrl": "https://example.com/new.png", - "rewardType": "NEW", - "conditionType": "WATCH", - "conditionForMinutes": 20, - "iosBasedReward": True, - "codeRemainingCount": 99, - }, - ) - - command = ImportChzzkCampaignCommand() - command.update_or_create_reward(500, campaign, reward_data) - - existing_reward.refresh_from_db() - assert existing_reward.title == "New title" - assert existing_reward.image_url == "https://example.com/new.png" - assert existing_reward.reward_type == "NEW" - assert not existing_reward.campaign_reward_type - assert existing_reward.condition_type == "WATCH" - assert existing_reward.condition_for_minutes == 20 - assert existing_reward.ios_based_reward is True - assert existing_reward.code_remaining_count == 99 - - def test_import_campaign_data_updates_existing_campaign(self) -> None: - """Existing campaigns should be updated when imported fields have changed.""" - campaign = self._create_campaign(600) - original_scraped_at = campaign.scraped_at - - command = ImportChzzkCampaignCommand() - campaign_data = ChzzkCampaignV2.model_validate( - { - "campaignNo": 600, - "title": "Updated title", - "imageUrl": "https://example.com/new-campaign.png", - "description": "Updated description", - "categoryType": "game", - "categoryId": "2", - "categoryValue": "Game 2", - "pcLinkUrl": "https://example.com/pc", - "mobileLinkUrl": "https://example.com/mobile", - "serviceId": "chzzk", - "state": "ACTIVE", - "startDate": campaign.start_date.isoformat(), - "endDate": campaign.end_date.isoformat(), - "rewardList": [], - "hasIosBasedReward": True, - "dropsCampaignNotStarted": False, - "rewardType": "DROP", - "accountLinkUrl": "https://example.com/account", - }, - ) - - updated_campaign = command.import_campaign_data( - campaign_no=600, - api_version="v2", - data={"key": "value"}, - cd=campaign_data, - ) - - updated_campaign.refresh_from_db() - assert updated_campaign.title == "Updated title" - assert updated_campaign.description == "Updated description" - assert updated_campaign.category_id == "2" - assert updated_campaign.has_ios_based_reward is True - assert not updated_campaign.campaign_reward_type - assert updated_campaign.reward_type == "DROP" - assert updated_campaign.account_link_url == "https://example.com/account" - assert updated_campaign.raw_json_v2 == {"key": "value"} - assert updated_campaign.scrape_status == "success" - assert updated_campaign.scraped_at >= original_scraped_at - - def test_apply_updates_if_changed_returns_false_on_noop(self) -> None: - """Helper should return False when values are unchanged.""" - campaign = self._create_campaign(700) - command = ImportChzzkCampaignCommand() - - changed = command._apply_updates_if_changed( - campaign, - { - "title": campaign.title, - "description": campaign.description, - }, - ) - - assert changed is False diff --git a/chzzk/tests/test_tasks.py b/chzzk/tests/test_tasks.py deleted file mode 100644 index 1111aa5..0000000 --- a/chzzk/tests/test_tasks.py +++ /dev/null @@ -1,58 +0,0 @@ -from unittest.mock import patch - -import pytest - -from chzzk.tasks import discover_chzzk_campaigns -from chzzk.tasks import import_chzzk_campaign_task - - -def test_import_chzzk_campaign_task_calls_command_with_campaign_no() -> None: - """It should invoke import_chzzk_campaign for the given campaign number.""" - with patch("chzzk.tasks.call_command") as call_command_mock: - import_chzzk_campaign_task.run(905) - - call_command_mock.assert_called_once_with("import_chzzk_campaign", "905") - - -def test_import_chzzk_campaign_task_retries_on_command_error() -> None: - """It should retry when import_chzzk_campaign raises an exception.""" - error = RuntimeError("boom") - - with ( - patch("chzzk.tasks.call_command", side_effect=error), - patch.object( - import_chzzk_campaign_task, - "retry", - side_effect=RuntimeError("retried"), - ) as retry_mock, - pytest.raises(RuntimeError, match="retried"), - ): - import_chzzk_campaign_task.run(905) - - retry_mock.assert_called_once_with(exc=error) - - -def test_discover_chzzk_campaigns_calls_command_with_latest_flag() -> None: - """It should invoke import_chzzk_campaign with latest=True.""" - with patch("chzzk.tasks.call_command") as call_command_mock: - discover_chzzk_campaigns.run() - - call_command_mock.assert_called_once_with("import_chzzk_campaign", latest=True) - - -def test_discover_chzzk_campaigns_retries_on_command_error() -> None: - """It should retry when import_chzzk_campaign raises an exception.""" - error = RuntimeError("boom") - - with ( - patch("chzzk.tasks.call_command", side_effect=error), - patch.object( - discover_chzzk_campaigns, - "retry", - side_effect=RuntimeError("retried"), - ) as retry_mock, - pytest.raises(RuntimeError, match="retried"), - ): - discover_chzzk_campaigns.run() - - retry_mock.assert_called_once_with(exc=error) diff --git a/config/settings.py b/config/settings.py index 7fb2c0f..a084435 100644 --- a/config/settings.py +++ b/config/settings.py @@ -5,7 +5,6 @@ from pathlib import Path from typing import Any import sentry_sdk -from celery.schedules import crontab from dotenv import load_dotenv from platformdirs import user_data_dir @@ -110,8 +109,6 @@ STATICFILES_DIRS: list[Path] = [BASE_DIR / "static"] TIME_ZONE = "UTC" WSGI_APPLICATION = "config.wsgi.application" -TTVDROPS_PENDING_DIR: str = os.getenv("TTVDROPS_PENDING_DIR", "") - INTERNAL_IPS: list[str] = [] if DEBUG: INTERNAL_IPS = ["127.0.0.1", "localhost"] # pyright: ignore[reportConstantRedefinition] @@ -262,63 +259,6 @@ CELERY_BROKER_URL: str = REDIS_URL_CELERY CELERY_RESULT_BACKEND = "django-db" CELERY_RESULT_EXTENDED = True CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler" -CELERY_TASK_SOFT_TIME_LIMIT: int = 3600 # warn at 1 h -CELERY_TASK_TIME_LIMIT: int = 3900 # hard-kill at 1 h 5 min - -CELERY_TASK_ROUTES: dict[str, dict[str, str]] = { - "twitch.tasks.scan_pending_twitch_files": {"queue": "imports"}, - "twitch.tasks.import_twitch_file": {"queue": "imports"}, - "twitch.tasks.download_game_image": {"queue": "image-downloads"}, - "twitch.tasks.download_campaign_image": {"queue": "image-downloads"}, - "twitch.tasks.download_benefit_image": {"queue": "image-downloads"}, - "twitch.tasks.download_reward_campaign_image": {"queue": "image-downloads"}, - "twitch.tasks.download_all_images": {"queue": "image-downloads"}, - "twitch.tasks.import_chat_badges": {"queue": "api-fetches"}, - "twitch.tasks.backup_database": {"queue": "default"}, - "kick.tasks.import_kick_drops": {"queue": "api-fetches"}, - "chzzk.tasks.discover_chzzk_campaigns": {"queue": "api-fetches"}, - "chzzk.tasks.import_chzzk_campaign_task": {"queue": "imports"}, - "core.tasks.submit_indexnow_task": {"queue": "default"}, -} - -CELERY_BEAT_SCHEDULE: dict[str, Any] = { - # Scan for new Twitch JSON drops every 10 seconds. - "scan-pending-twitch-files": { - "task": "twitch.tasks.scan_pending_twitch_files", - "schedule": 10.0, - "options": {"queue": "imports"}, - }, - # Import Kick drops from the API (:01, :16, :31, :46 each hour). - "import-kick-drops": { - "task": "kick.tasks.import_kick_drops", - "schedule": crontab(minute="1,16,31,46"), - "options": {"queue": "api-fetches"}, - }, - # Backup database nightly at 02:15. - "backup-database": { - "task": "twitch.tasks.backup_database", - "schedule": crontab(hour=2, minute=15), - "options": {"queue": "default"}, - }, - # Discover new Chzzk campaigns every 2 hours at minute 0. - "discover-chzzk-campaigns": { - "task": "chzzk.tasks.discover_chzzk_campaigns", - "schedule": crontab(minute=0, hour="*/2"), - "options": {"queue": "api-fetches"}, - }, - # Weekly full image refresh (Sunday 04:00 UTC). - "download-all-images-weekly": { - "task": "twitch.tasks.download_all_images", - "schedule": crontab(hour=4, minute=0, day_of_week=0), - "options": {"queue": "image-downloads"}, - }, - # Weekly chat badge refresh (Sunday 03:00 UTC). - "import-chat-badges-weekly": { - "task": "twitch.tasks.import_chat_badges", - "schedule": crontab(hour=3, minute=0, day_of_week=0), - "options": {"queue": "api-fetches"}, - }, -} # Define BASE_URL for dynamic URL generation BASE_URL: str = "https://ttvdrops.lovinator.space" diff --git a/core/tasks.py b/core/tasks.py deleted file mode 100644 index 5b20599..0000000 --- a/core/tasks.py +++ /dev/null @@ -1,17 +0,0 @@ -from __future__ import annotations - -import logging - -from celery import shared_task -from django.core.management import call_command - -logger = logging.getLogger("ttvdrops.tasks") - - -@shared_task(bind=True, queue="default", max_retries=3, default_retry_delay=300) -def submit_indexnow_task(self) -> None: # noqa: ANN001 - """Submit all site URLs to the IndexNow search index.""" - try: - call_command("submit_indexnow") - except Exception as exc: - raise self.retry(exc=exc) from exc diff --git a/kick/tasks.py b/kick/tasks.py deleted file mode 100644 index 9c50a9e..0000000 --- a/kick/tasks.py +++ /dev/null @@ -1,17 +0,0 @@ -from __future__ import annotations - -import logging - -from celery import shared_task -from django.core.management import call_command - -logger = logging.getLogger("ttvdrops.tasks") - - -@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120) -def import_kick_drops(self) -> None: # noqa: ANN001 - """Fetch and upsert Kick drop campaigns from the public API.""" - try: - call_command("import_kick_drops") - except Exception as exc: - raise self.retry(exc=exc) from exc diff --git a/pyproject.toml b/pyproject.toml index 836228a..d681dc7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ dev = [ [tool.pytest.ini_options] DJANGO_SETTINGS_MODULE = "config.settings" python_files = ["test_*.py", "*_test.py"] -addopts = "--tb=short -n auto --cov" +addopts = "" filterwarnings = [ "ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning", ] diff --git a/tools/systemd/ttvdrops-backup.service b/tools/systemd/ttvdrops-backup.service new file mode 100644 index 0000000..c4f73c8 --- /dev/null +++ b/tools/systemd/ttvdrops-backup.service @@ -0,0 +1,10 @@ +[Unit] +Description=TTVDrops database backup + +[Service] +Type=oneshot +User=ttvdrops +Group=ttvdrops +WorkingDirectory=/home/ttvdrops/ttvdrops +EnvironmentFile=/home/ttvdrops/ttvdrops/.env +ExecStart=/usr/bin/uv run python manage.py backup_db diff --git a/tools/systemd/ttvdrops-backup.timer b/tools/systemd/ttvdrops-backup.timer new file mode 100644 index 0000000..1546d5e --- /dev/null +++ b/tools/systemd/ttvdrops-backup.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Nightly TTVDrops database backup + +[Timer] +OnCalendar=*-*-* 02:15:00 +Persistent=true + +[Install] +WantedBy=timers.target diff --git a/tools/systemd/ttvdrops-celery-beat.service b/tools/systemd/ttvdrops-celery-beat.service deleted file mode 100644 index c09db14..0000000 --- a/tools/systemd/ttvdrops-celery-beat.service +++ /dev/null @@ -1,27 +0,0 @@ -[Unit] -Description=TTVDrops Celery Beat scheduler -After=network-online.target valkey.service ttvdrops-celery-worker.service -Wants=network-online.target valkey.service - -[Service] -Type=simple -User=ttvdrops -Group=ttvdrops -SupplementaryGroups=http -UMask=0002 -WorkingDirectory=/home/ttvdrops/ttvdrops -EnvironmentFile=/home/ttvdrops/ttvdrops/.env -ExecStart=/usr/bin/uv run celery -A config beat \ - --scheduler django_celery_beat.schedulers:DatabaseScheduler \ - --loglevel INFO -ExecStop=/bin/kill -s TERM $MAINPID - -StandardOutput=journal -StandardError=journal -SyslogIdentifier=ttvdrops-celery-beat - -Restart=on-failure -RestartSec=10s - -[Install] -WantedBy=multi-user.target diff --git a/tools/systemd/ttvdrops-celery-worker.service b/tools/systemd/ttvdrops-celery-worker.service deleted file mode 100644 index 2095273..0000000 --- a/tools/systemd/ttvdrops-celery-worker.service +++ /dev/null @@ -1,31 +0,0 @@ -[Unit] -Description=TTVDrops Celery worker -After=network-online.target valkey.service -Wants=network-online.target valkey.service - -[Service] -Type=simple -User=ttvdrops -Group=ttvdrops -SupplementaryGroups=http -UMask=0002 -WorkingDirectory=/home/ttvdrops/ttvdrops -EnvironmentFile=/home/ttvdrops/ttvdrops/.env -ExecStart=/usr/bin/uv run celery -A config worker \ - --queues imports,api-fetches,default \ - --concurrency 4 \ - --loglevel INFO -ExecStop=/bin/kill -s TERM $MAINPID -ExecReload=/bin/kill -s HUP $MAINPID - -MemoryLimit=512M -CPUQuota=75% -StandardOutput=journal -StandardError=journal -SyslogIdentifier=ttvdrops-celery-worker - -Restart=on-failure -RestartSec=10s - -[Install] -WantedBy=multi-user.target diff --git a/tools/systemd/ttvdrops-import-drops.service b/tools/systemd/ttvdrops-import-drops.service new file mode 100644 index 0000000..d4e204b --- /dev/null +++ b/tools/systemd/ttvdrops-import-drops.service @@ -0,0 +1,34 @@ +[Unit] +Description=TTVDrops watch and import drops from pending directory +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=ttvdrops +Group=ttvdrops +SupplementaryGroups=http +UMask=0002 +WorkingDirectory=/home/ttvdrops/ttvdrops +EnvironmentFile=/home/ttvdrops/ttvdrops/.env +ExecStart=/usr/bin/uv run python manage.py watch_imports /mnt/fourteen/Data/Responses/pending --verbose + +# Restart policy +Restart=on-failure +RestartSec=5s + +# Process management +KillMode=mixed +KillSignal=SIGTERM + +# Resource limits +MemoryLimit=512M +CPUQuota=50% + +# Logging +StandardOutput=journal +StandardError=journal +SyslogIdentifier=ttvdrops-watch + +[Install] +WantedBy=multi-user.target diff --git a/tools/systemd/ttvdrops-import-kick-drops.service b/tools/systemd/ttvdrops-import-kick-drops.service new file mode 100644 index 0000000..2932aab --- /dev/null +++ b/tools/systemd/ttvdrops-import-kick-drops.service @@ -0,0 +1,26 @@ +[Unit] +Description=TTVDrops import Kick drops +After=network-online.target +Wants=network-online.target + +[Service] +Type=oneshot +User=ttvdrops +Group=ttvdrops +SupplementaryGroups=http +UMask=0002 +WorkingDirectory=/home/ttvdrops/ttvdrops +EnvironmentFile=/home/ttvdrops/ttvdrops/.env +ExecStart=/usr/bin/uv run python manage.py import_kick_drops + +# Logging +StandardOutput=journal +StandardError=journal +SyslogIdentifier=ttvdrops-import-kick + +# Resource limits +MemoryLimit=512M +CPUQuota=50% + +[Install] +WantedBy=multi-user.target diff --git a/tools/systemd/ttvdrops-import-kick-drops.timer b/tools/systemd/ttvdrops-import-kick-drops.timer new file mode 100644 index 0000000..af98bce --- /dev/null +++ b/tools/systemd/ttvdrops-import-kick-drops.timer @@ -0,0 +1,9 @@ +[Unit] +Description=TTVDrops import Kick drops at :01, :16, :31, and :46 + +[Timer] +OnCalendar=*-*-* *:01,16,31,46:00 +Persistent=true + +[Install] +WantedBy=timers.target diff --git a/twitch/apps.py b/twitch/apps.py index 07e8fe2..8f557e4 100644 --- a/twitch/apps.py +++ b/twitch/apps.py @@ -36,21 +36,3 @@ class TwitchConfig(AppConfig): FieldFile.open = _safe_open except (AttributeError, TypeError) as exc: logger.debug("Failed to patch FieldFile.open: %s", exc) - - # Register post_save signal handlers that dispatch image download tasks - # when new Twitch records are created. - from django.db.models.signals import post_save # noqa: PLC0415 - - from twitch.models import DropBenefit # noqa: PLC0415 - from twitch.models import DropCampaign # noqa: PLC0415 - from twitch.models import Game # noqa: PLC0415 - from twitch.models import RewardCampaign # noqa: PLC0415 - from twitch.signals import on_drop_benefit_saved # noqa: PLC0415 - from twitch.signals import on_drop_campaign_saved # noqa: PLC0415 - from twitch.signals import on_game_saved # noqa: PLC0415 - from twitch.signals import on_reward_campaign_saved # noqa: PLC0415 - - post_save.connect(on_game_saved, sender=Game) - post_save.connect(on_drop_campaign_saved, sender=DropCampaign) - post_save.connect(on_drop_benefit_saved, sender=DropBenefit) - post_save.connect(on_reward_campaign_saved, sender=RewardCampaign) diff --git a/twitch/management/commands/cleanup_orphaned_channels.py b/twitch/management/commands/cleanup_orphaned_channels.py index 5d13863..a99aa90 100644 --- a/twitch/management/commands/cleanup_orphaned_channels.py +++ b/twitch/management/commands/cleanup_orphaned_channels.py @@ -8,8 +8,7 @@ from twitch.models import Channel if TYPE_CHECKING: from argparse import ArgumentParser - from django.db.models import QuerySet - + from debug_toolbar.panels.templates.panel import QuerySet SAMPLE_PREVIEW_COUNT = 10 diff --git a/twitch/management/commands/convert_images_to_modern_formats.py b/twitch/management/commands/convert_images_to_modern_formats.py index d5bced5..3b516f2 100644 --- a/twitch/management/commands/convert_images_to_modern_formats.py +++ b/twitch/management/commands/convert_images_to_modern_formats.py @@ -193,8 +193,8 @@ class Command(BaseCommand): img.mode == "P" and "transparency" in img.info ): # Create white background for transparency - background: Image.Image = Image.new("RGB", img.size, (255, 255, 255)) - rgba_img: Image.Image = img.convert("RGBA") if img.mode == "P" else img + background = Image.new("RGB", img.size, (255, 255, 255)) + rgba_img = img.convert("RGBA") if img.mode == "P" else img background.paste( rgba_img, mask=rgba_img.split()[-1] if rgba_img.mode in {"RGBA", "LA"} else None, diff --git a/twitch/management/commands/watch_imports.py b/twitch/management/commands/watch_imports.py index d1f6ca3..a95bb20 100644 --- a/twitch/management/commands/watch_imports.py +++ b/twitch/management/commands/watch_imports.py @@ -17,16 +17,9 @@ logger: logging.Logger = logging.getLogger("ttvdrops.watch_imports") class Command(BaseCommand): - """Watch for JSON files in a directory and import them automatically. + """Watch for JSON files in a directory and import them automatically.""" - .. deprecated:: - This command is superseded by the Celery Beat task - ``twitch.tasks.scan_pending_twitch_files`` (runs every 10 s via - ``ttvdrops-celery-beat.service``). Keep this command for ad-hoc use - or in environments that run without a Celery worker. - """ - - help = "Watch a directory for JSON files and import them automatically (superseded by Celery Beat)" + help = "Watch a directory for JSON files and import them automatically" requires_migrations_checks = True def add_arguments(self, parser: CommandParser) -> None: diff --git a/twitch/signals.py b/twitch/signals.py deleted file mode 100644 index f8f66af..0000000 --- a/twitch/signals.py +++ /dev/null @@ -1,65 +0,0 @@ -from __future__ import annotations - -import logging -from typing import Any - -logger = logging.getLogger("ttvdrops.signals") - - -def _dispatch(task_fn: Any, pk: int) -> None: # noqa: ANN401 - """Dispatch a Celery task, logging rather than raising when the broker is unavailable.""" - try: - task_fn.delay(pk) - except Exception: # noqa: BLE001 - logger.debug( - "Could not dispatch %s(%d) — broker may be unavailable.", - task_fn.name, - pk, - ) - - -def on_game_saved(sender: Any, instance: Any, created: bool, **kwargs: Any) -> None: # noqa: ANN401, FBT001 - """Dispatch a box-art download task when a new Game is created.""" - if created: - from twitch.tasks import download_game_image # noqa: PLC0415 - - _dispatch(download_game_image, instance.pk) - - -def on_drop_campaign_saved( - sender: Any, # noqa: ANN401 - instance: Any, # noqa: ANN401 - created: bool, # noqa: FBT001 - **kwargs: Any, # noqa: ANN401 -) -> None: - """Dispatch an image download task when a new DropCampaign is created.""" - if created: - from twitch.tasks import download_campaign_image # noqa: PLC0415 - - _dispatch(download_campaign_image, instance.pk) - - -def on_drop_benefit_saved( - sender: Any, # noqa: ANN401 - instance: Any, # noqa: ANN401 - created: bool, # noqa: FBT001 - **kwargs: Any, # noqa: ANN401 -) -> None: - """Dispatch an image download task when a new DropBenefit is created.""" - if created: - from twitch.tasks import download_benefit_image # noqa: PLC0415 - - _dispatch(download_benefit_image, instance.pk) - - -def on_reward_campaign_saved( - sender: Any, # noqa: ANN401 - instance: Any, # noqa: ANN401 - created: bool, # noqa: FBT001 - **kwargs: Any, # noqa: ANN401 -) -> None: - """Dispatch an image download task when a new RewardCampaign is created.""" - if created: - from twitch.tasks import download_reward_campaign_image # noqa: PLC0415 - - _dispatch(download_reward_campaign_image, instance.pk) diff --git a/twitch/tasks.py b/twitch/tasks.py deleted file mode 100644 index 32e5100..0000000 --- a/twitch/tasks.py +++ /dev/null @@ -1,257 +0,0 @@ -from __future__ import annotations - -import logging -import os -from pathlib import Path -from typing import TYPE_CHECKING -from urllib.parse import urlparse - -import httpx -from celery import shared_task -from django.core.files.base import ContentFile -from django.core.management import call_command -from PIL.Image import Image - -if TYPE_CHECKING: - from urllib.parse import ParseResult - - from django.db.models.fields.files import ImageFieldFile - from PIL.Image import Image - from PIL.ImageFile import ImageFile - -logger: logging.Logger = logging.getLogger("ttvdrops.tasks") - - -@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60) -def scan_pending_twitch_files(self) -> None: # noqa: ANN001 - """Scan TTVDROPS_PENDING_DIR for JSON files and dispatch an import task for each.""" - pending_dir: str = os.getenv("TTVDROPS_PENDING_DIR", "") - if not pending_dir: - logger.debug("TTVDROPS_PENDING_DIR not configured; skipping scan.") - return - - path = Path(pending_dir) - if not path.is_dir(): - logger.warning("TTVDROPS_PENDING_DIR %r is not a directory.", pending_dir) - return - - json_files: list[Path] = [ - f for f in path.iterdir() if f.suffix == ".json" and f.is_file() - ] - for json_file in json_files: - import_twitch_file.delay(str(json_file)) - - if json_files: - logger.info("Dispatched %d Twitch file import task(s).", len(json_files)) - - -@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60) -def import_twitch_file(self, file_path: str) -> None: # noqa: ANN001 - """Import a single Twitch JSON drop file via BetterImportDrops logic.""" - from twitch.management.commands.better_import_drops import Command as Importer # noqa: I001, PLC0415 - - path = Path(file_path) - if not path.is_file(): - # Already moved to imported/ or broken/ by a prior run. - logger.debug("File %s no longer exists; skipping.", file_path) - return - - try: - Importer().handle(path=path) - except Exception as exc: - logger.exception("Failed to import %s.", file_path) - raise self.retry(exc=exc) from exc - - -def _download_and_save(url: str, name: str, file_field: ImageFieldFile) -> bool: - """Download url and save the content to file_field (Django ImageField). - - Files that are already cached (non-empty .name) are skipped. - - Returns: - True when the image was saved, False when skipped or on error. - """ - if not url or file_field is None: - return False - - if getattr(file_field, "name", None): - return False # already cached - - parsed: ParseResult = urlparse(url) - suffix: str = Path(parsed.path).suffix or ".jpg" - file_name: str = f"{name}{suffix}" - - try: - with httpx.Client(timeout=20, follow_redirects=True) as client: - response = client.get(url) - response.raise_for_status() - except httpx.HTTPError: - logger.warning("HTTP error downloading image for %r.", name) - return False - - file_field.save(file_name, ContentFile(response.content), save=True) # type: ignore[union-attr] - - image_path: str | None = getattr(file_field, "path", None) - if image_path: - _convert_to_modern_formats(Path(image_path)) - - return True - - -def _convert_to_modern_formats(source: Path) -> None: - """Convert *source* image to WebP and AVIF alongside the original.""" - if not source.exists() or source.suffix.lower() not in {".jpg", ".jpeg", ".png"}: - return - - try: - from PIL import Image # noqa: PLC0415 - except ImportError: - return - - try: - with Image.open(source) as raw: - if raw.mode in {"RGBA", "LA"} or ( - raw.mode == "P" and "transparency" in raw.info - ): - background: Image = Image.new("RGB", raw.size, (255, 255, 255)) - rgba: Image | ImageFile = ( - raw.convert("RGBA") if raw.mode == "P" else raw - ) - mask: Image | None = ( - rgba.split()[-1] if rgba.mode in {"RGBA", "LA"} else None - ) - background.paste(rgba, mask=mask) - img: Image = background - elif raw.mode != "RGB": - img = raw.convert("RGB") - else: - img: Image = raw.copy() - - for fmt, ext in (("WEBP", ".webp"), ("AVIF", ".avif")): - out: Path = source.with_suffix(ext) - try: - img.save(out, fmt, quality=80) - except Exception: # noqa: BLE001 - logger.debug("Could not convert %s to %s.", source, fmt) - except Exception: # noqa: BLE001 - logger.debug("Format conversion failed for %s.", source) - - -# --------------------------------------------------------------------------- -# Per-model image tasks — triggered by post_save signals on new records -# --------------------------------------------------------------------------- - - -@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300) -def download_game_image(self, game_pk: int) -> None: # noqa: ANN001 - """Download and cache the box art image for a single Game.""" - from twitch.models import Game # noqa: PLC0415 - from twitch.utils import is_twitch_box_art_url # noqa: PLC0415 - from twitch.utils import normalize_twitch_box_art_url # noqa: PLC0415 - - try: - game = Game.objects.get(pk=game_pk) - except Game.DoesNotExist: - return - - if not game.box_art or not is_twitch_box_art_url(game.box_art): - return - - url = normalize_twitch_box_art_url(game.box_art) - try: - _download_and_save(url, game.twitch_id, game.box_art_file) - except Exception as exc: - raise self.retry(exc=exc) from exc - - -@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300) -def download_campaign_image(self, campaign_pk: int) -> None: # noqa: ANN001 - """Download and cache the image for a single DropCampaign.""" - from twitch.models import DropCampaign # noqa: PLC0415 - - try: - campaign = DropCampaign.objects.get(pk=campaign_pk) - except DropCampaign.DoesNotExist: - return - - if not campaign.image_url: - return - - try: - _download_and_save(campaign.image_url, campaign.twitch_id, campaign.image_file) - except Exception as exc: - raise self.retry(exc=exc) from exc - - -@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300) -def download_benefit_image(self, benefit_pk: int) -> None: # noqa: ANN001 - """Download and cache the image for a single DropBenefit.""" - from twitch.models import DropBenefit # noqa: PLC0415 - - try: - benefit = DropBenefit.objects.get(pk=benefit_pk) - except DropBenefit.DoesNotExist: - return - - if not benefit.image_asset_url: - return - - try: - _download_and_save( - url=benefit.image_asset_url, - name=benefit.twitch_id, - file_field=benefit.image_file, - ) - except Exception as exc: - raise self.retry(exc=exc) from exc - - -@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300) -def download_reward_campaign_image(self, reward_pk: int) -> None: # noqa: ANN001 - """Download and cache the image for a single RewardCampaign.""" - from twitch.models import RewardCampaign # noqa: PLC0415 - - try: - reward = RewardCampaign.objects.get(pk=reward_pk) - except RewardCampaign.DoesNotExist: - return - - if not reward.image_url: - return - - try: - _download_and_save(reward.image_url, reward.twitch_id, reward.image_file) - except Exception as exc: - raise self.retry(exc=exc) from exc - - -@shared_task(queue="image-downloads") -def download_all_images() -> None: - """Weekly full-refresh: download images for all Twitch models.""" - call_command("download_box_art") - call_command("download_campaign_images", model="all") - - -# --------------------------------------------------------------------------- -# Twitch API tasks -# --------------------------------------------------------------------------- - - -@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120) -def import_chat_badges(self) -> None: # noqa: ANN001 - """Fetch and upsert Twitch global chat badges via the Helix API.""" - try: - call_command("import_chat_badges") - except Exception as exc: - raise self.retry(exc=exc) from exc - - -# --------------------------------------------------------------------------- -# Maintenance -# --------------------------------------------------------------------------- - - -@shared_task(queue="default") -def backup_database() -> None: - """Create a zstd-compressed SQL backup of the dataset tables.""" - call_command("backup_db") diff --git a/twitch/tests/test_better_import_drops.py b/twitch/tests/test_better_import_drops.py index 64fda50..92effc5 100644 --- a/twitch/tests/test_better_import_drops.py +++ b/twitch/tests/test_better_import_drops.py @@ -1,22 +1,13 @@ import json from pathlib import Path -from tempfile import TemporaryDirectory from typing import TYPE_CHECKING from unittest import skipIf -from unittest.mock import patch from django.db import connection from django.test import TestCase from twitch.management.commands.better_import_drops import Command from twitch.management.commands.better_import_drops import detect_error_only_response -from twitch.management.commands.better_import_drops import detect_non_campaign_keyword -from twitch.management.commands.better_import_drops import ( - extract_operation_name_from_parsed, -) -from twitch.management.commands.better_import_drops import move_completed_file -from twitch.management.commands.better_import_drops import move_file_to_broken_subdir -from twitch.management.commands.better_import_drops import repair_partially_broken_json from twitch.models import DropBenefit from twitch.models import DropCampaign from twitch.models import Game @@ -435,92 +426,6 @@ class CampaignStructureDetectionTests(TestCase): structure: str | None = command._detect_campaign_structure(response) assert structure == "current_user_drop_campaigns" - def test_detects_user_drop_campaign_structure(self) -> None: - """Ensure user.dropCampaign structure is correctly detected.""" - command = Command() - - response: dict[str, object] = { - "data": { - "user": { - "id": "123", - "dropCampaign": {"id": "c1", "name": "Test Campaign"}, - "__typename": "User", - }, - }, - } - - structure: str | None = command._detect_campaign_structure(response) - assert structure == "user_drop_campaign" - - def test_detects_channel_viewer_campaigns_structure(self) -> None: - """Ensure channel.viewerDropCampaigns structure is correctly detected.""" - command = Command() - - response: dict[str, object] = { - "data": { - "channel": { - "id": "123", - "viewerDropCampaigns": [{"id": "c1", "name": "Test Campaign"}], - "__typename": "Channel", - }, - }, - } - - structure: str | None = command._detect_campaign_structure(response) - assert structure == "channel_viewer_campaigns" - - -class FileMoveUtilityTests(TestCase): - """Tests for imported/broken file move utility helpers.""" - - def test_move_completed_file_sanitizes_operation_directory_name(self) -> None: - """Ensure operation names are sanitized and campaign structure subdir is respected.""" - with TemporaryDirectory() as tmp_dir: - root_path = Path(tmp_dir) - imported_root = root_path / "imported" - source_file = root_path / "payload.json" - source_file.write_text("{}", encoding="utf-8") - - with patch( - "twitch.management.commands.better_import_drops.get_imported_directory_root", - return_value=imported_root, - ): - target_dir = move_completed_file( - file_path=source_file, - operation_name="My Op/Name\\v1", - campaign_structure="inventory_campaigns", - ) - - expected_dir = imported_root / "My_Op_Name_v1" / "inventory_campaigns" - assert target_dir == expected_dir - assert not source_file.exists() - assert (expected_dir / "payload.json").exists() - - def test_move_file_to_broken_subdir_avoids_duplicate_operation_segment( - self, - ) -> None: - """Ensure matching reason and operation names do not create duplicate directories.""" - with TemporaryDirectory() as tmp_dir: - root_path = Path(tmp_dir) - broken_root = root_path / "broken" - source_file = root_path / "broken_payload.json" - source_file.write_text("{}", encoding="utf-8") - - with patch( - "twitch.management.commands.better_import_drops.get_broken_directory_root", - return_value=broken_root, - ): - broken_dir = move_file_to_broken_subdir( - file_path=source_file, - subdir="validation_failed", - operation_name="validation_failed", - ) - - path_segments = broken_dir.as_posix().split("/") - assert path_segments.count("validation_failed") == 1 - assert not source_file.exists() - assert (broken_dir / "broken_payload.json").exists() - class OperationNameFilteringTests(TestCase): """Tests for filtering campaigns by operation_name field.""" @@ -959,179 +864,3 @@ class ErrorOnlyResponseDetectionTests(TestCase): result = detect_error_only_response(parsed_json) assert result == "error_only: unknown error" - - -class NonCampaignKeywordDetectionTests(TestCase): - """Tests for non-campaign operation keyword detection.""" - - def test_detects_known_non_campaign_operation(self) -> None: - """Ensure known operationName values are detected as non-campaign payloads.""" - raw_text = json.dumps({"extensions": {"operationName": "PlaybackAccessToken"}}) - - result = detect_non_campaign_keyword(raw_text) - assert result == "PlaybackAccessToken" - - def test_returns_none_for_unknown_operation(self) -> None: - """Ensure unrelated operation names are not flagged.""" - raw_text = json.dumps({"extensions": {"operationName": "DropCampaignDetails"}}) - - result = detect_non_campaign_keyword(raw_text) - assert result is None - - -class OperationNameExtractionTests(TestCase): - """Tests for operation name extraction across supported payload shapes.""" - - def test_extracts_operation_name_from_json_repair_tuple(self) -> None: - """Ensure extraction supports tuple payloads returned by json_repair.""" - payload = ( - {"extensions": {"operationName": "ViewerDropsDashboard"}}, - [{"json_repair": "log"}], - ) - - result = extract_operation_name_from_parsed(payload) - assert result == "ViewerDropsDashboard" - - def test_extracts_operation_name_from_list_payload(self) -> None: - """Ensure extraction inspects the first response in list payloads.""" - payload = [ - {"extensions": {"operationName": "Inventory"}}, - {"extensions": {"operationName": "IgnoredSecondItem"}}, - ] - - result = extract_operation_name_from_parsed(payload) - assert result == "Inventory" - - def test_returns_none_for_empty_list_payload(self) -> None: - """Ensure extraction returns None for empty list payloads.""" - result = extract_operation_name_from_parsed([]) - assert result is None - - -class JsonRepairTests(TestCase): - """Tests for partial JSON repair fallback behavior.""" - - def test_repair_filters_non_graphql_items_from_list(self) -> None: - """Ensure repaired list output only keeps GraphQL-like response objects.""" - raw_text = '[{"foo": 1}, {"data": {"currentUser": {"id": "1"}}}]' - - repaired = repair_partially_broken_json(raw_text) - parsed = json.loads(repaired) - - assert parsed == [{"data": {"currentUser": {"id": "1"}}}] - - -class ProcessFileWorkerTests(TestCase): - """Tests for process_file_worker early-return behaviors.""" - - def test_returns_reason_when_drop_campaign_key_missing(self) -> None: - """Ensure files without dropCampaign are marked failed with clear reason.""" - command = Command() - - repo_root: Path = Path(__file__).resolve().parents[2] - temp_path: Path = repo_root / "twitch" / "tests" / "tmp_no_drop_campaign.json" - temp_path.write_text( - json.dumps({"data": {"currentUser": {"id": "123"}}}), - encoding="utf-8", - ) - - try: - result = command.process_file_worker( - file_path=temp_path, - options={"crash_on_error": False, "skip_broken_moves": True}, - ) - finally: - if temp_path.exists(): - temp_path.unlink() - - assert result["success"] is False - assert result["broken_dir"] == "(skipped)" - assert result["reason"] == "no dropCampaign present" - - def test_returns_reason_for_error_only_response(self) -> None: - """Ensure error-only responses are marked failed with extracted reason.""" - command = Command() - - repo_root: Path = Path(__file__).resolve().parents[2] - temp_path: Path = repo_root / "twitch" / "tests" / "tmp_error_only.json" - temp_path.write_text( - json.dumps( - { - "errors": [{"message": "service timeout"}], - "data": None, - }, - ), - encoding="utf-8", - ) - - try: - result = command.process_file_worker( - file_path=temp_path, - options={"crash_on_error": False, "skip_broken_moves": True}, - ) - finally: - if temp_path.exists(): - temp_path.unlink() - - assert result["success"] is False - assert result["broken_dir"] == "(skipped)" - assert result["reason"] == "error_only: service timeout" - - def test_returns_reason_for_known_non_campaign_keyword(self) -> None: - """Ensure known non-campaign operation payloads are rejected with reason.""" - command = Command() - - repo_root: Path = Path(__file__).resolve().parents[2] - temp_path: Path = ( - repo_root / "twitch" / "tests" / "tmp_non_campaign_keyword.json" - ) - temp_path.write_text( - json.dumps( - { - "data": {"currentUser": {"id": "123"}}, - "extensions": {"operationName": "PlaybackAccessToken"}, - }, - ), - encoding="utf-8", - ) - - try: - result = command.process_file_worker( - file_path=temp_path, - options={"crash_on_error": False, "skip_broken_moves": True}, - ) - finally: - if temp_path.exists(): - temp_path.unlink() - - assert result["success"] is False - assert result["broken_dir"] == "(skipped)" - assert result["reason"] == "matched 'PlaybackAccessToken'" - - -class NormalizeResponsesTests(TestCase): - """Tests for response normalization across supported payload formats.""" - - def test_normalizes_batched_responses_wrapper(self) -> None: - """Ensure batched payloads under responses key are unwrapped and filtered.""" - command = Command() - - parsed_json: dict[str, object] = { - "responses": [ - {"data": {"currentUser": {"id": "1"}}}, - "invalid-item", - {"extensions": {"operationName": "Inventory"}}, - ], - } - - normalized = command._normalize_responses(parsed_json) - assert len(normalized) == 2 - assert normalized[0]["data"]["currentUser"]["id"] == "1" - assert normalized[1]["extensions"]["operationName"] == "Inventory" - - def test_returns_empty_list_for_empty_tuple_payload(self) -> None: - """Ensure empty tuple payloads from json_repair produce no responses.""" - command = Command() - - normalized = command._normalize_responses(()) # type: ignore[arg-type] - assert normalized == [] diff --git a/twitch/tests/test_import_chat_badges.py b/twitch/tests/test_import_chat_badges.py deleted file mode 100644 index bea6e90..0000000 --- a/twitch/tests/test_import_chat_badges.py +++ /dev/null @@ -1,150 +0,0 @@ -from io import StringIO -from unittest.mock import patch - -import httpx -import pytest -from django.core.management import CommandError -from django.core.management import call_command - -from twitch.models import ChatBadge -from twitch.models import ChatBadgeSet -from twitch.schemas import GlobalChatBadgesResponse - -pytestmark: pytest.MarkDecorator = pytest.mark.django_db - - -def _build_response(title: str = "VIP") -> GlobalChatBadgesResponse: - """Build a valid GlobalChatBadgesResponse payload for tests. - - Returns: - A validated Twitch global chat badges response object. - """ - return GlobalChatBadgesResponse.model_validate({ - "data": [ - { - "set_id": "vip", - "versions": [ - { - "id": "1", - "image_url_1x": "https://example.com/vip-1x.png", - "image_url_2x": "https://example.com/vip-2x.png", - "image_url_4x": "https://example.com/vip-4x.png", - "title": title, - "description": "VIP Badge", - "click_action": "visit_url", - "click_url": "https://help.twitch.tv", - }, - ], - }, - ], - }) - - -def test_raises_when_client_id_missing(monkeypatch: pytest.MonkeyPatch) -> None: - """Command should fail when client ID is not provided.""" - monkeypatch.delenv("TWITCH_CLIENT_ID", raising=False) - monkeypatch.delenv("TWITCH_CLIENT_SECRET", raising=False) - monkeypatch.delenv("TWITCH_ACCESS_TOKEN", raising=False) - - with pytest.raises(CommandError, match="Twitch Client ID is required"): - call_command("import_chat_badges", stdout=StringIO()) - - -def test_raises_when_token_and_secret_missing(monkeypatch: pytest.MonkeyPatch) -> None: - """Command should fail when no token and no client secret are available.""" - monkeypatch.delenv("TWITCH_CLIENT_SECRET", raising=False) - monkeypatch.delenv("TWITCH_ACCESS_TOKEN", raising=False) - - with pytest.raises(CommandError, match="Either --access-token or --client-secret"): - call_command( - "import_chat_badges", - "--client-id", - "client-id", - stdout=StringIO(), - ) - - -@pytest.mark.django_db -def test_import_creates_then_updates_existing_badge() -> None: - """Running import twice should update existing badges rather than duplicate them.""" - with patch( - "twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges", - side_effect=[_build_response("VIP"), _build_response("VIP Updated")], - ) as fetch_mock: - call_command( - "import_chat_badges", - "--client-id", - "client-id", - "--access-token", - "access-token", - stdout=StringIO(), - ) - call_command( - "import_chat_badges", - "--client-id", - "client-id", - "--access-token", - "access-token", - stdout=StringIO(), - ) - - assert fetch_mock.call_count == 2 - assert ChatBadgeSet.objects.count() == 1 - assert ChatBadge.objects.count() == 1 - - badge_set = ChatBadgeSet.objects.get(set_id="vip") - badge = ChatBadge.objects.get(badge_set=badge_set, badge_id="1") - assert badge.title == "VIP Updated" - assert badge.click_action == "visit_url" - assert badge.click_url == "https://help.twitch.tv" - - -@pytest.mark.django_db -def test_uses_client_credentials_when_access_token_missing() -> None: - """Command should obtain token from Twitch when no access token is provided.""" - with ( - patch( - "twitch.management.commands.import_chat_badges.Command._get_app_access_token", - return_value="generated-token", - ) as token_mock, - patch( - "twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges", - return_value=GlobalChatBadgesResponse.model_validate({"data": []}), - ) as fetch_mock, - ): - call_command( - "import_chat_badges", - "--client-id", - "client-id", - "--client-secret", - "client-secret", - stdout=StringIO(), - ) - - token_mock.assert_called_once_with("client-id", "client-secret") - fetch_mock.assert_called_once_with( - client_id="client-id", - access_token="generated-token", - ) - - -def test_wraps_http_errors_from_badges_fetch() -> None: - """Command should convert HTTP client errors to CommandError.""" - with ( - patch( - "twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges", - side_effect=httpx.HTTPError("boom"), - ), - pytest.raises( - CommandError, - match="Failed to fetch chat badges from Twitch API", - ), - ): - call_command( - "import_chat_badges", - "--client-id", - "client-id", - "--access-token", - "access-token", - stdout=StringIO(), - ) diff --git a/twitch/tests/test_tasks.py b/twitch/tests/test_tasks.py deleted file mode 100644 index 191b352..0000000 --- a/twitch/tests/test_tasks.py +++ /dev/null @@ -1,575 +0,0 @@ -from pathlib import Path -from types import SimpleNamespace -from unittest.mock import MagicMock -from unittest.mock import call -from unittest.mock import patch - -import httpx -import pytest - -from twitch.models import DropBenefit -from twitch.models import DropCampaign -from twitch.models import RewardCampaign -from twitch.tasks import _convert_to_modern_formats -from twitch.tasks import _download_and_save -from twitch.tasks import backup_database -from twitch.tasks import download_all_images -from twitch.tasks import download_benefit_image -from twitch.tasks import download_campaign_image -from twitch.tasks import download_game_image -from twitch.tasks import download_reward_campaign_image -from twitch.tasks import import_chat_badges -from twitch.tasks import import_twitch_file -from twitch.tasks import scan_pending_twitch_files - - -def test_scan_pending_twitch_files_dispatches_json_files( - monkeypatch: pytest.MonkeyPatch, - tmp_path: Path, -) -> None: - """It should dispatch an import task for each JSON file in the pending directory.""" - first = tmp_path / "one.json" - second = tmp_path / "two.json" - ignored = tmp_path / "notes.txt" - first.write_text("{}", encoding="utf-8") - second.write_text("{}", encoding="utf-8") - ignored.write_text("ignored", encoding="utf-8") - monkeypatch.setenv("TTVDROPS_PENDING_DIR", str(tmp_path)) - - with patch("twitch.tasks.import_twitch_file.delay") as delay_mock: - scan_pending_twitch_files.run() - - delay_mock.assert_has_calls([call(str(first)), call(str(second))], any_order=True) - assert delay_mock.call_count == 2 - - -def test_scan_pending_twitch_files_skips_when_env_missing( - monkeypatch: pytest.MonkeyPatch, -) -> None: - """It should do nothing when TTVDROPS_PENDING_DIR is not configured.""" - monkeypatch.delenv("TTVDROPS_PENDING_DIR", raising=False) - - with patch("twitch.tasks.import_twitch_file.delay") as delay_mock: - scan_pending_twitch_files.run() - - delay_mock.assert_not_called() - - -def test_import_twitch_file_calls_importer_for_existing_file(tmp_path: Path) -> None: - """It should run BetterImportDrops with the provided path when the file exists.""" - source = tmp_path / "drops.json" - source.write_text("{}", encoding="utf-8") - - with patch( - "twitch.management.commands.better_import_drops.Command.handle", - ) as handle_mock: - import_twitch_file.run(str(source)) - - assert handle_mock.call_count == 1 - assert handle_mock.call_args.kwargs["path"] == source - - -def test_import_twitch_file_retries_on_import_error(tmp_path: Path) -> None: - """It should retry when the importer raises an exception.""" - source = tmp_path / "drops.json" - source.write_text("{}", encoding="utf-8") - error = RuntimeError("boom") - - with ( - patch( - "twitch.management.commands.better_import_drops.Command.handle", - side_effect=error, - ), - patch.object( - import_twitch_file, - "retry", - side_effect=RuntimeError("retried"), - ) as retry_mock, - pytest.raises(RuntimeError, match="retried"), - ): - import_twitch_file.run(str(source)) - - retry_mock.assert_called_once_with(exc=error) - - -def test_download_and_save_skips_when_image_already_cached() -> None: - """It should not download when the target ImageField already has a file name.""" - file_field = MagicMock() - file_field.name = "already-there.jpg" - - with patch("twitch.tasks.httpx.Client") as client_mock: - result = _download_and_save( - url="https://example.com/test.jpg", - name="game-1", - file_field=file_field, - ) - - assert result is False - client_mock.assert_not_called() - - -def test_download_and_save_saves_downloaded_content() -> None: - """It should save downloaded bytes and trigger modern format conversion.""" - file_field = MagicMock() - file_field.name = "" - file_field.path = "C:/cache/game-1.png" - - response = MagicMock() - response.content = b"img-bytes" - response.raise_for_status.return_value = None - - client = MagicMock() - client.get.return_value = response - client_cm = MagicMock() - client_cm.__enter__.return_value = client - - with ( - patch("twitch.tasks.httpx.Client", return_value=client_cm), - patch("twitch.tasks._convert_to_modern_formats") as convert_mock, - ): - result = _download_and_save( - url="https://example.com/path/picture.png", - name="game-1", - file_field=file_field, - ) - - assert result is True - file_field.save.assert_called_once() - assert file_field.save.call_args.args[0] == "game-1.png" - assert file_field.save.call_args.kwargs["save"] is True - convert_mock.assert_called_once_with(Path("C:/cache/game-1.png")) - - -def test_download_and_save_returns_false_on_http_error() -> None: - """It should return False when HTTP requests fail.""" - file_field = MagicMock() - file_field.name = "" - - client = MagicMock() - client.get.side_effect = httpx.HTTPError("network down") - client_cm = MagicMock() - client_cm.__enter__.return_value = client - - with patch("twitch.tasks.httpx.Client", return_value=client_cm): - result = _download_and_save( - url="https://example.com/path/picture.png", - name="game-1", - file_field=file_field, - ) - - assert result is False - file_field.save.assert_not_called() - - -def test_download_game_image_downloads_normalized_twitch_url() -> None: - """It should normalize Twitch box art URLs before downloading.""" - game = SimpleNamespace( - box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg", - twitch_id="123", - box_art_file=MagicMock(), - ) - - with ( - patch("twitch.models.Game.objects.get", return_value=game), - patch( - "twitch.utils.is_twitch_box_art_url", - return_value=True, - ) as is_twitch_mock, - patch( - "twitch.utils.normalize_twitch_box_art_url", - return_value="https://cdn.example.com/box.jpg", - ) as normalize_mock, - patch("twitch.tasks._download_and_save") as save_mock, - ): - download_game_image.run(1) - - is_twitch_mock.assert_called_once_with(game.box_art) - normalize_mock.assert_called_once_with(game.box_art) - save_mock.assert_called_once_with( - "https://cdn.example.com/box.jpg", - "123", - game.box_art_file, - ) - - -def test_download_game_image_retries_on_download_error() -> None: - """It should retry when the image download helper fails unexpectedly.""" - game = SimpleNamespace( - box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg", - twitch_id="123", - box_art_file=MagicMock(), - ) - error = RuntimeError("boom") - - with ( - patch("twitch.models.Game.objects.get", return_value=game), - patch("twitch.utils.is_twitch_box_art_url", return_value=True), - patch( - "twitch.utils.normalize_twitch_box_art_url", - return_value="https://cdn.example.com/box.jpg", - ), - patch("twitch.tasks._download_and_save", side_effect=error), - patch.object( - download_game_image, - "retry", - side_effect=RuntimeError("retried"), - ) as retry_mock, - pytest.raises(RuntimeError, match="retried"), - ): - download_game_image.run(1) - - retry_mock.assert_called_once_with(exc=error) - - -def test_download_all_images_calls_expected_commands() -> None: - """It should invoke both image import management commands.""" - with patch("twitch.tasks.call_command") as call_command_mock: - download_all_images.run() - - call_command_mock.assert_has_calls([ - call("download_box_art"), - call("download_campaign_images", model="all"), - ]) - - -def test_import_chat_badges_retries_on_error() -> None: - """It should retry when import_chat_badges command execution fails.""" - error = RuntimeError("boom") - - with ( - patch("twitch.tasks.call_command", side_effect=error), - patch.object( - import_chat_badges, - "retry", - side_effect=RuntimeError("retried"), - ) as retry_mock, - pytest.raises(RuntimeError, match="retried"), - ): - import_chat_badges.run() - - retry_mock.assert_called_once_with(exc=error) - - -def test_backup_database_calls_backup_command() -> None: - """It should invoke the backup_db management command.""" - with patch("twitch.tasks.call_command") as call_command_mock: - backup_database.run() - - call_command_mock.assert_called_once_with("backup_db") - - -def test_convert_to_modern_formats_skips_non_image_files(tmp_path: Path) -> None: - """It should skip files that are not jpg, jpeg, or png.""" - text_file = tmp_path / "document.txt" - text_file.write_text("Not an image", encoding="utf-8") - - with patch("PIL.Image.open") as open_mock: - _convert_to_modern_formats(text_file) - - open_mock.assert_not_called() - - -def test_convert_to_modern_formats_skips_missing_files(tmp_path: Path) -> None: - """It should skip files that do not exist.""" - missing_file = tmp_path / "nonexistent.jpg" - - with patch("PIL.Image.open") as open_mock: - _convert_to_modern_formats(missing_file) - - open_mock.assert_not_called() - - -def test_convert_to_modern_formats_converts_rgb_image(tmp_path: Path) -> None: - """It should save image in WebP and AVIF formats for RGB images.""" - original = tmp_path / "image.jpg" - original.write_bytes(b"fake-jpeg") - - mock_image = MagicMock() - mock_image.mode = "RGB" - mock_copy = MagicMock() - mock_image.copy.return_value = mock_copy - - with ( - patch("PIL.Image.open") as open_mock, - ): - open_mock.return_value.__enter__.return_value = mock_image - _convert_to_modern_formats(original) - - open_mock.assert_called_once_with(original) - assert mock_copy.save.call_count == 2 - webp_call = [c for c in mock_copy.save.call_args_list if "webp" in str(c)] - avif_call = [c for c in mock_copy.save.call_args_list if "avif" in str(c)] - assert len(webp_call) == 1 - assert len(avif_call) == 1 - - -def test_convert_to_modern_formats_converts_rgba_image_with_background( - tmp_path: Path, -) -> None: - """It should create RGB background and paste RGBA image onto it.""" - original = tmp_path / "image.png" - original.write_bytes(b"fake-png") - - mock_rgba = MagicMock() - mock_rgba.mode = "RGBA" - mock_rgba.split.return_value = [MagicMock(), MagicMock(), MagicMock(), MagicMock()] - mock_rgba.size = (100, 100) - mock_rgba.convert.return_value = mock_rgba - - mock_bg = MagicMock() - - with ( - patch("PIL.Image.open") as open_mock, - patch("PIL.Image.new", return_value=mock_bg) as new_mock, - ): - open_mock.return_value.__enter__.return_value = mock_rgba - _convert_to_modern_formats(original) - - new_mock.assert_called_once_with("RGB", (100, 100), (255, 255, 255)) - mock_bg.paste.assert_called_once() - assert mock_bg.save.call_count == 2 - - -def test_convert_to_modern_formats_handles_conversion_error(tmp_path: Path) -> None: - """It should gracefully handle format conversion failures.""" - original = tmp_path / "image.jpg" - original.write_bytes(b"fake-jpeg") - - mock_image = MagicMock() - mock_image.mode = "RGB" - mock_image.copy.return_value = MagicMock() - mock_image.copy.return_value.save.side_effect = Exception("PIL error") - - with ( - patch("PIL.Image.open") as open_mock, - ): - open_mock.return_value.__enter__.return_value = mock_image - _convert_to_modern_formats(original) # Should not raise - - -def test_download_campaign_image_downloads_when_url_exists() -> None: - """It should download and save campaign image when image_url is present.""" - campaign = SimpleNamespace( - image_url="https://example.com/campaign.jpg", - twitch_id="camp123", - image_file=MagicMock(), - ) - - with ( - patch("twitch.models.DropCampaign.objects.get", return_value=campaign), - patch("twitch.tasks._download_and_save", return_value=True) as save_mock, - ): - download_campaign_image.run(1) - - save_mock.assert_called_once_with( - "https://example.com/campaign.jpg", - "camp123", - campaign.image_file, - ) - - -def test_download_campaign_image_skips_when_no_url() -> None: - """It should skip when campaign has no image_url.""" - campaign = SimpleNamespace( - image_url=None, - twitch_id="camp123", - image_file=MagicMock(), - ) - - with ( - patch("twitch.models.DropCampaign.objects.get", return_value=campaign), - patch("twitch.tasks._download_and_save") as save_mock, - ): - download_campaign_image.run(1) - - save_mock.assert_not_called() - - -def test_download_campaign_image_skips_when_not_found() -> None: - """It should skip when campaign does not exist.""" - with ( - patch( - "twitch.models.DropCampaign.objects.get", - side_effect=DropCampaign.DoesNotExist(), - ), - patch("twitch.tasks._download_and_save") as save_mock, - ): - download_campaign_image.run(1) - - save_mock.assert_not_called() - - -def test_download_campaign_image_retries_on_error() -> None: - """It should retry when image download fails unexpectedly.""" - campaign = SimpleNamespace( - image_url="https://example.com/campaign.jpg", - twitch_id="camp123", - image_file=MagicMock(), - ) - error = RuntimeError("boom") - - with ( - patch("twitch.models.DropCampaign.objects.get", return_value=campaign), - patch("twitch.tasks._download_and_save", side_effect=error), - patch.object( - download_campaign_image, - "retry", - side_effect=RuntimeError("retried"), - ) as retry_mock, - pytest.raises(RuntimeError, match="retried"), - ): - download_campaign_image.run(1) - - retry_mock.assert_called_once_with(exc=error) - - -def test_download_benefit_image_downloads_when_url_exists() -> None: - """It should download and save benefit image when image_asset_url is present.""" - benefit = SimpleNamespace( - image_asset_url="https://example.com/benefit.png", - twitch_id="benefit123", - image_file=MagicMock(), - ) - - with ( - patch("twitch.models.DropBenefit.objects.get", return_value=benefit), - patch("twitch.tasks._download_and_save", return_value=True) as save_mock, - ): - download_benefit_image.run(1) - - save_mock.assert_called_once_with( - url="https://example.com/benefit.png", - name="benefit123", - file_field=benefit.image_file, - ) - - -def test_download_benefit_image_skips_when_no_url() -> None: - """It should skip when benefit has no image_asset_url.""" - benefit = SimpleNamespace( - image_asset_url=None, - twitch_id="benefit123", - image_file=MagicMock(), - ) - - with ( - patch("twitch.models.DropBenefit.objects.get", return_value=benefit), - patch("twitch.tasks._download_and_save") as save_mock, - ): - download_benefit_image.run(1) - - save_mock.assert_not_called() - - -def test_download_benefit_image_skips_when_not_found() -> None: - """It should skip when benefit does not exist.""" - with ( - patch( - "twitch.models.DropBenefit.objects.get", - side_effect=DropBenefit.DoesNotExist(), - ), - patch("twitch.tasks._download_and_save") as save_mock, - ): - download_benefit_image.run(1) - - save_mock.assert_not_called() - - -def test_download_benefit_image_retries_on_error() -> None: - """It should retry when image download fails unexpectedly.""" - benefit = SimpleNamespace( - image_asset_url="https://example.com/benefit.png", - twitch_id="benefit123", - image_file=MagicMock(), - ) - error = RuntimeError("boom") - - with ( - patch("twitch.models.DropBenefit.objects.get", return_value=benefit), - patch("twitch.tasks._download_and_save", side_effect=error), - patch.object( - download_benefit_image, - "retry", - side_effect=RuntimeError("retried"), - ) as retry_mock, - pytest.raises(RuntimeError, match="retried"), - ): - download_benefit_image.run(1) - - retry_mock.assert_called_once_with(exc=error) - - -def test_download_reward_campaign_image_downloads_when_url_exists() -> None: - """It should download and save reward campaign image when image_url is present.""" - reward = SimpleNamespace( - image_url="https://example.com/reward.jpg", - twitch_id="reward123", - image_file=MagicMock(), - ) - - with ( - patch("twitch.models.RewardCampaign.objects.get", return_value=reward), - patch("twitch.tasks._download_and_save", return_value=True) as save_mock, - ): - download_reward_campaign_image.run(1) - - save_mock.assert_called_once_with( - "https://example.com/reward.jpg", - "reward123", - reward.image_file, - ) - - -def test_download_reward_campaign_image_skips_when_no_url() -> None: - """It should skip when reward has no image_url.""" - reward = SimpleNamespace( - image_url=None, - twitch_id="reward123", - image_file=MagicMock(), - ) - - with ( - patch("twitch.models.RewardCampaign.objects.get", return_value=reward), - patch("twitch.tasks._download_and_save") as save_mock, - ): - download_reward_campaign_image.run(1) - - save_mock.assert_not_called() - - -def test_download_reward_campaign_image_skips_when_not_found() -> None: - """It should skip when reward does not exist.""" - with ( - patch( - "twitch.models.RewardCampaign.objects.get", - side_effect=RewardCampaign.DoesNotExist(), - ), - patch("twitch.tasks._download_and_save") as save_mock, - ): - download_reward_campaign_image.run(1) - - save_mock.assert_not_called() - - -def test_download_reward_campaign_image_retries_on_error() -> None: - """It should retry when image download fails unexpectedly.""" - reward = SimpleNamespace( - image_url="https://example.com/reward.jpg", - twitch_id="reward123", - image_file=MagicMock(), - ) - error = RuntimeError("boom") - - with ( - patch("twitch.models.RewardCampaign.objects.get", return_value=reward), - patch("twitch.tasks._download_and_save", side_effect=error), - patch.object( - download_reward_campaign_image, - "retry", - side_effect=RuntimeError("retried"), - ) as retry_mock, - pytest.raises(RuntimeError, match="retried"), - ): - download_reward_campaign_image.run(1) - - retry_mock.assert_called_once_with(exc=error)