Compare commits

..

2 commits

Author SHA1 Message Date
66ea46cf23
Use celery tasks instead of systemd timers for periodic work; and add more tests
All checks were successful
Deploy to Server / deploy (push) Successful in 26s
2026-04-08 03:23:18 +02:00
333476b30b
Revise copilot-instructions.md 2026-04-07 20:37:03 +02:00
26 changed files with 2173 additions and 182 deletions

View file

@ -1,7 +1,13 @@
# Django Configuration # Environment variables for ttvdrops Django application
# Set to False in production # Copy this file to `.env` and fill in the appropriate values before running the application.
# For local development, you can use the default values provided here, but make sure to change the secret key and Twitch API credentials for production use.
# Debug mode (set to False in production)
DEBUG=True DEBUG=True
# Base URL used for absolute URL generation
BASE_URL=https://ttvdrops.lovinator.space
# Django Secret Key # Django Secret Key
# Generate a new secret key for production: python -c 'from django.core.management.utils import get_random_secret_key; print(get_random_secret_key())' # Generate a new secret key for production: python -c 'from django.core.management.utils import get_random_secret_key; print(get_random_secret_key())'
DJANGO_SECRET_KEY=your-secret-key-here DJANGO_SECRET_KEY=your-secret-key-here
@ -11,6 +17,8 @@ DJANGO_SECRET_KEY=your-secret-key-here
# You can use either an app access token or user access token # You can use either an app access token or user access token
TWITCH_CLIENT_ID=your-twitch-client-id TWITCH_CLIENT_ID=your-twitch-client-id
TWITCH_CLIENT_SECRET=your-twitch-client-secret TWITCH_CLIENT_SECRET=your-twitch-client-secret
# Optional: if omitted, some commands can fetch an app access token using the client credentials
TWITCH_ACCESS_TOKEN=
# Email Configuration # Email Configuration
# SMTP Host (examples below) # SMTP Host (examples below)
@ -42,10 +50,24 @@ POSTGRES_HOST=/run/postgresql
# Note: set this to 6432 instead of the default 5432 when connecting through PgBouncer # Note: set this to 6432 instead of the default 5432 when connecting through PgBouncer
POSTGRES_PORT=5432 POSTGRES_PORT=5432
# Optional database connection tuning
# CONN_MAX_AGE=60
# CONN_HEALTH_CHECKS=True
# DB_CONNECT_TIMEOUT=10
# Use SQLite instead of PostgreSQL (useful for local development)
USE_SQLITE=False
# Where to store Twitch API responses # Where to store Twitch API responses
TTVDROPS_IMPORTED_DIR=/mnt/fourteen/Data/Responses/imported TTVDROPS_IMPORTED_DIR=/mnt/fourteen/Data/Responses/imported
# Where to store Twitch API responses that failed processing (e.g. due to missing fields or unrecognized formats)
TTVDROPS_BROKEN_DIR=/mnt/fourteen/Data/Responses/broken TTVDROPS_BROKEN_DIR=/mnt/fourteen/Data/Responses/broken
# Redis Configuration # Where to store Twitch API responses that are pending processing (e.g. waiting for a periodic task to process them)
TTVDROPS_PENDING_DIR=/mnt/fourteen/Data/Responses/pending
# Redis Configuration for caching and Celery
REDIS_URL_CACHE=unix:///var/run/redis/redis.sock?db=0 REDIS_URL_CACHE=unix:///var/run/redis/redis.sock?db=0
REDIS_URL_CELERY=unix:///var/run/redis/redis.sock?db=1 REDIS_URL_CELERY=unix:///var/run/redis/redis.sock?db=1

View file

@ -1,78 +1,40 @@
# Django Guidelines # Project Guidelines
## Python Best Practices ## Code Style
- Follow PEP 8 with 120 character line limit - Follow PEP 8 with a 120 character line limit.
- Use double quotes for Python strings - Use double quotes for Python strings.
- Keep comments focused on why, not what.
## Django Best Practices ## Build and Test
- Follow Django's "batteries included" philosophy - use built-in features before third-party packages - Install/update dependencies with `uv sync -U`.
- Prioritize security and follow Django's security best practices - Run tests with `uv run pytest`.
- Use Django's ORM effectively - Run Django commands with `uv run python manage.py <command>`.
- Typical local sequence: `uv run python manage.py makemigrations`, `uv run python manage.py migrate`, `uv run python manage.py runserver`.
- Before finishing model changes, verify migrations with `uv run python manage.py makemigrations --check --dry-run`.
## Models ## Architecture
- Add `__str__` methods to all models for a better admin interface - This is a Django multi-app project:
- Use `related_name` for foreign keys when needed - `twitch/`, `kick/`, `chzzk/`, and `youtube/` hold platform-specific domain logic.
- Define `Meta` class with appropriate options (ordering, verbose_name, etc.) - `core/` holds shared cross-app utilities (base URL, SEO, shared views/helpers).
- Use `blank=True` for optional form fields, `null=True` for optional database fields - `config/` holds settings, URLs, WSGI, and Celery app wiring.
- Favor Django's batteries-included approach before adding new third-party dependencies.
- Follow MTV with fat models and thin views; keep template logic simple.
## Views ## Conventions
- Always validate and sanitize user input - Models should include `__str__` and explicit `Meta` options (ordering/indexes/verbose names) where relevant.
- Handle exceptions gracefully with try/except blocks - Use `blank=True` for optional form fields and `null=True` for optional database fields.
- Use `get_object_or_404` instead of manual exception handling - Prefer `get_object_or_404` over manual DoesNotExist handling in views.
- Implement proper pagination for list views - URL patterns should use descriptive names and trailing slashes.
- For schema models that parse external APIs, prefer Pydantic `extra="forbid"` and normalize variant payloads with validators.
- Add tests for both positive and negative scenarios, especially when handling multiple upstream payload formats.
## URLs ## Environment and Pitfalls
- Use descriptive URL names for reverse URL lookups - `DJANGO_SECRET_KEY` must be set; the app exits if it is missing.
- Always end URL patterns with a trailing slash - Database defaults to PostgreSQL unless `USE_SQLITE=true`; tests use in-memory SQLite.
- Redis is used for cache and Celery. Keep `REDIS_URL_CACHE` and `REDIS_URL_CELERY` configured in environments that run background tasks.
- Static and media files are stored under the platform data directory configured in `config/settings.py`, not in the repo tree.
## Templates ## Documentation
- Use template inheritance with base templates - Link to existing docs instead of duplicating them.
- Use template tags and filters for common operations - Use `README.md` as the source of truth for system setup, deployment, Celery, and management command examples.
- Avoid complex logic in templates - move it to views or template tags - Only create new documentation files when explicitly requested.
- Use static files properly with `{% load static %}`
- Avoid hiding controls with `<details>` or other collapse elements unless explicitly needed
- Prioritize accessibility and discoverability of features
## Settings
- Use environment variables in a single `settings.py` file
- Never commit secrets to version control
## Database
- Use migrations for all database changes
- Optimize queries with `select_related` and `prefetch_related`
- Use database indexes for frequently queried fields
- Avoid N+1 query problems
## Testing
- Always write unit tests and check that they pass for new features
- Test both positive and negative scenarios
## Pydantic Schemas
- Use `extra="forbid"` in model_config to catch API changes and new fields from external APIs
- Explicitly document all optional fields from different API operation formats
- This ensures validation failures alert you to API changes rather than silently ignoring new data
- When supporting multiple API formats (e.g., ViewerDropsDashboard vs Inventory operations):
- Make fields optional that differ between formats
- Use field validators or model validators to normalize data between formats
- Write tests covering both operation formats to ensure backward compatibility
## Architectural Patterns
- Follow Django's MTV (Model-Template-View) paradigm; keep business logic in models/services and presentation in templates
- Favor fat models and thin views; extract reusable business logic into services/helpers when complexity grows
- Keep forms and serializers (if added) responsible for validation; avoid duplicating validation in views
- Avoid circular dependencies; keep modules cohesive and decoupled
- Use settings modules and environment variables to configure behavior, not hardcoded constants
## Technology Stack
- Python 3, Django, PostgreSQL, Redis (Valkey), Celery for background tasks
- HTML templates with Django templating; static assets served from `static/` and collected to `staticfiles/`
- Management commands in `twitch/management/commands/` for data import and maintenance tasks
- Use `pyproject.toml` + uv for dependency and environment management
- Use `uv run python manage.py <command>` to run Django management commands
- Use `uv run pytest` to run tests
## Documentation & Project Organization
- Only create documentation files when explicitly requested by the user
- Do not generate markdown files summarizing work or changes unless asked
- Keep code comments focused on "why" not "what"; the code itself should be clear
- Update existing documentation rather than creating new files

View file

@ -61,13 +61,20 @@ sudo systemctl enable --now ttvdrops.service
curl --unix-socket /run/ttvdrops/ttvdrops.sock https://ttvdrops.lovinator.space curl --unix-socket /run/ttvdrops/ttvdrops.sock https://ttvdrops.lovinator.space
``` ```
Install and enable timers: Enable Celery worker and Beat services:
```bash ```bash
sudo install -m 0644 tools/systemd/ttvdrops-backup.{service,timer} /etc/systemd/system/ sudo install -m 0644 tools/systemd/ttvdrops-celery-worker.service /etc/systemd/system/
sudo install -m 0644 tools/systemd/ttvdrops-import-drops.{service,timer} /etc/systemd/system/ sudo install -m 0644 tools/systemd/ttvdrops-celery-beat.service /etc/systemd/system/
sudo systemctl daemon-reload sudo systemctl daemon-reload
sudo systemctl enable --now ttvdrops-backup.timer ttvdrops-import-drops.timer sudo systemctl enable --now ttvdrops-celery-worker.service
sudo systemctl enable --now ttvdrops-celery-beat.service
```
Set `TTVDROPS_PENDING_DIR` in `.env` to the directory where incoming Twitch JSON response files are placed for processing:
```env
TTVDROPS_PENDING_DIR=/mnt/fourteen/Data/Responses/pending
``` ```
## Development ## Development
@ -83,13 +90,40 @@ uv run pytest
## Celery ## Celery
Start a worker: Celery powers all periodic and background work. Three services are required:
| Service file | Queues | Purpose |
|---|---|---|
| `ttvdrops-celery-worker.service` | `imports`, `api-fetches`, `image-downloads`, `default` | Twitch/Kick/Chzzk imports, image downloads, backup |
| `ttvdrops-celery-beat.service` | — | Periodic task scheduler (Beat) |
Start workers manually during development:
```bash ```bash
uv run celery -A config worker --loglevel=info # All-in-one worker (development)
uv run celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler uv run celery -A config worker --queues imports,api-fetches,image-downloads,default --loglevel=info
# Beat scheduler (requires database Beat tables — run migrate first)
uv run celery -A config beat --scheduler django_celery_beat.schedulers:DatabaseScheduler --loglevel=info
# Monitor tasks in the browser (optional)
uv run celery -A config flower
``` ```
**Periodic tasks configured via `CELERY_BEAT_SCHEDULE`:**
| Task | Schedule | Queue |
|---|---|---|
| `twitch.tasks.scan_pending_twitch_files` | every 10 s | `imports` |
| `kick.tasks.import_kick_drops` | :01/:16/:31/:46 | `api-fetches` |
| `chzzk.tasks.discover_chzzk_campaigns` | every 2 h | `api-fetches` |
| `twitch.tasks.backup_database` | daily 02:15 UTC | `default` |
| `twitch.tasks.download_all_images` | Sunday 04:00 UTC | `image-downloads` |
| `twitch.tasks.import_chat_badges` | Sunday 03:00 UTC | `api-fetches` |
Image downloads also run **immediately** on record creation via `post_save` signals
(`Game`, `DropCampaign`, `DropBenefit`, `RewardCampaign`).
## Import Drops ## Import Drops
```bash ```bash

26
chzzk/tasks.py Normal file
View file

@ -0,0 +1,26 @@
from __future__ import annotations
import logging
from celery import shared_task
from django.core.management import call_command
logger = logging.getLogger("ttvdrops.tasks")
@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60)
def import_chzzk_campaign_task(self, campaign_no: int) -> None: # noqa: ANN001
"""Import a single Chzzk campaign by its campaign number."""
try:
call_command("import_chzzk_campaign", str(campaign_no))
except Exception as exc:
raise self.retry(exc=exc) from exc
@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120)
def discover_chzzk_campaigns(self) -> None: # noqa: ANN001
"""Discover and import the latest Chzzk campaigns (equivalent to --latest flag)."""
try:
call_command("import_chzzk_campaign", latest=True)
except Exception as exc:
raise self.retry(exc=exc) from exc

View file

@ -0,0 +1,481 @@
from datetime import timedelta
from io import StringIO
from unittest.mock import Mock
from unittest.mock import call
from unittest.mock import patch
import pytest
import requests
from django.core.management import CommandError
from django.core.management import call_command
from django.test import TestCase
from django.utils import timezone
from chzzk.management.commands.import_chzzk_campaign import (
Command as ImportChzzkCampaignCommand,
)
from chzzk.models import ChzzkCampaign
from chzzk.models import ChzzkReward
from chzzk.schemas import ChzzkCampaignV2
from chzzk.schemas import ChzzkRewardV2
class ImportChzzkCampaignRangeCommandTest(TestCase):
    """Tests for the import_chzzk_campaign_range management command."""

    # All tests patch the inner call_command used by the range command so no
    # real campaign import (network/DB work) happens.

    def test_imports_campaigns_in_range_descending_by_default(self) -> None:
        """Test that campaigns are imported in descending order when start > end."""
        stdout = StringIO()
        with patch(
            "chzzk.management.commands.import_chzzk_campaign_range.call_command",
        ) as call_command_mock:
            call_command(
                "import_chzzk_campaign_range",
                "5",
                "3",
                stdout=stdout,
            )
        # Verify call_command was called for each campaign in descending order
        expected_calls = [
            call("import_chzzk_campaign", "5"),
            call("import_chzzk_campaign", "4"),
            call("import_chzzk_campaign", "3"),
        ]
        call_command_mock.assert_has_calls(expected_calls)
        assert call_command_mock.call_count == 3

    # NOTE(review): the name says "by_default" but this covers the ascending
    # direction (start < end); consider renaming for clarity.
    def test_imports_campaigns_in_range_ascending_by_default(self) -> None:
        """Test that campaigns are imported in ascending order when start < end."""
        stdout = StringIO()
        with patch(
            "chzzk.management.commands.import_chzzk_campaign_range.call_command",
        ) as call_command_mock:
            call_command(
                "import_chzzk_campaign_range",
                "3",
                "5",
                stdout=stdout,
            )
        expected_calls = [
            call("import_chzzk_campaign", "3"),
            call("import_chzzk_campaign", "4"),
            call("import_chzzk_campaign", "5"),
        ]
        call_command_mock.assert_has_calls(expected_calls)
        assert call_command_mock.call_count == 3

    def test_imports_single_campaign_when_start_equals_end(self) -> None:
        """Test that a single campaign is imported when start equals end."""
        stdout = StringIO()
        with patch(
            "chzzk.management.commands.import_chzzk_campaign_range.call_command",
        ) as call_command_mock:
            call_command(
                "import_chzzk_campaign_range",
                "5",
                "5",
                stdout=stdout,
            )
        call_command_mock.assert_called_once_with("import_chzzk_campaign", "5")

    def test_respects_custom_step_parameter(self) -> None:
        """Test that custom step parameter is respected."""
        stdout = StringIO()
        with patch(
            "chzzk.management.commands.import_chzzk_campaign_range.call_command",
        ) as call_command_mock:
            call_command(
                "import_chzzk_campaign_range",
                "1",
                "10",
                "--step",
                "2",
                stdout=stdout,
            )
        # Step 2 over 1..10 visits the odd campaign numbers only.
        expected_calls = [
            call("import_chzzk_campaign", "1"),
            call("import_chzzk_campaign", "3"),
            call("import_chzzk_campaign", "5"),
            call("import_chzzk_campaign", "7"),
            call("import_chzzk_campaign", "9"),
        ]
        call_command_mock.assert_has_calls(expected_calls)
        assert call_command_mock.call_count == 5

    def test_respects_custom_negative_step(self) -> None:
        """Test that custom negative step parameter works correctly."""
        stdout = StringIO()
        with patch(
            "chzzk.management.commands.import_chzzk_campaign_range.call_command",
        ) as call_command_mock:
            call_command(
                "import_chzzk_campaign_range",
                "10",
                "1",
                "--step",
                "-2",
                stdout=stdout,
            )
        expected_calls = [
            call("import_chzzk_campaign", "10"),
            call("import_chzzk_campaign", "8"),
            call("import_chzzk_campaign", "6"),
            call("import_chzzk_campaign", "4"),
            call("import_chzzk_campaign", "2"),
        ]
        call_command_mock.assert_has_calls(expected_calls)
        assert call_command_mock.call_count == 5

    def test_handles_command_error_gracefully(self) -> None:
        """Test that CommandError from import_chzzk_campaign is caught and reported."""
        stdout = StringIO()
        stderr = StringIO()

        # Fail only for campaign "4" to verify per-campaign error reporting.
        def side_effect(command: str, *args: str, **kwargs: object) -> None:
            if "4" in args:
                msg = "Campaign 4 not found"
                raise CommandError(msg)

        with patch(
            "chzzk.management.commands.import_chzzk_campaign_range.call_command",
            side_effect=side_effect,
        ):
            call_command(
                "import_chzzk_campaign_range",
                "3",
                "5",
                stdout=stdout,
                stderr=stderr,
            )
        output = stdout.getvalue()
        assert "Importing campaign 3" in output
        assert "Importing campaign 4" in output
        assert "Importing campaign 5" in output
        assert "Failed campaign 4" in output
        assert "Campaign 4 not found" in output
        assert "Batch import complete" in output

    def test_continues_after_command_error(self) -> None:
        """Test that import continues after encountering a CommandError."""
        call_count = 0

        def side_effect(command: str, campaign_no: str) -> None:
            nonlocal call_count
            call_count += 1
            if campaign_no == "4":
                msg = "Campaign 4 error"
                raise CommandError(msg)

        with patch(
            "chzzk.management.commands.import_chzzk_campaign_range.call_command",
            side_effect=side_effect,
        ):
            call_command(
                "import_chzzk_campaign_range",
                "3",
                "5",
                stdout=StringIO(),
            )
        # Verify all campaigns were attempted
        assert call_count == 3

    def test_outputs_success_messages(self) -> None:
        """Test that success messages are written to stdout."""
        stdout = StringIO()
        with patch(
            "chzzk.management.commands.import_chzzk_campaign_range.call_command",
        ):
            call_command(
                "import_chzzk_campaign_range",
                "1",
                "2",
                stdout=stdout,
            )
        output: str = stdout.getvalue()
        assert "Importing campaigns from 1 to 2 with step 1" in output
        assert "Batch import complete" in output

    def test_raises_error_when_step_is_zero(self) -> None:
        """Test that ValueError is raised when step is 0."""
        with pytest.raises(ValueError, match="Step cannot be 0"):
            call_command(
                "import_chzzk_campaign_range",
                "1",
                "5",
                "--step",
                "0",
                stdout=StringIO(),
            )

    def test_handles_large_range(self) -> None:
        """Test that large ranges are handled correctly."""
        stdout = StringIO()
        with patch(
            "chzzk.management.commands.import_chzzk_campaign_range.call_command",
        ) as call_command_mock:
            call_command(
                "import_chzzk_campaign_range",
                "1",
                "100",
                "--step",
                "10",
                stdout=stdout,
            )
        # 1..100 step 10 → 10 attempts: 1, 11, 21, ..., 91.
        assert call_command_mock.call_count == 10
        first_call = call_command_mock.call_args_list[0]
        assert first_call == call("import_chzzk_campaign", "1")
        last_call = call_command_mock.call_args_list[-1]
        assert last_call == call("import_chzzk_campaign", "91")
class ImportChzzkCampaignCommandTest(TestCase):
    """Tests for the import_chzzk_campaign management command."""

    def _create_campaign(self, campaign_no: int) -> ChzzkCampaign:
        # Shared fixture helper: a minimal ACTIVE campaign spanning
        # yesterday..tomorrow so it counts as currently running.
        now = timezone.now()
        return ChzzkCampaign.objects.create(
            campaign_no=campaign_no,
            title=f"Campaign {campaign_no}",
            description="Campaign description",
            category_type="game",
            category_id="1",
            category_value="Game",
            service_id="chzzk",
            state="ACTIVE",
            start_date=now - timedelta(days=1),
            end_date=now + timedelta(days=1),
            has_ios_based_reward=False,
            drops_campaign_not_started=False,
            source_api="unit-test",
        )

    def test_requires_campaign_no_when_latest_not_used(self) -> None:
        """Command should fail without campaign_no unless --latest is provided."""
        with pytest.raises(
            CommandError,
            match="campaign_no is required unless --latest is used",
        ):
            call_command("import_chzzk_campaign", stdout=StringIO())

    def test_imports_single_campaign_no(self) -> None:
        """Command should import the provided campaign number."""
        with patch(
            "chzzk.management.commands.import_chzzk_campaign.Command._import_campaign",
            autospec=True,
        ) as import_campaign_mock:
            call_command("import_chzzk_campaign", "42", stdout=StringIO())
        assert import_campaign_mock.call_count == 1
        # args[0] is self (autospec=True), args[1] is the campaign number.
        assert import_campaign_mock.call_args.args[1] == 42

    def test_latest_imports_candidates(self) -> None:
        """Command should import all candidates returned by get_campaign_import_candidates."""
        with (
            patch(
                "chzzk.management.commands.import_chzzk_campaign.Command.get_campaign_import_candidates",
                autospec=True,
                return_value=[9, 10, 11],
            ),
            patch(
                "chzzk.management.commands.import_chzzk_campaign.Command._import_campaign",
                autospec=True,
            ) as import_campaign_mock,
        ):
            call_command("import_chzzk_campaign", "--latest", stdout=StringIO())
        called_campaigns = [
            call_.args[1] for call_ in import_campaign_mock.call_args_list
        ]
        assert called_campaigns == [9, 10, 11]

    def test_latest_with_no_candidates_exits_cleanly(self) -> None:
        """Command should not import anything when --latest has no candidates."""
        stdout = StringIO()
        with (
            patch(
                "chzzk.management.commands.import_chzzk_campaign.Command.get_campaign_import_candidates",
                autospec=True,
                return_value=[],
            ),
            patch(
                "chzzk.management.commands.import_chzzk_campaign.Command._import_campaign",
                autospec=True,
            ) as import_campaign_mock,
        ):
            call_command("import_chzzk_campaign", "--latest", stdout=stdout)
        assert import_campaign_mock.call_count == 0
        assert "Nothing to import with --latest at this time." in stdout.getvalue()

    def test_get_campaign_import_candidates_uses_initial_range_on_empty_db(
        self,
    ) -> None:
        """When there are no campaigns, candidates should be 1..5."""
        command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand()
        candidates: list[int] = command.get_campaign_import_candidates()
        assert candidates == [1, 2, 3, 4, 5]

    def test_get_campaign_import_candidates_adds_backfill_and_new_candidates(
        self,
    ) -> None:
        """Candidates should include missing IDs from latest-5..latest-1 plus latest+1..latest+5."""
        self._create_campaign(10)
        self._create_campaign(8)
        self._create_campaign(6)
        command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand()
        candidates: list[int] = command.get_campaign_import_candidates()
        # Backfill gaps below 10 (5, 7, 9) plus the next five above it.
        assert candidates == [5, 7, 9, 11, 12, 13, 14, 15]

    def test_get_campaign_import_candidates_ignores_outlier_max_campaign_no(
        self,
    ) -> None:
        """If max campaign_no is an outlier, the second max should be used."""
        self._create_campaign(250)
        self._create_campaign(100_002_000)
        command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand()
        candidates: list[int] = command.get_campaign_import_candidates()
        assert candidates == [245, 246, 247, 248, 249, 251, 252, 253, 254, 255]

    def test_import_campaign_handles_http_error_with_json_message(self) -> None:
        """Import should fail gracefully on HTTP errors and include JSON message when present."""
        command = ImportChzzkCampaignCommand()
        # Simulate a 404 whose body carries a JSON error payload.
        response = Mock()
        response.raise_for_status.side_effect = requests.HTTPError("404 Client Error")
        response.headers = {"Content-Type": "application/json; charset=utf-8"}
        response.json.return_value = {"message": "Campaign not found"}
        with (
            patch(
                "chzzk.management.commands.import_chzzk_campaign.requests.get",
                return_value=response,
            ),
            patch.object(command.stdout, "write") as stdout_write,
            patch.object(command, "import_campaign_data") as import_campaign_data_mock,
        ):
            command._import_campaign(12345)
        assert import_campaign_data_mock.call_count == 0
        assert stdout_write.call_count == 1
        msg = stdout_write.call_args.args[0]
        assert "Failed to fetch campaign 12345" in msg
        assert "Campaign not found" in msg

    def test_update_or_create_reward_updates_existing_reward(self) -> None:
        """Existing rewards should be updated when incoming reward data differs."""
        campaign = self._create_campaign(500)
        existing_reward = ChzzkReward.objects.create(
            campaign=campaign,
            reward_no=1,
            title="Old title",
            image_url="https://example.com/old.png",
            reward_type="OLD",
            campaign_reward_type="",
            condition_type="TIME",
            condition_for_minutes=10,
            ios_based_reward=False,
            code_remaining_count=5,
        )
        reward_data = ChzzkRewardV2.model_validate(
            {
                "title": "New title",
                "rewardNo": 1,
                "imageUrl": "https://example.com/new.png",
                "rewardType": "NEW",
                "conditionType": "WATCH",
                "conditionForMinutes": 20,
                "iosBasedReward": True,
                "codeRemainingCount": 99,
            },
        )
        command = ImportChzzkCampaignCommand()
        command.update_or_create_reward(500, campaign, reward_data)
        existing_reward.refresh_from_db()
        assert existing_reward.title == "New title"
        assert existing_reward.image_url == "https://example.com/new.png"
        assert existing_reward.reward_type == "NEW"
        assert not existing_reward.campaign_reward_type
        assert existing_reward.condition_type == "WATCH"
        assert existing_reward.condition_for_minutes == 20
        assert existing_reward.ios_based_reward is True
        assert existing_reward.code_remaining_count == 99

    def test_import_campaign_data_updates_existing_campaign(self) -> None:
        """Existing campaigns should be updated when imported fields have changed."""
        campaign = self._create_campaign(600)
        original_scraped_at = campaign.scraped_at
        command = ImportChzzkCampaignCommand()
        campaign_data = ChzzkCampaignV2.model_validate(
            {
                "campaignNo": 600,
                "title": "Updated title",
                "imageUrl": "https://example.com/new-campaign.png",
                "description": "Updated description",
                "categoryType": "game",
                "categoryId": "2",
                "categoryValue": "Game 2",
                "pcLinkUrl": "https://example.com/pc",
                "mobileLinkUrl": "https://example.com/mobile",
                "serviceId": "chzzk",
                "state": "ACTIVE",
                "startDate": campaign.start_date.isoformat(),
                "endDate": campaign.end_date.isoformat(),
                "rewardList": [],
                "hasIosBasedReward": True,
                "dropsCampaignNotStarted": False,
                "rewardType": "DROP",
                "accountLinkUrl": "https://example.com/account",
            },
        )
        updated_campaign = command.import_campaign_data(
            campaign_no=600,
            api_version="v2",
            data={"key": "value"},
            cd=campaign_data,
        )
        updated_campaign.refresh_from_db()
        assert updated_campaign.title == "Updated title"
        assert updated_campaign.description == "Updated description"
        assert updated_campaign.category_id == "2"
        assert updated_campaign.has_ios_based_reward is True
        assert not updated_campaign.campaign_reward_type
        assert updated_campaign.reward_type == "DROP"
        assert updated_campaign.account_link_url == "https://example.com/account"
        assert updated_campaign.raw_json_v2 == {"key": "value"}
        assert updated_campaign.scrape_status == "success"
        assert updated_campaign.scraped_at >= original_scraped_at

    def test_apply_updates_if_changed_returns_false_on_noop(self) -> None:
        """Helper should return False when values are unchanged."""
        campaign = self._create_campaign(700)
        command = ImportChzzkCampaignCommand()
        # Feeding back the current values must be detected as a no-op.
        changed = command._apply_updates_if_changed(
            campaign,
            {
                "title": campaign.title,
                "description": campaign.description,
            },
        )
        assert changed is False

58
chzzk/tests/test_tasks.py Normal file
View file

@ -0,0 +1,58 @@
from unittest.mock import patch
import pytest
from chzzk.tasks import discover_chzzk_campaigns
from chzzk.tasks import import_chzzk_campaign_task
def test_import_chzzk_campaign_task_calls_command_with_campaign_no() -> None:
    """It should invoke import_chzzk_campaign for the given campaign number."""
    # .run() executes the task body synchronously without a broker.
    with patch("chzzk.tasks.call_command") as mocked_call_command:
        import_chzzk_campaign_task.run(905)
    mocked_call_command.assert_called_once_with("import_chzzk_campaign", "905")
def test_import_chzzk_campaign_task_retries_on_command_error() -> None:
    """It should retry when import_chzzk_campaign raises an exception."""
    error = RuntimeError("boom")
    patched_command = patch("chzzk.tasks.call_command", side_effect=error)
    # Replace retry() so we can observe the call instead of re-queuing.
    patched_retry = patch.object(
        import_chzzk_campaign_task,
        "retry",
        side_effect=RuntimeError("retried"),
    )
    with patched_command, patched_retry as retry_mock, pytest.raises(
        RuntimeError,
        match="retried",
    ):
        import_chzzk_campaign_task.run(905)
    retry_mock.assert_called_once_with(exc=error)
def test_discover_chzzk_campaigns_calls_command_with_latest_flag() -> None:
    """It should invoke import_chzzk_campaign with latest=True."""
    # .run() executes the task body synchronously without a broker.
    with patch("chzzk.tasks.call_command") as mocked_call_command:
        discover_chzzk_campaigns.run()
    mocked_call_command.assert_called_once_with("import_chzzk_campaign", latest=True)
def test_discover_chzzk_campaigns_retries_on_command_error() -> None:
    """It should retry when import_chzzk_campaign raises an exception."""
    error = RuntimeError("boom")
    patched_command = patch("chzzk.tasks.call_command", side_effect=error)
    # Replace retry() so we can observe the call instead of re-queuing.
    patched_retry = patch.object(
        discover_chzzk_campaigns,
        "retry",
        side_effect=RuntimeError("retried"),
    )
    with patched_command, patched_retry as retry_mock, pytest.raises(
        RuntimeError,
        match="retried",
    ):
        discover_chzzk_campaigns.run()
    retry_mock.assert_called_once_with(exc=error)

View file

@ -5,6 +5,7 @@ from pathlib import Path
from typing import Any from typing import Any
import sentry_sdk import sentry_sdk
from celery.schedules import crontab
from dotenv import load_dotenv from dotenv import load_dotenv
from platformdirs import user_data_dir from platformdirs import user_data_dir
@ -109,6 +110,8 @@ STATICFILES_DIRS: list[Path] = [BASE_DIR / "static"]
TIME_ZONE = "UTC" TIME_ZONE = "UTC"
WSGI_APPLICATION = "config.wsgi.application" WSGI_APPLICATION = "config.wsgi.application"
TTVDROPS_PENDING_DIR: str = os.getenv("TTVDROPS_PENDING_DIR", "")
INTERNAL_IPS: list[str] = [] INTERNAL_IPS: list[str] = []
if DEBUG: if DEBUG:
INTERNAL_IPS = ["127.0.0.1", "localhost"] # pyright: ignore[reportConstantRedefinition] INTERNAL_IPS = ["127.0.0.1", "localhost"] # pyright: ignore[reportConstantRedefinition]
@ -259,6 +262,63 @@ CELERY_BROKER_URL: str = REDIS_URL_CELERY
CELERY_RESULT_BACKEND = "django-db" CELERY_RESULT_BACKEND = "django-db"
CELERY_RESULT_EXTENDED = True CELERY_RESULT_EXTENDED = True
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler" CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
CELERY_TASK_SOFT_TIME_LIMIT: int = 3600  # warn at 1 h
CELERY_TASK_TIME_LIMIT: int = 3900  # hard-kill at 1 h 5 min

# Route each task to a dedicated queue so heavy work (imports, image
# downloads) cannot starve lightweight periodic jobs.
# NOTE(review): queues are also declared per-task (shared_task(queue=...))
# and per-schedule ("options" below) — keep all three places in sync.
CELERY_TASK_ROUTES: dict[str, dict[str, str]] = {
    "twitch.tasks.scan_pending_twitch_files": {"queue": "imports"},
    "twitch.tasks.import_twitch_file": {"queue": "imports"},
    "twitch.tasks.download_game_image": {"queue": "image-downloads"},
    "twitch.tasks.download_campaign_image": {"queue": "image-downloads"},
    "twitch.tasks.download_benefit_image": {"queue": "image-downloads"},
    "twitch.tasks.download_reward_campaign_image": {"queue": "image-downloads"},
    "twitch.tasks.download_all_images": {"queue": "image-downloads"},
    "twitch.tasks.import_chat_badges": {"queue": "api-fetches"},
    "twitch.tasks.backup_database": {"queue": "default"},
    "kick.tasks.import_kick_drops": {"queue": "api-fetches"},
    "chzzk.tasks.discover_chzzk_campaigns": {"queue": "api-fetches"},
    "chzzk.tasks.import_chzzk_campaign_task": {"queue": "imports"},
    "core.tasks.submit_indexnow_task": {"queue": "default"},
}

# Static periodic schedule; with the django_celery_beat DatabaseScheduler
# these entries are loaded into the Beat database tables.
# All crontab times are in TIME_ZONE (UTC).
CELERY_BEAT_SCHEDULE: dict[str, Any] = {
    # Scan for new Twitch JSON drops every 10 seconds.
    "scan-pending-twitch-files": {
        "task": "twitch.tasks.scan_pending_twitch_files",
        "schedule": 10.0,
        "options": {"queue": "imports"},
    },
    # Import Kick drops from the API (:01, :16, :31, :46 each hour).
    "import-kick-drops": {
        "task": "kick.tasks.import_kick_drops",
        "schedule": crontab(minute="1,16,31,46"),
        "options": {"queue": "api-fetches"},
    },
    # Backup database nightly at 02:15.
    "backup-database": {
        "task": "twitch.tasks.backup_database",
        "schedule": crontab(hour=2, minute=15),
        "options": {"queue": "default"},
    },
    # Discover new Chzzk campaigns every 2 hours at minute 0.
    "discover-chzzk-campaigns": {
        "task": "chzzk.tasks.discover_chzzk_campaigns",
        "schedule": crontab(minute=0, hour="*/2"),
        "options": {"queue": "api-fetches"},
    },
    # Weekly full image refresh (Sunday 04:00 UTC).
    "download-all-images-weekly": {
        "task": "twitch.tasks.download_all_images",
        "schedule": crontab(hour=4, minute=0, day_of_week=0),
        "options": {"queue": "image-downloads"},
    },
    # Weekly chat badge refresh (Sunday 03:00 UTC).
    "import-chat-badges-weekly": {
        "task": "twitch.tasks.import_chat_badges",
        "schedule": crontab(hour=3, minute=0, day_of_week=0),
        "options": {"queue": "api-fetches"},
    },
}
# Define BASE_URL for dynamic URL generation # Define BASE_URL for dynamic URL generation
BASE_URL: str = "https://ttvdrops.lovinator.space" BASE_URL: str = "https://ttvdrops.lovinator.space"

17
core/tasks.py Normal file
View file

@ -0,0 +1,17 @@
from __future__ import annotations
import logging
from celery import shared_task
from django.core.management import call_command
logger = logging.getLogger("ttvdrops.tasks")
@shared_task(bind=True, queue="default", max_retries=3, default_retry_delay=300)
def submit_indexnow_task(self) -> None: # noqa: ANN001
"""Submit all site URLs to the IndexNow search index."""
try:
call_command("submit_indexnow")
except Exception as exc:
raise self.retry(exc=exc) from exc

17
kick/tasks.py Normal file
View file

@ -0,0 +1,17 @@
from __future__ import annotations
import logging
from celery import shared_task
from django.core.management import call_command
logger = logging.getLogger("ttvdrops.tasks")
@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120)
def import_kick_drops(self) -> None: # noqa: ANN001
"""Fetch and upsert Kick drop campaigns from the public API."""
try:
call_command("import_kick_drops")
except Exception as exc:
raise self.retry(exc=exc) from exc

View file

@ -50,7 +50,7 @@ dev = [
[tool.pytest.ini_options] [tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "config.settings" DJANGO_SETTINGS_MODULE = "config.settings"
python_files = ["test_*.py", "*_test.py"] python_files = ["test_*.py", "*_test.py"]
addopts = "" addopts = "--tb=short -n auto --cov"
filterwarnings = [ filterwarnings = [
"ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning", "ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning",
] ]

View file

@ -1,10 +0,0 @@
[Unit]
Description=TTVDrops database backup
[Service]
Type=oneshot
User=ttvdrops
Group=ttvdrops
WorkingDirectory=/home/ttvdrops/ttvdrops
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
ExecStart=/usr/bin/uv run python manage.py backup_db

View file

@ -1,9 +0,0 @@
[Unit]
Description=Nightly TTVDrops database backup
[Timer]
OnCalendar=*-*-* 02:15:00
Persistent=true
[Install]
WantedBy=timers.target

View file

@ -0,0 +1,27 @@
[Unit]
Description=TTVDrops Celery Beat scheduler
# Beat only *schedules* tasks; the worker service executes them.
# valkey is the Celery broker.
After=network-online.target valkey.service ttvdrops-celery-worker.service
Wants=network-online.target valkey.service

[Service]
Type=simple
User=ttvdrops
Group=ttvdrops
# http group + 0002 umask so the web server can read files created here.
SupplementaryGroups=http
UMask=0002
WorkingDirectory=/home/ttvdrops/ttvdrops
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
# DatabaseScheduler (django-celery-beat) stores schedule entries in Django
# models, so they can be changed at runtime via the admin.
ExecStart=/usr/bin/uv run celery -A config beat \
    --scheduler django_celery_beat.schedulers:DatabaseScheduler \
    --loglevel INFO
ExecStop=/bin/kill -s TERM $MAINPID
StandardOutput=journal
StandardError=journal
SyslogIdentifier=ttvdrops-celery-beat
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1,31 @@
[Unit]
Description=TTVDrops Celery worker
After=network-online.target valkey.service
Wants=network-online.target valkey.service

[Service]
Type=simple
User=ttvdrops
Group=ttvdrops
SupplementaryGroups=http
UMask=0002
WorkingDirectory=/home/ttvdrops/ttvdrops
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
# FIX: also consume the image-downloads queue. CELERY_TASK_ROUTES and the
# weekly Beat entries route image tasks to "image-downloads", but no worker
# was listening on it, so those tasks would accumulate in the broker forever.
ExecStart=/usr/bin/uv run celery -A config worker \
    --queues imports,api-fetches,image-downloads,default \
    --concurrency 4 \
    --loglevel INFO
ExecStop=/bin/kill -s TERM $MAINPID
ExecReload=/bin/kill -s HUP $MAINPID
MemoryLimit=512M
CPUQuota=75%
StandardOutput=journal
StandardError=journal
SyslogIdentifier=ttvdrops-celery-worker
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target

View file

@ -1,34 +0,0 @@
[Unit]
Description=TTVDrops watch and import drops from pending directory
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=ttvdrops
Group=ttvdrops
SupplementaryGroups=http
UMask=0002
WorkingDirectory=/home/ttvdrops/ttvdrops
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
ExecStart=/usr/bin/uv run python manage.py watch_imports /mnt/fourteen/Data/Responses/pending --verbose
# Restart policy
Restart=on-failure
RestartSec=5s
# Process management
KillMode=mixed
KillSignal=SIGTERM
# Resource limits
MemoryLimit=512M
CPUQuota=50%
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=ttvdrops-watch
[Install]
WantedBy=multi-user.target

View file

@ -1,26 +0,0 @@
[Unit]
Description=TTVDrops import Kick drops
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
User=ttvdrops
Group=ttvdrops
SupplementaryGroups=http
UMask=0002
WorkingDirectory=/home/ttvdrops/ttvdrops
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
ExecStart=/usr/bin/uv run python manage.py import_kick_drops
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=ttvdrops-import-kick
# Resource limits
MemoryLimit=512M
CPUQuota=50%
[Install]
WantedBy=multi-user.target

View file

@ -1,9 +0,0 @@
[Unit]
Description=TTVDrops import Kick drops at :01, :16, :31, and :46
[Timer]
OnCalendar=*-*-* *:01,16,31,46:00
Persistent=true
[Install]
WantedBy=timers.target

View file

@ -36,3 +36,21 @@ class TwitchConfig(AppConfig):
FieldFile.open = _safe_open FieldFile.open = _safe_open
except (AttributeError, TypeError) as exc: except (AttributeError, TypeError) as exc:
logger.debug("Failed to patch FieldFile.open: %s", exc) logger.debug("Failed to patch FieldFile.open: %s", exc)
# Register post_save signal handlers that dispatch image download tasks
# when new Twitch records are created.
from django.db.models.signals import post_save # noqa: PLC0415
from twitch.models import DropBenefit # noqa: PLC0415
from twitch.models import DropCampaign # noqa: PLC0415
from twitch.models import Game # noqa: PLC0415
from twitch.models import RewardCampaign # noqa: PLC0415
from twitch.signals import on_drop_benefit_saved # noqa: PLC0415
from twitch.signals import on_drop_campaign_saved # noqa: PLC0415
from twitch.signals import on_game_saved # noqa: PLC0415
from twitch.signals import on_reward_campaign_saved # noqa: PLC0415
post_save.connect(on_game_saved, sender=Game)
post_save.connect(on_drop_campaign_saved, sender=DropCampaign)
post_save.connect(on_drop_benefit_saved, sender=DropBenefit)
post_save.connect(on_reward_campaign_saved, sender=RewardCampaign)

View file

@ -8,7 +8,8 @@ from twitch.models import Channel
if TYPE_CHECKING: if TYPE_CHECKING:
from argparse import ArgumentParser from argparse import ArgumentParser
from debug_toolbar.panels.templates.panel import QuerySet from django.db.models import QuerySet
SAMPLE_PREVIEW_COUNT = 10 SAMPLE_PREVIEW_COUNT = 10

View file

@ -193,8 +193,8 @@ class Command(BaseCommand):
img.mode == "P" and "transparency" in img.info img.mode == "P" and "transparency" in img.info
): ):
# Create white background for transparency # Create white background for transparency
background = Image.new("RGB", img.size, (255, 255, 255)) background: Image.Image = Image.new("RGB", img.size, (255, 255, 255))
rgba_img = img.convert("RGBA") if img.mode == "P" else img rgba_img: Image.Image = img.convert("RGBA") if img.mode == "P" else img
background.paste( background.paste(
rgba_img, rgba_img,
mask=rgba_img.split()[-1] if rgba_img.mode in {"RGBA", "LA"} else None, mask=rgba_img.split()[-1] if rgba_img.mode in {"RGBA", "LA"} else None,

View file

@ -17,9 +17,16 @@ logger: logging.Logger = logging.getLogger("ttvdrops.watch_imports")
class Command(BaseCommand): class Command(BaseCommand):
"""Watch for JSON files in a directory and import them automatically.""" """Watch for JSON files in a directory and import them automatically.
help = "Watch a directory for JSON files and import them automatically" .. deprecated::
This command is superseded by the Celery Beat task
``twitch.tasks.scan_pending_twitch_files`` (runs every 10 s via
``ttvdrops-celery-beat.service``). Keep this command for ad-hoc use
or in environments that run without a Celery worker.
"""
help = "Watch a directory for JSON files and import them automatically (superseded by Celery Beat)"
requires_migrations_checks = True requires_migrations_checks = True
def add_arguments(self, parser: CommandParser) -> None: def add_arguments(self, parser: CommandParser) -> None:

65
twitch/signals.py Normal file
View file

@ -0,0 +1,65 @@
from __future__ import annotations
import logging
from typing import Any
logger = logging.getLogger("ttvdrops.signals")
def _dispatch(task_fn: Any, pk: int) -> None: # noqa: ANN401
"""Dispatch a Celery task, logging rather than raising when the broker is unavailable."""
try:
task_fn.delay(pk)
except Exception: # noqa: BLE001
logger.debug(
"Could not dispatch %s(%d) — broker may be unavailable.",
task_fn.name,
pk,
)
def on_game_saved(sender: Any, instance: Any, created: bool, **kwargs: Any) -> None:  # noqa: ANN401, FBT001
    """Queue a box-art download when a brand-new Game row is saved."""
    if not created:
        return
    from twitch.tasks import download_game_image  # noqa: PLC0415

    _dispatch(download_game_image, instance.pk)
def on_drop_campaign_saved(
    sender: Any,  # noqa: ANN401
    instance: Any,  # noqa: ANN401
    created: bool,  # noqa: FBT001
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Queue an image download when a brand-new DropCampaign row is saved."""
    if not created:
        return
    from twitch.tasks import download_campaign_image  # noqa: PLC0415

    _dispatch(download_campaign_image, instance.pk)
def on_drop_benefit_saved(
    sender: Any,  # noqa: ANN401
    instance: Any,  # noqa: ANN401
    created: bool,  # noqa: FBT001
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Queue an image download when a brand-new DropBenefit row is saved."""
    if not created:
        return
    from twitch.tasks import download_benefit_image  # noqa: PLC0415

    _dispatch(download_benefit_image, instance.pk)
def on_reward_campaign_saved(
    sender: Any,  # noqa: ANN401
    instance: Any,  # noqa: ANN401
    created: bool,  # noqa: FBT001
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Queue an image download when a brand-new RewardCampaign row is saved."""
    if not created:
        return
    from twitch.tasks import download_reward_campaign_image  # noqa: PLC0415

    _dispatch(download_reward_campaign_image, instance.pk)

257
twitch/tasks.py Normal file
View file

@ -0,0 +1,257 @@
from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlparse
import httpx
from celery import shared_task
from django.core.files.base import ContentFile
from django.core.management import call_command
from PIL.Image import Image
if TYPE_CHECKING:
from urllib.parse import ParseResult
from django.db.models.fields.files import ImageFieldFile
from PIL.Image import Image
from PIL.ImageFile import ImageFile
logger: logging.Logger = logging.getLogger("ttvdrops.tasks")
@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60)
def scan_pending_twitch_files(self) -> None:  # noqa: ANN001
    """Scan TTVDROPS_PENDING_DIR for JSON files and dispatch an import task for each.

    Silently no-ops when the env var is unset; warns when it points at
    something that is not a directory.
    """
    pending_dir: str = os.getenv("TTVDROPS_PENDING_DIR", "")
    if not pending_dir:
        logger.debug("TTVDROPS_PENDING_DIR not configured; skipping scan.")
        return

    path = Path(pending_dir)
    if not path.is_dir():
        logger.warning("TTVDROPS_PENDING_DIR %r is not a directory.", pending_dir)
        return

    # glob replaces the manual iterdir/suffix filter; sort so dispatch order
    # is deterministic across runs.
    json_files: list[Path] = sorted(f for f in path.glob("*.json") if f.is_file())
    # NOTE(review): a file still being written can be dispatched here; the
    # importer tolerates files that have since moved, but partially written
    # JSON is presumably repaired downstream — confirm.
    for json_file in json_files:
        import_twitch_file.delay(str(json_file))
    if json_files:
        logger.info("Dispatched %d Twitch file import task(s).", len(json_files))
@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60)
def import_twitch_file(self, file_path: str) -> None:  # noqa: ANN001
    """Import a single Twitch JSON drop file via BetterImportDrops logic.

    Files that have already been moved away (to imported/ or broken/) are
    skipped silently; real import failures are logged and retried.
    """
    from twitch.management.commands.better_import_drops import Command as Importer  # noqa: I001, PLC0415

    target = Path(file_path)
    if not target.is_file():
        # A previous run already moved this file to imported/ or broken/.
        logger.debug("File %s no longer exists; skipping.", file_path)
        return

    try:
        Importer().handle(path=target)
    except Exception as exc:
        logger.exception("Failed to import %s.", file_path)
        raise self.retry(exc=exc) from exc
def _download_and_save(url: str, name: str, file_field: ImageFieldFile) -> bool:
    """Download *url* and store the bytes in *file_field* (a Django ImageField).

    Fields that already hold a file (non-empty ``.name``) are left untouched.
    After a successful save, WebP/AVIF variants are generated next to the
    original when a local filesystem path is available.

    Returns:
        True when the image was saved, False when skipped or on error.
    """
    if not url or file_field is None:
        return False
    if getattr(file_field, "name", None):
        # Already cached — nothing to download.
        return False

    try:
        with httpx.Client(timeout=20, follow_redirects=True) as client:
            response = client.get(url)
            response.raise_for_status()
    except httpx.HTTPError:
        logger.warning("HTTP error downloading image for %r.", name)
        return False

    # Derive the stored filename from the URL path, defaulting to .jpg.
    parsed: ParseResult = urlparse(url)
    extension: str = Path(parsed.path).suffix or ".jpg"
    file_field.save(f"{name}{extension}", ContentFile(response.content), save=True)  # type: ignore[union-attr]

    saved_path: str | None = getattr(file_field, "path", None)
    if saved_path:
        _convert_to_modern_formats(Path(saved_path))
    return True
def _convert_to_modern_formats(source: Path) -> None:
"""Convert *source* image to WebP and AVIF alongside the original."""
if not source.exists() or source.suffix.lower() not in {".jpg", ".jpeg", ".png"}:
return
try:
from PIL import Image # noqa: PLC0415
except ImportError:
return
try:
with Image.open(source) as raw:
if raw.mode in {"RGBA", "LA"} or (
raw.mode == "P" and "transparency" in raw.info
):
background: Image = Image.new("RGB", raw.size, (255, 255, 255))
rgba: Image | ImageFile = (
raw.convert("RGBA") if raw.mode == "P" else raw
)
mask: Image | None = (
rgba.split()[-1] if rgba.mode in {"RGBA", "LA"} else None
)
background.paste(rgba, mask=mask)
img: Image = background
elif raw.mode != "RGB":
img = raw.convert("RGB")
else:
img: Image = raw.copy()
for fmt, ext in (("WEBP", ".webp"), ("AVIF", ".avif")):
out: Path = source.with_suffix(ext)
try:
img.save(out, fmt, quality=80)
except Exception: # noqa: BLE001
logger.debug("Could not convert %s to %s.", source, fmt)
except Exception: # noqa: BLE001
logger.debug("Format conversion failed for %s.", source)
# ---------------------------------------------------------------------------
# Per-model image tasks — triggered by post_save signals on new records
# ---------------------------------------------------------------------------
@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300)
def download_game_image(self, game_pk: int) -> None:  # noqa: ANN001
    """Download and cache the box art image for a single Game.

    No-op when the row is gone or its box-art URL is not a Twitch box-art URL.
    """
    from twitch.models import Game  # noqa: PLC0415
    from twitch.utils import is_twitch_box_art_url  # noqa: PLC0415
    from twitch.utils import normalize_twitch_box_art_url  # noqa: PLC0415

    try:
        game = Game.objects.get(pk=game_pk)
    except Game.DoesNotExist:
        return

    box_art_url = game.box_art
    if not box_art_url or not is_twitch_box_art_url(box_art_url):
        return

    normalized_url = normalize_twitch_box_art_url(box_art_url)
    try:
        _download_and_save(normalized_url, game.twitch_id, game.box_art_file)
    except Exception as exc:
        raise self.retry(exc=exc) from exc
@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300)
def download_campaign_image(self, campaign_pk: int) -> None:  # noqa: ANN001
    """Download and cache the image for a single DropCampaign.

    No-op when the row is gone or it carries no image URL.
    """
    from twitch.models import DropCampaign  # noqa: PLC0415

    try:
        drop_campaign = DropCampaign.objects.get(pk=campaign_pk)
    except DropCampaign.DoesNotExist:
        return
    if not drop_campaign.image_url:
        return

    try:
        _download_and_save(
            drop_campaign.image_url,
            drop_campaign.twitch_id,
            drop_campaign.image_file,
        )
    except Exception as exc:
        raise self.retry(exc=exc) from exc
@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300)
def download_benefit_image(self, benefit_pk: int) -> None:  # noqa: ANN001
    """Download and cache the image for a single DropBenefit.

    No-op when the row is gone or it carries no image asset URL.
    """
    from twitch.models import DropBenefit  # noqa: PLC0415

    try:
        drop_benefit = DropBenefit.objects.get(pk=benefit_pk)
    except DropBenefit.DoesNotExist:
        return
    if not drop_benefit.image_asset_url:
        return

    try:
        _download_and_save(
            drop_benefit.image_asset_url,
            drop_benefit.twitch_id,
            drop_benefit.image_file,
        )
    except Exception as exc:
        raise self.retry(exc=exc) from exc
@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300)
def download_reward_campaign_image(self, reward_pk: int) -> None:  # noqa: ANN001
    """Download and cache the image for a single RewardCampaign.

    No-op when the row is gone or it carries no image URL.
    """
    from twitch.models import RewardCampaign  # noqa: PLC0415

    try:
        reward_campaign = RewardCampaign.objects.get(pk=reward_pk)
    except RewardCampaign.DoesNotExist:
        return
    if not reward_campaign.image_url:
        return

    try:
        _download_and_save(
            reward_campaign.image_url,
            reward_campaign.twitch_id,
            reward_campaign.image_file,
        )
    except Exception as exc:
        raise self.retry(exc=exc) from exc
@shared_task(queue="image-downloads")
def download_all_images() -> None:
    """Weekly full-refresh: download images for all Twitch models.

    Delegates to the two management commands: box art first, then all
    campaign/benefit/reward images. No retries — the weekly Beat schedule
    simply runs it again next Sunday.
    """
    call_command("download_box_art")
    call_command("download_campaign_images", model="all")
# ---------------------------------------------------------------------------
# Twitch API tasks
# ---------------------------------------------------------------------------
@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120)
def import_chat_badges(self) -> None:  # noqa: ANN001
    """Fetch and upsert Twitch global chat badges via the Helix API.

    Thin wrapper around the ``import_chat_badges`` management command;
    failures are retried up to 3 times with a 2-minute delay.
    """
    try:
        call_command("import_chat_badges")
    except Exception as exc:
        # Log before retrying so exhausted retries leave a trace in our logs,
        # not only in Celery's internal state.
        logger.exception("import_chat_badges failed; retrying.")
        raise self.retry(exc=exc) from exc
# ---------------------------------------------------------------------------
# Maintenance
# ---------------------------------------------------------------------------
# Runs on the "default" queue with no retries: a failed nightly backup is
# surfaced via worker logs and simply re-attempted at the next 02:15 run.
@shared_task(queue="default")
def backup_database() -> None:
    """Create a zstd-compressed SQL backup of the dataset tables.

    Thin wrapper around the ``backup_db`` management command so Celery Beat
    can schedule it nightly.
    """
    call_command("backup_db")

View file

@ -1,13 +1,22 @@
import json import json
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from unittest import skipIf from unittest import skipIf
from unittest.mock import patch
from django.db import connection from django.db import connection
from django.test import TestCase from django.test import TestCase
from twitch.management.commands.better_import_drops import Command from twitch.management.commands.better_import_drops import Command
from twitch.management.commands.better_import_drops import detect_error_only_response from twitch.management.commands.better_import_drops import detect_error_only_response
from twitch.management.commands.better_import_drops import detect_non_campaign_keyword
from twitch.management.commands.better_import_drops import (
extract_operation_name_from_parsed,
)
from twitch.management.commands.better_import_drops import move_completed_file
from twitch.management.commands.better_import_drops import move_file_to_broken_subdir
from twitch.management.commands.better_import_drops import repair_partially_broken_json
from twitch.models import DropBenefit from twitch.models import DropBenefit
from twitch.models import DropCampaign from twitch.models import DropCampaign
from twitch.models import Game from twitch.models import Game
@ -426,6 +435,92 @@ class CampaignStructureDetectionTests(TestCase):
structure: str | None = command._detect_campaign_structure(response) structure: str | None = command._detect_campaign_structure(response)
assert structure == "current_user_drop_campaigns" assert structure == "current_user_drop_campaigns"
def test_detects_user_drop_campaign_structure(self) -> None:
"""Ensure user.dropCampaign structure is correctly detected."""
command = Command()
response: dict[str, object] = {
"data": {
"user": {
"id": "123",
"dropCampaign": {"id": "c1", "name": "Test Campaign"},
"__typename": "User",
},
},
}
structure: str | None = command._detect_campaign_structure(response)
assert structure == "user_drop_campaign"
def test_detects_channel_viewer_campaigns_structure(self) -> None:
"""Ensure channel.viewerDropCampaigns structure is correctly detected."""
command = Command()
response: dict[str, object] = {
"data": {
"channel": {
"id": "123",
"viewerDropCampaigns": [{"id": "c1", "name": "Test Campaign"}],
"__typename": "Channel",
},
},
}
structure: str | None = command._detect_campaign_structure(response)
assert structure == "channel_viewer_campaigns"
class FileMoveUtilityTests(TestCase):
    """Tests for imported/broken file move utility helpers."""

    def test_move_completed_file_sanitizes_operation_directory_name(self) -> None:
        """Ensure operation names are sanitized and campaign structure subdir is respected."""
        with TemporaryDirectory() as tmp_dir:
            root_path = Path(tmp_dir)
            imported_root = root_path / "imported"
            source_file = root_path / "payload.json"
            source_file.write_text("{}", encoding="utf-8")
            # Redirect the imported/ root into the temp dir so the test never
            # touches the real filesystem layout.
            with patch(
                "twitch.management.commands.better_import_drops.get_imported_directory_root",
                return_value=imported_root,
            ):
                target_dir = move_completed_file(
                    file_path=source_file,
                    operation_name="My Op/Name\\v1",
                    campaign_structure="inventory_campaigns",
                )
            # "/" and "\" in the operation name are expected to become "_".
            expected_dir = imported_root / "My_Op_Name_v1" / "inventory_campaigns"
            assert target_dir == expected_dir
            assert not source_file.exists()
            assert (expected_dir / "payload.json").exists()

    def test_move_file_to_broken_subdir_avoids_duplicate_operation_segment(
        self,
    ) -> None:
        """Ensure matching reason and operation names do not create duplicate directories."""
        with TemporaryDirectory() as tmp_dir:
            root_path = Path(tmp_dir)
            broken_root = root_path / "broken"
            source_file = root_path / "broken_payload.json"
            source_file.write_text("{}", encoding="utf-8")
            # Redirect the broken/ root into the temp dir as above.
            with patch(
                "twitch.management.commands.better_import_drops.get_broken_directory_root",
                return_value=broken_root,
            ):
                broken_dir = move_file_to_broken_subdir(
                    file_path=source_file,
                    subdir="validation_failed",
                    operation_name="validation_failed",
                )
            # The identical subdir/operation_name must appear only once in
            # the resulting path, not as ".../validation_failed/validation_failed".
            path_segments = broken_dir.as_posix().split("/")
            assert path_segments.count("validation_failed") == 1
            assert not source_file.exists()
            assert (broken_dir / "broken_payload.json").exists()
class OperationNameFilteringTests(TestCase): class OperationNameFilteringTests(TestCase):
"""Tests for filtering campaigns by operation_name field.""" """Tests for filtering campaigns by operation_name field."""
@ -864,3 +959,179 @@ class ErrorOnlyResponseDetectionTests(TestCase):
result = detect_error_only_response(parsed_json) result = detect_error_only_response(parsed_json)
assert result == "error_only: unknown error" assert result == "error_only: unknown error"
class NonCampaignKeywordDetectionTests(TestCase):
    """Tests for non-campaign operation keyword detection."""

    def test_detects_known_non_campaign_operation(self) -> None:
        """Ensure known operationName values are detected as non-campaign payloads."""
        payload = {"extensions": {"operationName": "PlaybackAccessToken"}}
        assert detect_non_campaign_keyword(json.dumps(payload)) == "PlaybackAccessToken"

    def test_returns_none_for_unknown_operation(self) -> None:
        """Ensure unrelated operation names are not flagged."""
        payload = {"extensions": {"operationName": "DropCampaignDetails"}}
        assert detect_non_campaign_keyword(json.dumps(payload)) is None
class OperationNameExtractionTests(TestCase):
    """Tests for operation name extraction across supported payload shapes."""

    def test_extracts_operation_name_from_json_repair_tuple(self) -> None:
        """Ensure extraction supports tuple payloads returned by json_repair."""
        tuple_payload = (
            {"extensions": {"operationName": "ViewerDropsDashboard"}},
            [{"json_repair": "log"}],
        )
        assert extract_operation_name_from_parsed(tuple_payload) == "ViewerDropsDashboard"

    def test_extracts_operation_name_from_list_payload(self) -> None:
        """Ensure extraction inspects the first response in list payloads."""
        list_payload = [
            {"extensions": {"operationName": "Inventory"}},
            {"extensions": {"operationName": "IgnoredSecondItem"}},
        ]
        assert extract_operation_name_from_parsed(list_payload) == "Inventory"

    def test_returns_none_for_empty_list_payload(self) -> None:
        """Ensure extraction returns None for empty list payloads."""
        assert extract_operation_name_from_parsed([]) is None
class JsonRepairTests(TestCase):
    """Tests for partial JSON repair fallback behavior."""

    def test_repair_filters_non_graphql_items_from_list(self) -> None:
        """Ensure repaired list output only keeps GraphQL-like response objects."""
        broken_text = '[{"foo": 1}, {"data": {"currentUser": {"id": "1"}}}]'
        repaired_text = repair_partially_broken_json(broken_text)
        assert json.loads(repaired_text) == [{"data": {"currentUser": {"id": "1"}}}]
class ProcessFileWorkerTests(TestCase):
    """Tests for process_file_worker early-return behaviors.

    FIX: the payload files are now written inside a ``TemporaryDirectory``
    instead of into the repository source tree (``twitch/tests/tmp_*.json``),
    which could leave litter behind and is unsafe under parallel test runs
    (``-n auto`` is enabled in pytest addopts).
    """

    def _run_worker(self, payload: dict[str, object], filename: str) -> dict[str, object]:
        """Write *payload* to a throwaway JSON file and run process_file_worker on it."""
        command = Command()
        with TemporaryDirectory() as tmp_dir:
            temp_path = Path(tmp_dir) / filename
            temp_path.write_text(json.dumps(payload), encoding="utf-8")
            return command.process_file_worker(
                file_path=temp_path,
                options={"crash_on_error": False, "skip_broken_moves": True},
            )

    def test_returns_reason_when_drop_campaign_key_missing(self) -> None:
        """Ensure files without dropCampaign are marked failed with clear reason."""
        result = self._run_worker(
            {"data": {"currentUser": {"id": "123"}}},
            "tmp_no_drop_campaign.json",
        )
        assert result["success"] is False
        assert result["broken_dir"] == "(skipped)"
        assert result["reason"] == "no dropCampaign present"

    def test_returns_reason_for_error_only_response(self) -> None:
        """Ensure error-only responses are marked failed with extracted reason."""
        result = self._run_worker(
            {"errors": [{"message": "service timeout"}], "data": None},
            "tmp_error_only.json",
        )
        assert result["success"] is False
        assert result["broken_dir"] == "(skipped)"
        assert result["reason"] == "error_only: service timeout"

    def test_returns_reason_for_known_non_campaign_keyword(self) -> None:
        """Ensure known non-campaign operation payloads are rejected with reason."""
        result = self._run_worker(
            {
                "data": {"currentUser": {"id": "123"}},
                "extensions": {"operationName": "PlaybackAccessToken"},
            },
            "tmp_non_campaign_keyword.json",
        )
        assert result["success"] is False
        assert result["broken_dir"] == "(skipped)"
        assert result["reason"] == "matched 'PlaybackAccessToken'"
class NormalizeResponsesTests(TestCase):
    """Tests for response normalization across supported payload formats."""

    def test_normalizes_batched_responses_wrapper(self) -> None:
        """Ensure batched payloads under responses key are unwrapped and filtered."""
        batched_payload: dict[str, object] = {
            "responses": [
                {"data": {"currentUser": {"id": "1"}}},
                "invalid-item",
                {"extensions": {"operationName": "Inventory"}},
            ],
        }
        normalized = Command()._normalize_responses(batched_payload)
        assert len(normalized) == 2
        assert normalized[0]["data"]["currentUser"]["id"] == "1"
        assert normalized[1]["extensions"]["operationName"] == "Inventory"

    def test_returns_empty_list_for_empty_tuple_payload(self) -> None:
        """Ensure empty tuple payloads from json_repair produce no responses."""
        assert Command()._normalize_responses(()) == []  # type: ignore[arg-type]

View file

@ -0,0 +1,150 @@
from io import StringIO
from unittest.mock import patch
import httpx
import pytest
from django.core.management import CommandError
from django.core.management import call_command
from twitch.models import ChatBadge
from twitch.models import ChatBadgeSet
from twitch.schemas import GlobalChatBadgesResponse
pytestmark: pytest.MarkDecorator = pytest.mark.django_db
def _build_response(title: str = "VIP") -> GlobalChatBadgesResponse:
    """Build a valid GlobalChatBadgesResponse payload for tests.

    Returns:
        A validated Twitch global chat badges response object.
    """
    version = {
        "id": "1",
        "image_url_1x": "https://example.com/vip-1x.png",
        "image_url_2x": "https://example.com/vip-2x.png",
        "image_url_4x": "https://example.com/vip-4x.png",
        "title": title,
        "description": "VIP Badge",
        "click_action": "visit_url",
        "click_url": "https://help.twitch.tv",
    }
    payload = {"data": [{"set_id": "vip", "versions": [version]}]}
    return GlobalChatBadgesResponse.model_validate(payload)
def test_raises_when_client_id_missing(monkeypatch: pytest.MonkeyPatch) -> None:
    """Command should fail when client ID is not provided."""
    for env_var in ("TWITCH_CLIENT_ID", "TWITCH_CLIENT_SECRET", "TWITCH_ACCESS_TOKEN"):
        monkeypatch.delenv(env_var, raising=False)

    with pytest.raises(CommandError, match="Twitch Client ID is required"):
        call_command("import_chat_badges", stdout=StringIO())
def test_raises_when_token_and_secret_missing(monkeypatch: pytest.MonkeyPatch) -> None:
    """Command should fail when no token and no client secret are available."""
    for env_var in ("TWITCH_CLIENT_SECRET", "TWITCH_ACCESS_TOKEN"):
        monkeypatch.delenv(env_var, raising=False)

    with pytest.raises(CommandError, match="Either --access-token or --client-secret"):
        call_command("import_chat_badges", "--client-id", "client-id", stdout=StringIO())
@pytest.mark.django_db
def test_import_creates_then_updates_existing_badge() -> None:
    """Running import twice should update existing badges rather than duplicate them."""
    fetch_target = (
        "twitch.management.commands.import_chat_badges."
        "Command._fetch_global_chat_badges"
    )
    command_args = (
        "import_chat_badges",
        "--client-id",
        "client-id",
        "--access-token",
        "access-token",
    )
    with patch(
        fetch_target,
        side_effect=[_build_response("VIP"), _build_response("VIP Updated")],
    ) as fetch_mock:
        call_command(*command_args, stdout=StringIO())
        call_command(*command_args, stdout=StringIO())

    assert fetch_mock.call_count == 2
    assert ChatBadgeSet.objects.count() == 1
    assert ChatBadge.objects.count() == 1
    badge_set = ChatBadgeSet.objects.get(set_id="vip")
    badge = ChatBadge.objects.get(badge_set=badge_set, badge_id="1")
    assert badge.title == "VIP Updated"
    assert badge.click_action == "visit_url"
    assert badge.click_url == "https://help.twitch.tv"
@pytest.mark.django_db
def test_uses_client_credentials_when_access_token_missing() -> None:
    """Command should obtain token from Twitch when no access token is provided."""
    command_path = "twitch.management.commands.import_chat_badges.Command"
    empty_response = GlobalChatBadgesResponse.model_validate({"data": []})
    with (
        patch(
            f"{command_path}._get_app_access_token",
            return_value="generated-token",
        ) as token_mock,
        patch(
            f"{command_path}._fetch_global_chat_badges",
            return_value=empty_response,
        ) as fetch_mock,
    ):
        call_command(
            "import_chat_badges",
            "--client-id",
            "client-id",
            "--client-secret",
            "client-secret",
            stdout=StringIO(),
        )

    token_mock.assert_called_once_with("client-id", "client-secret")
    fetch_mock.assert_called_once_with(
        client_id="client-id",
        access_token="generated-token",
    )
def test_wraps_http_errors_from_badges_fetch() -> None:
    """Command should convert HTTP client errors to CommandError."""
    fetch_target = (
        "twitch.management.commands.import_chat_badges."
        "Command._fetch_global_chat_badges"
    )
    with (
        patch(fetch_target, side_effect=httpx.HTTPError("boom")),
        pytest.raises(CommandError, match="Failed to fetch chat badges from Twitch API"),
    ):
        call_command(
            "import_chat_badges",
            "--client-id",
            "client-id",
            "--access-token",
            "access-token",
            stdout=StringIO(),
        )

575
twitch/tests/test_tasks.py Normal file
View file

@ -0,0 +1,575 @@
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock
from unittest.mock import call
from unittest.mock import patch
import httpx
import pytest
from twitch.models import DropBenefit
from twitch.models import DropCampaign
from twitch.models import RewardCampaign
from twitch.tasks import _convert_to_modern_formats
from twitch.tasks import _download_and_save
from twitch.tasks import backup_database
from twitch.tasks import download_all_images
from twitch.tasks import download_benefit_image
from twitch.tasks import download_campaign_image
from twitch.tasks import download_game_image
from twitch.tasks import download_reward_campaign_image
from twitch.tasks import import_chat_badges
from twitch.tasks import import_twitch_file
from twitch.tasks import scan_pending_twitch_files
def test_scan_pending_twitch_files_dispatches_json_files(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """It should dispatch an import task for each JSON file in the pending directory."""
    json_one = tmp_path / "one.json"
    json_two = tmp_path / "two.json"
    non_json = tmp_path / "notes.txt"
    for pending_file in (json_one, json_two):
        pending_file.write_text("{}", encoding="utf-8")
    non_json.write_text("ignored", encoding="utf-8")
    monkeypatch.setenv("TTVDROPS_PENDING_DIR", str(tmp_path))
    with patch("twitch.tasks.import_twitch_file.delay") as dispatch_mock:
        scan_pending_twitch_files.run()
    # Both JSON files are dispatched (order does not matter); the .txt is skipped.
    expected = [call(str(json_one)), call(str(json_two))]
    dispatch_mock.assert_has_calls(expected, any_order=True)
    assert dispatch_mock.call_count == 2
def test_scan_pending_twitch_files_skips_when_env_missing(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """It should do nothing when TTVDROPS_PENDING_DIR is not configured."""
    # Remove the variable even if the host environment happens to define it.
    monkeypatch.delenv("TTVDROPS_PENDING_DIR", raising=False)
    with patch("twitch.tasks.import_twitch_file.delay") as dispatch_mock:
        scan_pending_twitch_files.run()
    dispatch_mock.assert_not_called()
def test_import_twitch_file_calls_importer_for_existing_file(tmp_path: Path) -> None:
    """It should run BetterImportDrops with the provided path when the file exists."""
    drops_file = tmp_path / "drops.json"
    drops_file.write_text("{}", encoding="utf-8")
    handle_patch = patch(
        "twitch.management.commands.better_import_drops.Command.handle",
    )
    with handle_patch as handle_mock:
        import_twitch_file.run(str(drops_file))
    # The importer ran exactly once and received the file as a Path kwarg.
    assert handle_mock.call_count == 1
    assert handle_mock.call_args.kwargs["path"] == drops_file
def test_import_twitch_file_retries_on_import_error(tmp_path: Path) -> None:
    """It should retry when the importer raises an exception."""
    drops_file = tmp_path / "drops.json"
    drops_file.write_text("{}", encoding="utf-8")
    importer_error = RuntimeError("boom")
    handle_patch = patch(
        "twitch.management.commands.better_import_drops.Command.handle",
        side_effect=importer_error,
    )
    # Make retry raise so we can observe that the task actually invoked it.
    retry_patch = patch.object(
        import_twitch_file,
        "retry",
        side_effect=RuntimeError("retried"),
    )
    with (
        handle_patch,
        retry_patch as retry_mock,
        pytest.raises(RuntimeError, match="retried"),
    ):
        import_twitch_file.run(str(drops_file))
    retry_mock.assert_called_once_with(exc=importer_error)
def test_download_and_save_skips_when_image_already_cached() -> None:
    """It should not download when the target ImageField already has a file name."""
    cached_field = MagicMock()
    cached_field.name = "already-there.jpg"
    with patch("twitch.tasks.httpx.Client") as client_mock:
        downloaded = _download_and_save(
            url="https://example.com/test.jpg",
            name="game-1",
            file_field=cached_field,
        )
    # A populated field short-circuits: no HTTP client is even constructed.
    assert downloaded is False
    client_mock.assert_not_called()
def test_download_and_save_saves_downloaded_content() -> None:
    """It should save downloaded bytes and trigger modern format conversion."""
    empty_field = MagicMock()
    empty_field.name = ""
    empty_field.path = "C:/cache/game-1.png"
    fake_response = MagicMock()
    fake_response.content = b"img-bytes"
    fake_response.raise_for_status.return_value = None
    http_client = MagicMock()
    http_client.get.return_value = fake_response
    # Stand-in for the `with httpx.Client() as client:` context manager.
    client_context = MagicMock()
    client_context.__enter__.return_value = http_client
    with (
        patch("twitch.tasks.httpx.Client", return_value=client_context),
        patch("twitch.tasks._convert_to_modern_formats") as convert_mock,
    ):
        downloaded = _download_and_save(
            url="https://example.com/path/picture.png",
            name="game-1",
            file_field=empty_field,
        )
    assert downloaded is True
    empty_field.save.assert_called_once()
    # File name is derived from `name` plus the URL's extension.
    save_args, save_kwargs = empty_field.save.call_args
    assert save_args[0] == "game-1.png"
    assert save_kwargs["save"] is True
    convert_mock.assert_called_once_with(Path("C:/cache/game-1.png"))
def test_download_and_save_returns_false_on_http_error() -> None:
    """It should return False when HTTP requests fail."""
    empty_field = MagicMock()
    empty_field.name = ""
    failing_client = MagicMock()
    failing_client.get.side_effect = httpx.HTTPError("network down")
    client_context = MagicMock()
    client_context.__enter__.return_value = failing_client
    with patch("twitch.tasks.httpx.Client", return_value=client_context):
        downloaded = _download_and_save(
            url="https://example.com/path/picture.png",
            name="game-1",
            file_field=empty_field,
        )
    # Failure is reported via the return value; nothing gets written.
    assert downloaded is False
    empty_field.save.assert_not_called()
def test_download_game_image_downloads_normalized_twitch_url() -> None:
    """It should normalize Twitch box art URLs before downloading."""
    normalized_url = "https://cdn.example.com/box.jpg"
    fake_game = SimpleNamespace(
        box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg",
        twitch_id="123",
        box_art_file=MagicMock(),
    )
    with (
        patch("twitch.models.Game.objects.get", return_value=fake_game),
        patch(
            "twitch.utils.is_twitch_box_art_url",
            return_value=True,
        ) as is_twitch_mock,
        patch(
            "twitch.utils.normalize_twitch_box_art_url",
            return_value=normalized_url,
        ) as normalize_mock,
        patch("twitch.tasks._download_and_save") as save_mock,
    ):
        download_game_image.run(1)
    # The raw templated URL is detected and rewritten before download.
    is_twitch_mock.assert_called_once_with(fake_game.box_art)
    normalize_mock.assert_called_once_with(fake_game.box_art)
    save_mock.assert_called_once_with(
        normalized_url,
        "123",
        fake_game.box_art_file,
    )
def test_download_game_image_retries_on_download_error() -> None:
    """It should retry when the image download helper fails unexpectedly."""
    fake_game = SimpleNamespace(
        box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg",
        twitch_id="123",
        box_art_file=MagicMock(),
    )
    download_error = RuntimeError("boom")
    # Make retry raise so the call is observable from outside the task.
    retry_patch = patch.object(
        download_game_image,
        "retry",
        side_effect=RuntimeError("retried"),
    )
    with (
        patch("twitch.models.Game.objects.get", return_value=fake_game),
        patch("twitch.utils.is_twitch_box_art_url", return_value=True),
        patch(
            "twitch.utils.normalize_twitch_box_art_url",
            return_value="https://cdn.example.com/box.jpg",
        ),
        patch("twitch.tasks._download_and_save", side_effect=download_error),
        retry_patch as retry_mock,
        pytest.raises(RuntimeError, match="retried"),
    ):
        download_game_image.run(1)
    retry_mock.assert_called_once_with(exc=download_error)
def test_download_all_images_calls_expected_commands() -> None:
    """It should invoke both image import management commands."""
    with patch("twitch.tasks.call_command") as call_command_mock:
        download_all_images.run()
    expected_calls = [
        call("download_box_art"),
        call("download_campaign_images", model="all"),
    ]
    call_command_mock.assert_has_calls(expected_calls)
def test_import_chat_badges_retries_on_error() -> None:
    """It should retry when import_chat_badges command execution fails."""
    command_error = RuntimeError("boom")
    # Make retry raise so the call is observable from outside the task.
    retry_patch = patch.object(
        import_chat_badges,
        "retry",
        side_effect=RuntimeError("retried"),
    )
    with (
        patch("twitch.tasks.call_command", side_effect=command_error),
        retry_patch as retry_mock,
        pytest.raises(RuntimeError, match="retried"),
    ):
        import_chat_badges.run()
    retry_mock.assert_called_once_with(exc=command_error)
def test_backup_database_calls_backup_command() -> None:
    """It should invoke the backup_db management command."""
    with patch("twitch.tasks.call_command") as command_mock:
        backup_database.run()
    command_mock.assert_called_once_with("backup_db")
def test_convert_to_modern_formats_skips_non_image_files(tmp_path: Path) -> None:
    """It should skip files that are not jpg, jpeg, or png."""
    non_image = tmp_path / "document.txt"
    non_image.write_text("Not an image", encoding="utf-8")
    with patch("PIL.Image.open") as open_mock:
        _convert_to_modern_formats(non_image)
    # Unsupported extensions never reach PIL.
    open_mock.assert_not_called()
def test_convert_to_modern_formats_skips_missing_files(tmp_path: Path) -> None:
    """It should skip files that do not exist."""
    absent_file = tmp_path / "nonexistent.jpg"
    with patch("PIL.Image.open") as open_mock:
        _convert_to_modern_formats(absent_file)
    open_mock.assert_not_called()
def test_convert_to_modern_formats_converts_rgb_image(tmp_path: Path) -> None:
    """It should save image in WebP and AVIF formats for RGB images."""
    source = tmp_path / "image.jpg"
    source.write_bytes(b"fake-jpeg")
    rgb_copy = MagicMock()
    rgb_image = MagicMock()
    rgb_image.mode = "RGB"
    rgb_image.copy.return_value = rgb_copy
    with patch("PIL.Image.open") as open_mock:
        open_mock.return_value.__enter__.return_value = rgb_image
        _convert_to_modern_formats(source)
    open_mock.assert_called_once_with(source)
    # Exactly one save per modern format: one WebP and one AVIF.
    save_calls = rgb_copy.save.call_args_list
    assert rgb_copy.save.call_count == 2
    assert sum("webp" in str(c) for c in save_calls) == 1
    assert sum("avif" in str(c) for c in save_calls) == 1
def test_convert_to_modern_formats_converts_rgba_image_with_background(
    tmp_path: Path,
) -> None:
    """It should create RGB background and paste RGBA image onto it."""
    source = tmp_path / "image.png"
    source.write_bytes(b"fake-png")
    rgba_image = MagicMock()
    rgba_image.mode = "RGBA"
    # Four channels: R, G, B and the alpha band used as the paste mask.
    rgba_image.split.return_value = [MagicMock() for _ in range(4)]
    rgba_image.size = (100, 100)
    rgba_image.convert.return_value = rgba_image
    background = MagicMock()
    with (
        patch("PIL.Image.open") as open_mock,
        patch("PIL.Image.new", return_value=background) as new_mock,
    ):
        open_mock.return_value.__enter__.return_value = rgba_image
        _convert_to_modern_formats(source)
    # A white RGB canvas is created, composited, then saved in both formats.
    new_mock.assert_called_once_with("RGB", (100, 100), (255, 255, 255))
    background.paste.assert_called_once()
    assert background.save.call_count == 2
def test_convert_to_modern_formats_handles_conversion_error(tmp_path: Path) -> None:
    """It should gracefully handle format conversion failures."""
    source = tmp_path / "image.jpg"
    source.write_bytes(b"fake-jpeg")
    broken_copy = MagicMock()
    broken_copy.save.side_effect = Exception("PIL error")
    rgb_image = MagicMock()
    rgb_image.mode = "RGB"
    rgb_image.copy.return_value = broken_copy
    with patch("PIL.Image.open") as open_mock:
        open_mock.return_value.__enter__.return_value = rgb_image
        # The helper must swallow the failure rather than propagate it.
        _convert_to_modern_formats(source)
def test_download_campaign_image_downloads_when_url_exists() -> None:
    """It should download and save campaign image when image_url is present."""
    fake_campaign = SimpleNamespace(
        image_url="https://example.com/campaign.jpg",
        twitch_id="camp123",
        image_file=MagicMock(),
    )
    with (
        patch("twitch.models.DropCampaign.objects.get", return_value=fake_campaign),
        patch("twitch.tasks._download_and_save", return_value=True) as save_mock,
    ):
        download_campaign_image.run(1)
    save_mock.assert_called_once_with(
        "https://example.com/campaign.jpg",
        "camp123",
        fake_campaign.image_file,
    )
def test_download_campaign_image_skips_when_no_url() -> None:
    """It should skip when campaign has no image_url."""
    url_less_campaign = SimpleNamespace(
        image_url=None,
        twitch_id="camp123",
        image_file=MagicMock(),
    )
    with (
        patch("twitch.models.DropCampaign.objects.get", return_value=url_less_campaign),
        patch("twitch.tasks._download_and_save") as save_mock,
    ):
        download_campaign_image.run(1)
    save_mock.assert_not_called()
def test_download_campaign_image_skips_when_not_found() -> None:
    """It should skip when campaign does not exist."""
    lookup_patch = patch(
        "twitch.models.DropCampaign.objects.get",
        side_effect=DropCampaign.DoesNotExist(),
    )
    with lookup_patch, patch("twitch.tasks._download_and_save") as save_mock:
        download_campaign_image.run(1)
    save_mock.assert_not_called()
def test_download_campaign_image_retries_on_error() -> None:
    """It should retry when image download fails unexpectedly."""
    fake_campaign = SimpleNamespace(
        image_url="https://example.com/campaign.jpg",
        twitch_id="camp123",
        image_file=MagicMock(),
    )
    download_error = RuntimeError("boom")
    # Make retry raise so the call is observable from outside the task.
    retry_patch = patch.object(
        download_campaign_image,
        "retry",
        side_effect=RuntimeError("retried"),
    )
    with (
        patch("twitch.models.DropCampaign.objects.get", return_value=fake_campaign),
        patch("twitch.tasks._download_and_save", side_effect=download_error),
        retry_patch as retry_mock,
        pytest.raises(RuntimeError, match="retried"),
    ):
        download_campaign_image.run(1)
    retry_mock.assert_called_once_with(exc=download_error)
def test_download_benefit_image_downloads_when_url_exists() -> None:
    """It should download and save benefit image when image_asset_url is present."""
    fake_benefit = SimpleNamespace(
        image_asset_url="https://example.com/benefit.png",
        twitch_id="benefit123",
        image_file=MagicMock(),
    )
    with (
        patch("twitch.models.DropBenefit.objects.get", return_value=fake_benefit),
        patch("twitch.tasks._download_and_save", return_value=True) as save_mock,
    ):
        download_benefit_image.run(1)
    # The benefit task passes its arguments by keyword.
    save_mock.assert_called_once_with(
        url="https://example.com/benefit.png",
        name="benefit123",
        file_field=fake_benefit.image_file,
    )
def test_download_benefit_image_skips_when_no_url() -> None:
    """It should skip when benefit has no image_asset_url."""
    url_less_benefit = SimpleNamespace(
        image_asset_url=None,
        twitch_id="benefit123",
        image_file=MagicMock(),
    )
    with (
        patch("twitch.models.DropBenefit.objects.get", return_value=url_less_benefit),
        patch("twitch.tasks._download_and_save") as save_mock,
    ):
        download_benefit_image.run(1)
    save_mock.assert_not_called()
def test_download_benefit_image_skips_when_not_found() -> None:
    """It should skip when benefit does not exist."""
    lookup_patch = patch(
        "twitch.models.DropBenefit.objects.get",
        side_effect=DropBenefit.DoesNotExist(),
    )
    with lookup_patch, patch("twitch.tasks._download_and_save") as save_mock:
        download_benefit_image.run(1)
    save_mock.assert_not_called()
def test_download_benefit_image_retries_on_error() -> None:
    """It should retry when image download fails unexpectedly."""
    fake_benefit = SimpleNamespace(
        image_asset_url="https://example.com/benefit.png",
        twitch_id="benefit123",
        image_file=MagicMock(),
    )
    download_error = RuntimeError("boom")
    # Make retry raise so the call is observable from outside the task.
    retry_patch = patch.object(
        download_benefit_image,
        "retry",
        side_effect=RuntimeError("retried"),
    )
    with (
        patch("twitch.models.DropBenefit.objects.get", return_value=fake_benefit),
        patch("twitch.tasks._download_and_save", side_effect=download_error),
        retry_patch as retry_mock,
        pytest.raises(RuntimeError, match="retried"),
    ):
        download_benefit_image.run(1)
    retry_mock.assert_called_once_with(exc=download_error)
def test_download_reward_campaign_image_downloads_when_url_exists() -> None:
    """It should download and save reward campaign image when image_url is present."""
    fake_reward = SimpleNamespace(
        image_url="https://example.com/reward.jpg",
        twitch_id="reward123",
        image_file=MagicMock(),
    )
    with (
        patch("twitch.models.RewardCampaign.objects.get", return_value=fake_reward),
        patch("twitch.tasks._download_and_save", return_value=True) as save_mock,
    ):
        download_reward_campaign_image.run(1)
    save_mock.assert_called_once_with(
        "https://example.com/reward.jpg",
        "reward123",
        fake_reward.image_file,
    )
def test_download_reward_campaign_image_skips_when_no_url() -> None:
    """It should skip when reward has no image_url."""
    url_less_reward = SimpleNamespace(
        image_url=None,
        twitch_id="reward123",
        image_file=MagicMock(),
    )
    with (
        patch("twitch.models.RewardCampaign.objects.get", return_value=url_less_reward),
        patch("twitch.tasks._download_and_save") as save_mock,
    ):
        download_reward_campaign_image.run(1)
    save_mock.assert_not_called()
def test_download_reward_campaign_image_skips_when_not_found() -> None:
    """It should skip when reward does not exist."""
    lookup_patch = patch(
        "twitch.models.RewardCampaign.objects.get",
        side_effect=RewardCampaign.DoesNotExist(),
    )
    with lookup_patch, patch("twitch.tasks._download_and_save") as save_mock:
        download_reward_campaign_image.run(1)
    save_mock.assert_not_called()
def test_download_reward_campaign_image_retries_on_error() -> None:
    """It should retry when image download fails unexpectedly."""
    fake_reward = SimpleNamespace(
        image_url="https://example.com/reward.jpg",
        twitch_id="reward123",
        image_file=MagicMock(),
    )
    download_error = RuntimeError("boom")
    # Make retry raise so the call is observable from outside the task.
    retry_patch = patch.object(
        download_reward_campaign_image,
        "retry",
        side_effect=RuntimeError("retried"),
    )
    with (
        patch("twitch.models.RewardCampaign.objects.get", return_value=fake_reward),
        patch("twitch.tasks._download_and_save", side_effect=download_error),
        retry_patch as retry_mock,
        pytest.raises(RuntimeError, match="retried"),
    ):
        download_reward_campaign_image.run(1)
    retry_mock.assert_called_once_with(exc=download_error)