Compare commits
No commits in common. "66ea46cf2331470d9e4ac20c134161b55e64e4bc" and "7a6273c4a88769cd20a6ffaa7c1e6a2c1a1b6499" have entirely different histories.
66ea46cf23
...
7a6273c4a8
26 changed files with 182 additions and 2173 deletions
28
.env.example
28
.env.example
|
|
@ -1,13 +1,7 @@
|
||||||
# Environment variables for ttvdrops Django application
|
# Django Configuration
|
||||||
# Copy this file to `.env` and fill in the appropriate values before running the application.
|
# Set to False in production
|
||||||
# For local development, you can use the default values provided here, but make sure to change the secret key and Twitch API credentials for production use.
|
|
||||||
|
|
||||||
# Debug mode (set to False in production)
|
|
||||||
DEBUG=True
|
DEBUG=True
|
||||||
|
|
||||||
# Base URL used for absolute URL generation
|
|
||||||
BASE_URL=https://ttvdrops.lovinator.space
|
|
||||||
|
|
||||||
# Django Secret Key
|
# Django Secret Key
|
||||||
# Generate a new secret key for production: python -c 'from django.core.management.utils import get_random_secret_key; print(get_random_secret_key())'
|
# Generate a new secret key for production: python -c 'from django.core.management.utils import get_random_secret_key; print(get_random_secret_key())'
|
||||||
DJANGO_SECRET_KEY=your-secret-key-here
|
DJANGO_SECRET_KEY=your-secret-key-here
|
||||||
|
|
@ -17,8 +11,6 @@ DJANGO_SECRET_KEY=your-secret-key-here
|
||||||
# You can use either an app access token or user access token
|
# You can use either an app access token or user access token
|
||||||
TWITCH_CLIENT_ID=your-twitch-client-id
|
TWITCH_CLIENT_ID=your-twitch-client-id
|
||||||
TWITCH_CLIENT_SECRET=your-twitch-client-secret
|
TWITCH_CLIENT_SECRET=your-twitch-client-secret
|
||||||
# Optional: if omitted, some commands can fetch an app access token using the client credentials
|
|
||||||
TWITCH_ACCESS_TOKEN=
|
|
||||||
|
|
||||||
# Email Configuration
|
# Email Configuration
|
||||||
# SMTP Host (examples below)
|
# SMTP Host (examples below)
|
||||||
|
|
@ -50,24 +42,10 @@ POSTGRES_HOST=/run/postgresql
|
||||||
|
|
||||||
# Note: Changed from 5432 to 6432 to use PgBouncer
|
# Note: Changed from 5432 to 6432 to use PgBouncer
|
||||||
POSTGRES_PORT=5432
|
POSTGRES_PORT=5432
|
||||||
|
|
||||||
# Optional database connection tuning
|
|
||||||
# CONN_MAX_AGE=60
|
|
||||||
# CONN_HEALTH_CHECKS=True
|
|
||||||
# DB_CONNECT_TIMEOUT=10
|
|
||||||
|
|
||||||
# Use SQLite instead of PostgreSQL (useful for local development)
|
|
||||||
USE_SQLITE=False
|
|
||||||
|
|
||||||
# Where to store Twitch API responses
|
# Where to store Twitch API responses
|
||||||
TTVDROPS_IMPORTED_DIR=/mnt/fourteen/Data/Responses/imported
|
TTVDROPS_IMPORTED_DIR=/mnt/fourteen/Data/Responses/imported
|
||||||
|
|
||||||
# Where to store Twitch API responses that failed processing (e.g. due to missing fields or unrecognized formats)
|
|
||||||
TTVDROPS_BROKEN_DIR=/mnt/fourteen/Data/Responses/broken
|
TTVDROPS_BROKEN_DIR=/mnt/fourteen/Data/Responses/broken
|
||||||
|
|
||||||
# Where to store Twitch API responses that are pending processing (e.g. waiting for a periodic task to process them)
|
# Redis Configuration
|
||||||
TTVDROPS_PENDING_DIR=/mnt/fourteen/Data/Responses/pending
|
|
||||||
|
|
||||||
# Redis Configuration for caching and Celery
|
|
||||||
REDIS_URL_CACHE=unix:///var/run/redis/redis.sock?db=0
|
REDIS_URL_CACHE=unix:///var/run/redis/redis.sock?db=0
|
||||||
REDIS_URL_CELERY=unix:///var/run/redis/redis.sock?db=1
|
REDIS_URL_CELERY=unix:///var/run/redis/redis.sock?db=1
|
||||||
|
|
|
||||||
106
.github/copilot-instructions.md
vendored
106
.github/copilot-instructions.md
vendored
|
|
@ -1,40 +1,78 @@
|
||||||
# Project Guidelines
|
# Django Guidelines
|
||||||
|
|
||||||
## Code Style
|
## Python Best Practices
|
||||||
- Follow PEP 8 with a 120 character line limit.
|
- Follow PEP 8 with 120 character line limit
|
||||||
- Use double quotes for Python strings.
|
- Use double quotes for Python strings
|
||||||
- Keep comments focused on why, not what.
|
|
||||||
|
|
||||||
## Build and Test
|
## Django Best Practices
|
||||||
- Install/update dependencies with `uv sync -U`.
|
- Follow Django's "batteries included" philosophy - use built-in features before third-party packages
|
||||||
- Run tests with `uv run pytest`.
|
- Prioritize security and follow Django's security best practices
|
||||||
- Run Django commands with `uv run python manage.py <command>`.
|
- Use Django's ORM effectively
|
||||||
- Typical local sequence: `uv run python manage.py makemigrations`, `uv run python manage.py migrate`, `uv run python manage.py runserver`.
|
|
||||||
- Before finishing model changes, verify migrations with `uv run python manage.py makemigrations --check --dry-run`.
|
|
||||||
|
|
||||||
## Architecture
|
## Models
|
||||||
- This is a Django multi-app project:
|
- Add `__str__` methods to all models for a better admin interface
|
||||||
- `twitch/`, `kick/`, `chzzk/`, and `youtube/` hold platform-specific domain logic.
|
- Use `related_name` for foreign keys when needed
|
||||||
- `core/` holds shared cross-app utilities (base URL, SEO, shared views/helpers).
|
- Define `Meta` class with appropriate options (ordering, verbose_name, etc.)
|
||||||
- `config/` holds settings, URLs, WSGI, and Celery app wiring.
|
- Use `blank=True` for optional form fields, `null=True` for optional database fields
|
||||||
- Favor Django's batteries-included approach before adding new third-party dependencies.
|
|
||||||
- Follow MTV with fat models and thin views; keep template logic simple.
|
|
||||||
|
|
||||||
## Conventions
|
## Views
|
||||||
- Models should include `__str__` and explicit `Meta` options (ordering/indexes/verbose names) where relevant.
|
- Always validate and sanitize user input
|
||||||
- Use `blank=True` for optional form fields and `null=True` for optional database fields.
|
- Handle exceptions gracefully with try/except blocks
|
||||||
- Prefer `get_object_or_404` over manual DoesNotExist handling in views.
|
- Use `get_object_or_404` instead of manual exception handling
|
||||||
- URL patterns should use descriptive names and trailing slashes.
|
- Implement proper pagination for list views
|
||||||
- For schema models that parse external APIs, prefer Pydantic `extra="forbid"` and normalize variant payloads with validators.
|
|
||||||
- Add tests for both positive and negative scenarios, especially when handling multiple upstream payload formats.
|
|
||||||
|
|
||||||
## Environment and Pitfalls
|
## URLs
|
||||||
- `DJANGO_SECRET_KEY` must be set; the app exits if it is missing.
|
- Use descriptive URL names for reverse URL lookups
|
||||||
- Database defaults to PostgreSQL unless `USE_SQLITE=true`; tests use in-memory SQLite.
|
- Always end URL patterns with a trailing slash
|
||||||
- Redis is used for cache and Celery. Keep `REDIS_URL_CACHE` and `REDIS_URL_CELERY` configured in environments that run background tasks.
|
|
||||||
- Static and media files are stored under the platform data directory configured in `config/settings.py`, not in the repo tree.
|
|
||||||
|
|
||||||
## Documentation
|
## Templates
|
||||||
- Link to existing docs instead of duplicating them.
|
- Use template inheritance with base templates
|
||||||
- Use `README.md` as the source of truth for system setup, deployment, Celery, and management command examples.
|
- Use template tags and filters for common operations
|
||||||
- Only create new documentation files when explicitly requested.
|
- Avoid complex logic in templates - move it to views or template tags
|
||||||
|
- Use static files properly with `{% load static %}`
|
||||||
|
- Avoid hiding controls with `<details>` or other collapse elements unless explicitly needed
|
||||||
|
- Prioritize accessibility and discoverability of features
|
||||||
|
|
||||||
|
## Settings
|
||||||
|
- Use environment variables in a single `settings.py` file
|
||||||
|
- Never commit secrets to version control
|
||||||
|
|
||||||
|
## Database
|
||||||
|
- Use migrations for all database changes
|
||||||
|
- Optimize queries with `select_related` and `prefetch_related`
|
||||||
|
- Use database indexes for frequently queried fields
|
||||||
|
- Avoid N+1 query problems
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
- Always write unit tests and check that they pass for new features
|
||||||
|
- Test both positive and negative scenarios
|
||||||
|
|
||||||
|
## Pydantic Schemas
|
||||||
|
- Use `extra="forbid"` in model_config to catch API changes and new fields from external APIs
|
||||||
|
- Explicitly document all optional fields from different API operation formats
|
||||||
|
- This ensures validation failures alert you to API changes rather than silently ignoring new data
|
||||||
|
- When supporting multiple API formats (e.g., ViewerDropsDashboard vs Inventory operations):
|
||||||
|
- Make fields optional that differ between formats
|
||||||
|
- Use field validators or model validators to normalize data between formats
|
||||||
|
- Write tests covering both operation formats to ensure backward compatibility
|
||||||
|
|
||||||
|
## Architectural Patterns
|
||||||
|
- Follow Django's MTV (Model-Template-View) paradigm; keep business logic in models/services and presentation in templates
|
||||||
|
- Favor fat models and thin views; extract reusable business logic into services/helpers when complexity grows
|
||||||
|
- Keep forms and serializers (if added) responsible for validation; avoid duplicating validation in views
|
||||||
|
- Avoid circular dependencies; keep modules cohesive and decoupled
|
||||||
|
- Use settings modules and environment variables to configure behavior, not hardcoded constants
|
||||||
|
|
||||||
|
## Technology Stack
|
||||||
|
- Python 3, Django, PostgreSQL, Redis (Valkey), Celery for background tasks
|
||||||
|
- HTML templates with Django templating; static assets served from `static/` and collected to `staticfiles/`
|
||||||
|
- Management commands in `twitch/management/commands/` for data import and maintenance tasks
|
||||||
|
- Use `pyproject.toml` + uv for dependency and environment management
|
||||||
|
- Use `uv run python manage.py <command>` to run Django management commands
|
||||||
|
- Use `uv run pytest` to run tests
|
||||||
|
|
||||||
|
## Documentation & Project Organization
|
||||||
|
- Only create documentation files when explicitly requested by the user
|
||||||
|
- Do not generate markdown files summarizing work or changes unless asked
|
||||||
|
- Keep code comments focused on "why" not "what"; the code itself should be clear
|
||||||
|
- Update existing documentation rather than creating new files
|
||||||
|
|
|
||||||
48
README.md
48
README.md
|
|
@ -61,20 +61,13 @@ sudo systemctl enable --now ttvdrops.service
|
||||||
curl --unix-socket /run/ttvdrops/ttvdrops.sock https://ttvdrops.lovinator.space
|
curl --unix-socket /run/ttvdrops/ttvdrops.sock https://ttvdrops.lovinator.space
|
||||||
```
|
```
|
||||||
|
|
||||||
Enable Celery worker and Beat services:
|
Install and enable timers:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
sudo install -m 0644 tools/systemd/ttvdrops-celery-worker.service /etc/systemd/system/
|
sudo install -m 0644 tools/systemd/ttvdrops-backup.{service,timer} /etc/systemd/system/
|
||||||
sudo install -m 0644 tools/systemd/ttvdrops-celery-beat.service /etc/systemd/system/
|
sudo install -m 0644 tools/systemd/ttvdrops-import-drops.{service,timer} /etc/systemd/system/
|
||||||
sudo systemctl daemon-reload
|
sudo systemctl daemon-reload
|
||||||
sudo systemctl enable --now ttvdrops-celery-worker.service
|
sudo systemctl enable --now ttvdrops-backup.timer ttvdrops-import-drops.timer
|
||||||
sudo systemctl enable --now ttvdrops-celery-beat.service
|
|
||||||
```
|
|
||||||
|
|
||||||
Set `TTVDROPS_PENDING_DIR` in `.env` to the directory where Twitch JSON drop files are dropped:
|
|
||||||
|
|
||||||
```env
|
|
||||||
TTVDROPS_PENDING_DIR=/mnt/fourteen/Data/Responses/pending
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Development
|
## Development
|
||||||
|
|
@ -90,40 +83,13 @@ uv run pytest
|
||||||
|
|
||||||
## Celery
|
## Celery
|
||||||
|
|
||||||
Celery powers all periodic and background work. Three services are required:
|
Start a worker:
|
||||||
|
|
||||||
| Service file | Queues | Purpose |
|
|
||||||
|---|---|---|
|
|
||||||
| `ttvdrops-celery-worker.service` | `imports`, `api-fetches`, `default` | Twitch/Kick/Chzzk imports, backup |
|
|
||||||
| `ttvdrops-celery-beat.service` | — | Periodic task scheduler (Beat) |
|
|
||||||
|
|
||||||
Start workers manually during development:
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# All-in-one worker (development)
|
uv run celery -A config worker --loglevel=info
|
||||||
uv run celery -A config worker --queues imports,api-fetches,image-downloads,default --loglevel=info
|
uv run celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
|
||||||
|
|
||||||
# Beat scheduler (requires database Beat tables — run migrate first)
|
|
||||||
uv run celery -A config beat --scheduler django_celery_beat.schedulers:DatabaseScheduler --loglevel=info
|
|
||||||
|
|
||||||
# Monitor tasks in the browser (optional)
|
|
||||||
uv run celery -A config flower
|
|
||||||
```
|
```
|
||||||
|
|
||||||
**Periodic tasks configured via `CELERY_BEAT_SCHEDULE`:**
|
|
||||||
|
|
||||||
| Task | Schedule | Queue |
|
|
||||||
|---|---|---|
|
|
||||||
| `twitch.tasks.scan_pending_twitch_files` | every 10 s | `imports` |
|
|
||||||
| `kick.tasks.import_kick_drops` | :01/:16/:31/:46 | `api-fetches` |
|
|
||||||
| `chzzk.tasks.discover_chzzk_campaigns` | every 2 h | `api-fetches` |
|
|
||||||
| `twitch.tasks.backup_database` | daily 02:15 UTC | `default` |
|
|
||||||
| `twitch.tasks.download_all_images` | Sunday 04:00 UTC | `image-downloads` |
|
|
||||||
| `twitch.tasks.import_chat_badges` | Sunday 03:00 UTC | `api-fetches` |
|
|
||||||
|
|
||||||
Image downloads also run **immediately** on record creation via `post_save` signals
|
|
||||||
(`Game`, `DropCampaign`, `DropBenefit`, `RewardCampaign`).
|
|
||||||
|
|
||||||
## Import Drops
|
## Import Drops
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|
|
||||||
|
|
@ -1,26 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from celery import shared_task
|
|
||||||
from django.core.management import call_command
|
|
||||||
|
|
||||||
logger = logging.getLogger("ttvdrops.tasks")
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60)
|
|
||||||
def import_chzzk_campaign_task(self, campaign_no: int) -> None: # noqa: ANN001
|
|
||||||
"""Import a single Chzzk campaign by its campaign number."""
|
|
||||||
try:
|
|
||||||
call_command("import_chzzk_campaign", str(campaign_no))
|
|
||||||
except Exception as exc:
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120)
|
|
||||||
def discover_chzzk_campaigns(self) -> None: # noqa: ANN001
|
|
||||||
"""Discover and import the latest Chzzk campaigns (equivalent to --latest flag)."""
|
|
||||||
try:
|
|
||||||
call_command("import_chzzk_campaign", latest=True)
|
|
||||||
except Exception as exc:
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
@ -1,481 +0,0 @@
|
||||||
from datetime import timedelta
|
|
||||||
from io import StringIO
|
|
||||||
from unittest.mock import Mock
|
|
||||||
from unittest.mock import call
|
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
import requests
|
|
||||||
from django.core.management import CommandError
|
|
||||||
from django.core.management import call_command
|
|
||||||
from django.test import TestCase
|
|
||||||
from django.utils import timezone
|
|
||||||
|
|
||||||
from chzzk.management.commands.import_chzzk_campaign import (
|
|
||||||
Command as ImportChzzkCampaignCommand,
|
|
||||||
)
|
|
||||||
from chzzk.models import ChzzkCampaign
|
|
||||||
from chzzk.models import ChzzkReward
|
|
||||||
from chzzk.schemas import ChzzkCampaignV2
|
|
||||||
from chzzk.schemas import ChzzkRewardV2
|
|
||||||
|
|
||||||
|
|
||||||
class ImportChzzkCampaignRangeCommandTest(TestCase):
|
|
||||||
"""Tests for the import_chzzk_campaign_range management command."""
|
|
||||||
|
|
||||||
def test_imports_campaigns_in_range_descending_by_default(self) -> None:
|
|
||||||
"""Test that campaigns are imported in descending order when start > end."""
|
|
||||||
stdout = StringIO()
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign_range.call_command",
|
|
||||||
) as call_command_mock:
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"5",
|
|
||||||
"3",
|
|
||||||
stdout=stdout,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Verify call_command was called for each campaign in descending order
|
|
||||||
expected_calls = [
|
|
||||||
call("import_chzzk_campaign", "5"),
|
|
||||||
call("import_chzzk_campaign", "4"),
|
|
||||||
call("import_chzzk_campaign", "3"),
|
|
||||||
]
|
|
||||||
call_command_mock.assert_has_calls(expected_calls)
|
|
||||||
assert call_command_mock.call_count == 3
|
|
||||||
|
|
||||||
def test_imports_campaigns_in_range_ascending_by_default(self) -> None:
|
|
||||||
"""Test that campaigns are imported in ascending order when start < end."""
|
|
||||||
stdout = StringIO()
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign_range.call_command",
|
|
||||||
) as call_command_mock:
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"3",
|
|
||||||
"5",
|
|
||||||
stdout=stdout,
|
|
||||||
)
|
|
||||||
|
|
||||||
expected_calls = [
|
|
||||||
call("import_chzzk_campaign", "3"),
|
|
||||||
call("import_chzzk_campaign", "4"),
|
|
||||||
call("import_chzzk_campaign", "5"),
|
|
||||||
]
|
|
||||||
call_command_mock.assert_has_calls(expected_calls)
|
|
||||||
assert call_command_mock.call_count == 3
|
|
||||||
|
|
||||||
def test_imports_single_campaign_when_start_equals_end(self) -> None:
|
|
||||||
"""Test that a single campaign is imported when start equals end."""
|
|
||||||
stdout = StringIO()
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign_range.call_command",
|
|
||||||
) as call_command_mock:
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"5",
|
|
||||||
"5",
|
|
||||||
stdout=stdout,
|
|
||||||
)
|
|
||||||
|
|
||||||
call_command_mock.assert_called_once_with("import_chzzk_campaign", "5")
|
|
||||||
|
|
||||||
def test_respects_custom_step_parameter(self) -> None:
|
|
||||||
"""Test that custom step parameter is respected."""
|
|
||||||
stdout = StringIO()
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign_range.call_command",
|
|
||||||
) as call_command_mock:
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"1",
|
|
||||||
"10",
|
|
||||||
"--step",
|
|
||||||
"2",
|
|
||||||
stdout=stdout,
|
|
||||||
)
|
|
||||||
|
|
||||||
expected_calls = [
|
|
||||||
call("import_chzzk_campaign", "1"),
|
|
||||||
call("import_chzzk_campaign", "3"),
|
|
||||||
call("import_chzzk_campaign", "5"),
|
|
||||||
call("import_chzzk_campaign", "7"),
|
|
||||||
call("import_chzzk_campaign", "9"),
|
|
||||||
]
|
|
||||||
call_command_mock.assert_has_calls(expected_calls)
|
|
||||||
assert call_command_mock.call_count == 5
|
|
||||||
|
|
||||||
def test_respects_custom_negative_step(self) -> None:
|
|
||||||
"""Test that custom negative step parameter works correctly."""
|
|
||||||
stdout = StringIO()
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign_range.call_command",
|
|
||||||
) as call_command_mock:
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"10",
|
|
||||||
"1",
|
|
||||||
"--step",
|
|
||||||
"-2",
|
|
||||||
stdout=stdout,
|
|
||||||
)
|
|
||||||
|
|
||||||
expected_calls = [
|
|
||||||
call("import_chzzk_campaign", "10"),
|
|
||||||
call("import_chzzk_campaign", "8"),
|
|
||||||
call("import_chzzk_campaign", "6"),
|
|
||||||
call("import_chzzk_campaign", "4"),
|
|
||||||
call("import_chzzk_campaign", "2"),
|
|
||||||
]
|
|
||||||
call_command_mock.assert_has_calls(expected_calls)
|
|
||||||
assert call_command_mock.call_count == 5
|
|
||||||
|
|
||||||
def test_handles_command_error_gracefully(self) -> None:
|
|
||||||
"""Test that CommandError from import_chzzk_campaign is caught and reported."""
|
|
||||||
stdout = StringIO()
|
|
||||||
stderr = StringIO()
|
|
||||||
|
|
||||||
def side_effect(command: str, *args: str, **kwargs: object) -> None:
|
|
||||||
if "4" in args:
|
|
||||||
msg = "Campaign 4 not found"
|
|
||||||
raise CommandError(msg)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign_range.call_command",
|
|
||||||
side_effect=side_effect,
|
|
||||||
):
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"3",
|
|
||||||
"5",
|
|
||||||
stdout=stdout,
|
|
||||||
stderr=stderr,
|
|
||||||
)
|
|
||||||
|
|
||||||
output = stdout.getvalue()
|
|
||||||
assert "Importing campaign 3" in output
|
|
||||||
assert "Importing campaign 4" in output
|
|
||||||
assert "Importing campaign 5" in output
|
|
||||||
assert "Failed campaign 4" in output
|
|
||||||
assert "Campaign 4 not found" in output
|
|
||||||
assert "Batch import complete" in output
|
|
||||||
|
|
||||||
def test_continues_after_command_error(self) -> None:
|
|
||||||
"""Test that import continues after encountering a CommandError."""
|
|
||||||
call_count = 0
|
|
||||||
|
|
||||||
def side_effect(command: str, campaign_no: str) -> None:
|
|
||||||
nonlocal call_count
|
|
||||||
call_count += 1
|
|
||||||
if campaign_no == "4":
|
|
||||||
msg = "Campaign 4 error"
|
|
||||||
raise CommandError(msg)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign_range.call_command",
|
|
||||||
side_effect=side_effect,
|
|
||||||
):
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"3",
|
|
||||||
"5",
|
|
||||||
stdout=StringIO(),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Verify all campaigns were attempted
|
|
||||||
assert call_count == 3
|
|
||||||
|
|
||||||
def test_outputs_success_messages(self) -> None:
|
|
||||||
"""Test that success messages are written to stdout."""
|
|
||||||
stdout = StringIO()
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign_range.call_command",
|
|
||||||
):
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"1",
|
|
||||||
"2",
|
|
||||||
stdout=stdout,
|
|
||||||
)
|
|
||||||
|
|
||||||
output: str = stdout.getvalue()
|
|
||||||
assert "Importing campaigns from 1 to 2 with step 1" in output
|
|
||||||
assert "Batch import complete" in output
|
|
||||||
|
|
||||||
def test_raises_error_when_step_is_zero(self) -> None:
|
|
||||||
"""Test that ValueError is raised when step is 0."""
|
|
||||||
with pytest.raises(ValueError, match="Step cannot be 0"):
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"1",
|
|
||||||
"5",
|
|
||||||
"--step",
|
|
||||||
"0",
|
|
||||||
stdout=StringIO(),
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_handles_large_range(self) -> None:
|
|
||||||
"""Test that large ranges are handled correctly."""
|
|
||||||
stdout = StringIO()
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign_range.call_command",
|
|
||||||
) as call_command_mock:
|
|
||||||
call_command(
|
|
||||||
"import_chzzk_campaign_range",
|
|
||||||
"1",
|
|
||||||
"100",
|
|
||||||
"--step",
|
|
||||||
"10",
|
|
||||||
stdout=stdout,
|
|
||||||
)
|
|
||||||
|
|
||||||
assert call_command_mock.call_count == 10
|
|
||||||
first_call = call_command_mock.call_args_list[0]
|
|
||||||
assert first_call == call("import_chzzk_campaign", "1")
|
|
||||||
last_call = call_command_mock.call_args_list[-1]
|
|
||||||
assert last_call == call("import_chzzk_campaign", "91")
|
|
||||||
|
|
||||||
|
|
||||||
class ImportChzzkCampaignCommandTest(TestCase):
|
|
||||||
"""Tests for the import_chzzk_campaign management command."""
|
|
||||||
|
|
||||||
def _create_campaign(self, campaign_no: int) -> ChzzkCampaign:
|
|
||||||
now = timezone.now()
|
|
||||||
return ChzzkCampaign.objects.create(
|
|
||||||
campaign_no=campaign_no,
|
|
||||||
title=f"Campaign {campaign_no}",
|
|
||||||
description="Campaign description",
|
|
||||||
category_type="game",
|
|
||||||
category_id="1",
|
|
||||||
category_value="Game",
|
|
||||||
service_id="chzzk",
|
|
||||||
state="ACTIVE",
|
|
||||||
start_date=now - timedelta(days=1),
|
|
||||||
end_date=now + timedelta(days=1),
|
|
||||||
has_ios_based_reward=False,
|
|
||||||
drops_campaign_not_started=False,
|
|
||||||
source_api="unit-test",
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_requires_campaign_no_when_latest_not_used(self) -> None:
|
|
||||||
"""Command should fail without campaign_no unless --latest is provided."""
|
|
||||||
with pytest.raises(
|
|
||||||
CommandError,
|
|
||||||
match="campaign_no is required unless --latest is used",
|
|
||||||
):
|
|
||||||
call_command("import_chzzk_campaign", stdout=StringIO())
|
|
||||||
|
|
||||||
def test_imports_single_campaign_no(self) -> None:
|
|
||||||
"""Command should import the provided campaign number."""
|
|
||||||
with patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign.Command._import_campaign",
|
|
||||||
autospec=True,
|
|
||||||
) as import_campaign_mock:
|
|
||||||
call_command("import_chzzk_campaign", "42", stdout=StringIO())
|
|
||||||
|
|
||||||
assert import_campaign_mock.call_count == 1
|
|
||||||
assert import_campaign_mock.call_args.args[1] == 42
|
|
||||||
|
|
||||||
def test_latest_imports_candidates(self) -> None:
|
|
||||||
"""Command should import all candidates returned by get_campaign_import_candidates."""
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign.Command.get_campaign_import_candidates",
|
|
||||||
autospec=True,
|
|
||||||
return_value=[9, 10, 11],
|
|
||||||
),
|
|
||||||
patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign.Command._import_campaign",
|
|
||||||
autospec=True,
|
|
||||||
) as import_campaign_mock,
|
|
||||||
):
|
|
||||||
call_command("import_chzzk_campaign", "--latest", stdout=StringIO())
|
|
||||||
|
|
||||||
called_campaigns = [
|
|
||||||
call_.args[1] for call_ in import_campaign_mock.call_args_list
|
|
||||||
]
|
|
||||||
assert called_campaigns == [9, 10, 11]
|
|
||||||
|
|
||||||
def test_latest_with_no_candidates_exits_cleanly(self) -> None:
|
|
||||||
"""Command should not import anything when --latest has no candidates."""
|
|
||||||
stdout = StringIO()
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign.Command.get_campaign_import_candidates",
|
|
||||||
autospec=True,
|
|
||||||
return_value=[],
|
|
||||||
),
|
|
||||||
patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign.Command._import_campaign",
|
|
||||||
autospec=True,
|
|
||||||
) as import_campaign_mock,
|
|
||||||
):
|
|
||||||
call_command("import_chzzk_campaign", "--latest", stdout=stdout)
|
|
||||||
|
|
||||||
assert import_campaign_mock.call_count == 0
|
|
||||||
assert "Nothing to import with --latest at this time." in stdout.getvalue()
|
|
||||||
|
|
||||||
def test_get_campaign_import_candidates_uses_initial_range_on_empty_db(
|
|
||||||
self,
|
|
||||||
) -> None:
|
|
||||||
"""When there are no campaigns, candidates should be 1..5."""
|
|
||||||
command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand()
|
|
||||||
candidates: list[int] = command.get_campaign_import_candidates()
|
|
||||||
assert candidates == [1, 2, 3, 4, 5]
|
|
||||||
|
|
||||||
def test_get_campaign_import_candidates_adds_backfill_and_new_candidates(
|
|
||||||
self,
|
|
||||||
) -> None:
|
|
||||||
"""Candidates should include missing IDs from latest-5..latest-1 plus latest+1..latest+5."""
|
|
||||||
self._create_campaign(10)
|
|
||||||
self._create_campaign(8)
|
|
||||||
self._create_campaign(6)
|
|
||||||
|
|
||||||
command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand()
|
|
||||||
candidates: list[int] = command.get_campaign_import_candidates()
|
|
||||||
|
|
||||||
assert candidates == [5, 7, 9, 11, 12, 13, 14, 15]
|
|
||||||
|
|
||||||
def test_get_campaign_import_candidates_ignores_outlier_max_campaign_no(
|
|
||||||
self,
|
|
||||||
) -> None:
|
|
||||||
"""If max campaign_no is an outlier, the second max should be used."""
|
|
||||||
self._create_campaign(250)
|
|
||||||
self._create_campaign(100_002_000)
|
|
||||||
|
|
||||||
command: ImportChzzkCampaignCommand = ImportChzzkCampaignCommand()
|
|
||||||
candidates: list[int] = command.get_campaign_import_candidates()
|
|
||||||
|
|
||||||
assert candidates == [245, 246, 247, 248, 249, 251, 252, 253, 254, 255]
|
|
||||||
|
|
||||||
def test_import_campaign_handles_http_error_with_json_message(self) -> None:
|
|
||||||
"""Import should fail gracefully on HTTP errors and include JSON message when present."""
|
|
||||||
command = ImportChzzkCampaignCommand()
|
|
||||||
|
|
||||||
response = Mock()
|
|
||||||
response.raise_for_status.side_effect = requests.HTTPError("404 Client Error")
|
|
||||||
response.headers = {"Content-Type": "application/json; charset=utf-8"}
|
|
||||||
response.json.return_value = {"message": "Campaign not found"}
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"chzzk.management.commands.import_chzzk_campaign.requests.get",
|
|
||||||
return_value=response,
|
|
||||||
),
|
|
||||||
patch.object(command.stdout, "write") as stdout_write,
|
|
||||||
patch.object(command, "import_campaign_data") as import_campaign_data_mock,
|
|
||||||
):
|
|
||||||
command._import_campaign(12345)
|
|
||||||
|
|
||||||
assert import_campaign_data_mock.call_count == 0
|
|
||||||
assert stdout_write.call_count == 1
|
|
||||||
msg = stdout_write.call_args.args[0]
|
|
||||||
assert "Failed to fetch campaign 12345" in msg
|
|
||||||
assert "Campaign not found" in msg
|
|
||||||
|
|
||||||
def test_update_or_create_reward_updates_existing_reward(self) -> None:
|
|
||||||
"""Existing rewards should be updated when incoming reward data differs."""
|
|
||||||
campaign = self._create_campaign(500)
|
|
||||||
existing_reward = ChzzkReward.objects.create(
|
|
||||||
campaign=campaign,
|
|
||||||
reward_no=1,
|
|
||||||
title="Old title",
|
|
||||||
image_url="https://example.com/old.png",
|
|
||||||
reward_type="OLD",
|
|
||||||
campaign_reward_type="",
|
|
||||||
condition_type="TIME",
|
|
||||||
condition_for_minutes=10,
|
|
||||||
ios_based_reward=False,
|
|
||||||
code_remaining_count=5,
|
|
||||||
)
|
|
||||||
|
|
||||||
reward_data = ChzzkRewardV2.model_validate(
|
|
||||||
{
|
|
||||||
"title": "New title",
|
|
||||||
"rewardNo": 1,
|
|
||||||
"imageUrl": "https://example.com/new.png",
|
|
||||||
"rewardType": "NEW",
|
|
||||||
"conditionType": "WATCH",
|
|
||||||
"conditionForMinutes": 20,
|
|
||||||
"iosBasedReward": True,
|
|
||||||
"codeRemainingCount": 99,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
command = ImportChzzkCampaignCommand()
|
|
||||||
command.update_or_create_reward(500, campaign, reward_data)
|
|
||||||
|
|
||||||
existing_reward.refresh_from_db()
|
|
||||||
assert existing_reward.title == "New title"
|
|
||||||
assert existing_reward.image_url == "https://example.com/new.png"
|
|
||||||
assert existing_reward.reward_type == "NEW"
|
|
||||||
assert not existing_reward.campaign_reward_type
|
|
||||||
assert existing_reward.condition_type == "WATCH"
|
|
||||||
assert existing_reward.condition_for_minutes == 20
|
|
||||||
assert existing_reward.ios_based_reward is True
|
|
||||||
assert existing_reward.code_remaining_count == 99
|
|
||||||
|
|
||||||
def test_import_campaign_data_updates_existing_campaign(self) -> None:
|
|
||||||
"""Existing campaigns should be updated when imported fields have changed."""
|
|
||||||
campaign = self._create_campaign(600)
|
|
||||||
original_scraped_at = campaign.scraped_at
|
|
||||||
|
|
||||||
command = ImportChzzkCampaignCommand()
|
|
||||||
campaign_data = ChzzkCampaignV2.model_validate(
|
|
||||||
{
|
|
||||||
"campaignNo": 600,
|
|
||||||
"title": "Updated title",
|
|
||||||
"imageUrl": "https://example.com/new-campaign.png",
|
|
||||||
"description": "Updated description",
|
|
||||||
"categoryType": "game",
|
|
||||||
"categoryId": "2",
|
|
||||||
"categoryValue": "Game 2",
|
|
||||||
"pcLinkUrl": "https://example.com/pc",
|
|
||||||
"mobileLinkUrl": "https://example.com/mobile",
|
|
||||||
"serviceId": "chzzk",
|
|
||||||
"state": "ACTIVE",
|
|
||||||
"startDate": campaign.start_date.isoformat(),
|
|
||||||
"endDate": campaign.end_date.isoformat(),
|
|
||||||
"rewardList": [],
|
|
||||||
"hasIosBasedReward": True,
|
|
||||||
"dropsCampaignNotStarted": False,
|
|
||||||
"rewardType": "DROP",
|
|
||||||
"accountLinkUrl": "https://example.com/account",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
updated_campaign = command.import_campaign_data(
|
|
||||||
campaign_no=600,
|
|
||||||
api_version="v2",
|
|
||||||
data={"key": "value"},
|
|
||||||
cd=campaign_data,
|
|
||||||
)
|
|
||||||
|
|
||||||
updated_campaign.refresh_from_db()
|
|
||||||
assert updated_campaign.title == "Updated title"
|
|
||||||
assert updated_campaign.description == "Updated description"
|
|
||||||
assert updated_campaign.category_id == "2"
|
|
||||||
assert updated_campaign.has_ios_based_reward is True
|
|
||||||
assert not updated_campaign.campaign_reward_type
|
|
||||||
assert updated_campaign.reward_type == "DROP"
|
|
||||||
assert updated_campaign.account_link_url == "https://example.com/account"
|
|
||||||
assert updated_campaign.raw_json_v2 == {"key": "value"}
|
|
||||||
assert updated_campaign.scrape_status == "success"
|
|
||||||
assert updated_campaign.scraped_at >= original_scraped_at
|
|
||||||
|
|
||||||
def test_apply_updates_if_changed_returns_false_on_noop(self) -> None:
|
|
||||||
"""Helper should return False when values are unchanged."""
|
|
||||||
campaign = self._create_campaign(700)
|
|
||||||
command = ImportChzzkCampaignCommand()
|
|
||||||
|
|
||||||
changed = command._apply_updates_if_changed(
|
|
||||||
campaign,
|
|
||||||
{
|
|
||||||
"title": campaign.title,
|
|
||||||
"description": campaign.description,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert changed is False
|
|
||||||
|
|
@ -1,58 +0,0 @@
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from chzzk.tasks import discover_chzzk_campaigns
|
|
||||||
from chzzk.tasks import import_chzzk_campaign_task
|
|
||||||
|
|
||||||
|
|
||||||
def test_import_chzzk_campaign_task_calls_command_with_campaign_no() -> None:
|
|
||||||
"""It should invoke import_chzzk_campaign for the given campaign number."""
|
|
||||||
with patch("chzzk.tasks.call_command") as call_command_mock:
|
|
||||||
import_chzzk_campaign_task.run(905)
|
|
||||||
|
|
||||||
call_command_mock.assert_called_once_with("import_chzzk_campaign", "905")
|
|
||||||
|
|
||||||
|
|
||||||
def test_import_chzzk_campaign_task_retries_on_command_error() -> None:
|
|
||||||
"""It should retry when import_chzzk_campaign raises an exception."""
|
|
||||||
error = RuntimeError("boom")
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("chzzk.tasks.call_command", side_effect=error),
|
|
||||||
patch.object(
|
|
||||||
import_chzzk_campaign_task,
|
|
||||||
"retry",
|
|
||||||
side_effect=RuntimeError("retried"),
|
|
||||||
) as retry_mock,
|
|
||||||
pytest.raises(RuntimeError, match="retried"),
|
|
||||||
):
|
|
||||||
import_chzzk_campaign_task.run(905)
|
|
||||||
|
|
||||||
retry_mock.assert_called_once_with(exc=error)
|
|
||||||
|
|
||||||
|
|
||||||
def test_discover_chzzk_campaigns_calls_command_with_latest_flag() -> None:
|
|
||||||
"""It should invoke import_chzzk_campaign with latest=True."""
|
|
||||||
with patch("chzzk.tasks.call_command") as call_command_mock:
|
|
||||||
discover_chzzk_campaigns.run()
|
|
||||||
|
|
||||||
call_command_mock.assert_called_once_with("import_chzzk_campaign", latest=True)
|
|
||||||
|
|
||||||
|
|
||||||
def test_discover_chzzk_campaigns_retries_on_command_error() -> None:
|
|
||||||
"""It should retry when import_chzzk_campaign raises an exception."""
|
|
||||||
error = RuntimeError("boom")
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("chzzk.tasks.call_command", side_effect=error),
|
|
||||||
patch.object(
|
|
||||||
discover_chzzk_campaigns,
|
|
||||||
"retry",
|
|
||||||
side_effect=RuntimeError("retried"),
|
|
||||||
) as retry_mock,
|
|
||||||
pytest.raises(RuntimeError, match="retried"),
|
|
||||||
):
|
|
||||||
discover_chzzk_campaigns.run()
|
|
||||||
|
|
||||||
retry_mock.assert_called_once_with(exc=error)
|
|
||||||
|
|
@ -5,7 +5,6 @@ from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import sentry_sdk
|
import sentry_sdk
|
||||||
from celery.schedules import crontab
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from platformdirs import user_data_dir
|
from platformdirs import user_data_dir
|
||||||
|
|
||||||
|
|
@ -110,8 +109,6 @@ STATICFILES_DIRS: list[Path] = [BASE_DIR / "static"]
|
||||||
TIME_ZONE = "UTC"
|
TIME_ZONE = "UTC"
|
||||||
WSGI_APPLICATION = "config.wsgi.application"
|
WSGI_APPLICATION = "config.wsgi.application"
|
||||||
|
|
||||||
TTVDROPS_PENDING_DIR: str = os.getenv("TTVDROPS_PENDING_DIR", "")
|
|
||||||
|
|
||||||
INTERNAL_IPS: list[str] = []
|
INTERNAL_IPS: list[str] = []
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
INTERNAL_IPS = ["127.0.0.1", "localhost"] # pyright: ignore[reportConstantRedefinition]
|
INTERNAL_IPS = ["127.0.0.1", "localhost"] # pyright: ignore[reportConstantRedefinition]
|
||||||
|
|
@ -262,63 +259,6 @@ CELERY_BROKER_URL: str = REDIS_URL_CELERY
|
||||||
CELERY_RESULT_BACKEND = "django-db"
|
CELERY_RESULT_BACKEND = "django-db"
|
||||||
CELERY_RESULT_EXTENDED = True
|
CELERY_RESULT_EXTENDED = True
|
||||||
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
|
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
|
||||||
CELERY_TASK_SOFT_TIME_LIMIT: int = 3600 # warn at 1 h
|
|
||||||
CELERY_TASK_TIME_LIMIT: int = 3900 # hard-kill at 1 h 5 min
|
|
||||||
|
|
||||||
CELERY_TASK_ROUTES: dict[str, dict[str, str]] = {
|
|
||||||
"twitch.tasks.scan_pending_twitch_files": {"queue": "imports"},
|
|
||||||
"twitch.tasks.import_twitch_file": {"queue": "imports"},
|
|
||||||
"twitch.tasks.download_game_image": {"queue": "image-downloads"},
|
|
||||||
"twitch.tasks.download_campaign_image": {"queue": "image-downloads"},
|
|
||||||
"twitch.tasks.download_benefit_image": {"queue": "image-downloads"},
|
|
||||||
"twitch.tasks.download_reward_campaign_image": {"queue": "image-downloads"},
|
|
||||||
"twitch.tasks.download_all_images": {"queue": "image-downloads"},
|
|
||||||
"twitch.tasks.import_chat_badges": {"queue": "api-fetches"},
|
|
||||||
"twitch.tasks.backup_database": {"queue": "default"},
|
|
||||||
"kick.tasks.import_kick_drops": {"queue": "api-fetches"},
|
|
||||||
"chzzk.tasks.discover_chzzk_campaigns": {"queue": "api-fetches"},
|
|
||||||
"chzzk.tasks.import_chzzk_campaign_task": {"queue": "imports"},
|
|
||||||
"core.tasks.submit_indexnow_task": {"queue": "default"},
|
|
||||||
}
|
|
||||||
|
|
||||||
CELERY_BEAT_SCHEDULE: dict[str, Any] = {
|
|
||||||
# Scan for new Twitch JSON drops every 10 seconds.
|
|
||||||
"scan-pending-twitch-files": {
|
|
||||||
"task": "twitch.tasks.scan_pending_twitch_files",
|
|
||||||
"schedule": 10.0,
|
|
||||||
"options": {"queue": "imports"},
|
|
||||||
},
|
|
||||||
# Import Kick drops from the API (:01, :16, :31, :46 each hour).
|
|
||||||
"import-kick-drops": {
|
|
||||||
"task": "kick.tasks.import_kick_drops",
|
|
||||||
"schedule": crontab(minute="1,16,31,46"),
|
|
||||||
"options": {"queue": "api-fetches"},
|
|
||||||
},
|
|
||||||
# Backup database nightly at 02:15.
|
|
||||||
"backup-database": {
|
|
||||||
"task": "twitch.tasks.backup_database",
|
|
||||||
"schedule": crontab(hour=2, minute=15),
|
|
||||||
"options": {"queue": "default"},
|
|
||||||
},
|
|
||||||
# Discover new Chzzk campaigns every 2 hours at minute 0.
|
|
||||||
"discover-chzzk-campaigns": {
|
|
||||||
"task": "chzzk.tasks.discover_chzzk_campaigns",
|
|
||||||
"schedule": crontab(minute=0, hour="*/2"),
|
|
||||||
"options": {"queue": "api-fetches"},
|
|
||||||
},
|
|
||||||
# Weekly full image refresh (Sunday 04:00 UTC).
|
|
||||||
"download-all-images-weekly": {
|
|
||||||
"task": "twitch.tasks.download_all_images",
|
|
||||||
"schedule": crontab(hour=4, minute=0, day_of_week=0),
|
|
||||||
"options": {"queue": "image-downloads"},
|
|
||||||
},
|
|
||||||
# Weekly chat badge refresh (Sunday 03:00 UTC).
|
|
||||||
"import-chat-badges-weekly": {
|
|
||||||
"task": "twitch.tasks.import_chat_badges",
|
|
||||||
"schedule": crontab(hour=3, minute=0, day_of_week=0),
|
|
||||||
"options": {"queue": "api-fetches"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
# Define BASE_URL for dynamic URL generation
|
# Define BASE_URL for dynamic URL generation
|
||||||
BASE_URL: str = "https://ttvdrops.lovinator.space"
|
BASE_URL: str = "https://ttvdrops.lovinator.space"
|
||||||
|
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from celery import shared_task
|
|
||||||
from django.core.management import call_command
|
|
||||||
|
|
||||||
logger = logging.getLogger("ttvdrops.tasks")
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="default", max_retries=3, default_retry_delay=300)
|
|
||||||
def submit_indexnow_task(self) -> None: # noqa: ANN001
|
|
||||||
"""Submit all site URLs to the IndexNow search index."""
|
|
||||||
try:
|
|
||||||
call_command("submit_indexnow")
|
|
||||||
except Exception as exc:
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from celery import shared_task
|
|
||||||
from django.core.management import call_command
|
|
||||||
|
|
||||||
logger = logging.getLogger("ttvdrops.tasks")
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120)
|
|
||||||
def import_kick_drops(self) -> None: # noqa: ANN001
|
|
||||||
"""Fetch and upsert Kick drop campaigns from the public API."""
|
|
||||||
try:
|
|
||||||
call_command("import_kick_drops")
|
|
||||||
except Exception as exc:
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
@ -50,7 +50,7 @@ dev = [
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
DJANGO_SETTINGS_MODULE = "config.settings"
|
DJANGO_SETTINGS_MODULE = "config.settings"
|
||||||
python_files = ["test_*.py", "*_test.py"]
|
python_files = ["test_*.py", "*_test.py"]
|
||||||
addopts = "--tb=short -n auto --cov"
|
addopts = ""
|
||||||
filterwarnings = [
|
filterwarnings = [
|
||||||
"ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning",
|
"ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning",
|
||||||
]
|
]
|
||||||
|
|
|
||||||
10
tools/systemd/ttvdrops-backup.service
Normal file
10
tools/systemd/ttvdrops-backup.service
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
[Unit]
|
||||||
|
Description=TTVDrops database backup
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=oneshot
|
||||||
|
User=ttvdrops
|
||||||
|
Group=ttvdrops
|
||||||
|
WorkingDirectory=/home/ttvdrops/ttvdrops
|
||||||
|
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
|
||||||
|
ExecStart=/usr/bin/uv run python manage.py backup_db
|
||||||
9
tools/systemd/ttvdrops-backup.timer
Normal file
9
tools/systemd/ttvdrops-backup.timer
Normal file
|
|
@ -0,0 +1,9 @@
|
||||||
|
[Unit]
|
||||||
|
Description=Nightly TTVDrops database backup
|
||||||
|
|
||||||
|
[Timer]
|
||||||
|
OnCalendar=*-*-* 02:15:00
|
||||||
|
Persistent=true
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=timers.target
|
||||||
|
|
@ -1,27 +0,0 @@
|
||||||
[Unit]
|
|
||||||
Description=TTVDrops Celery Beat scheduler
|
|
||||||
After=network-online.target valkey.service ttvdrops-celery-worker.service
|
|
||||||
Wants=network-online.target valkey.service
|
|
||||||
|
|
||||||
[Service]
|
|
||||||
Type=simple
|
|
||||||
User=ttvdrops
|
|
||||||
Group=ttvdrops
|
|
||||||
SupplementaryGroups=http
|
|
||||||
UMask=0002
|
|
||||||
WorkingDirectory=/home/ttvdrops/ttvdrops
|
|
||||||
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
|
|
||||||
ExecStart=/usr/bin/uv run celery -A config beat \
|
|
||||||
--scheduler django_celery_beat.schedulers:DatabaseScheduler \
|
|
||||||
--loglevel INFO
|
|
||||||
ExecStop=/bin/kill -s TERM $MAINPID
|
|
||||||
|
|
||||||
StandardOutput=journal
|
|
||||||
StandardError=journal
|
|
||||||
SyslogIdentifier=ttvdrops-celery-beat
|
|
||||||
|
|
||||||
Restart=on-failure
|
|
||||||
RestartSec=10s
|
|
||||||
|
|
||||||
[Install]
|
|
||||||
WantedBy=multi-user.target
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
[Unit]
|
|
||||||
Description=TTVDrops Celery worker
|
|
||||||
After=network-online.target valkey.service
|
|
||||||
Wants=network-online.target valkey.service
|
|
||||||
|
|
||||||
[Service]
|
|
||||||
Type=simple
|
|
||||||
User=ttvdrops
|
|
||||||
Group=ttvdrops
|
|
||||||
SupplementaryGroups=http
|
|
||||||
UMask=0002
|
|
||||||
WorkingDirectory=/home/ttvdrops/ttvdrops
|
|
||||||
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
|
|
||||||
ExecStart=/usr/bin/uv run celery -A config worker \
|
|
||||||
--queues imports,api-fetches,default \
|
|
||||||
--concurrency 4 \
|
|
||||||
--loglevel INFO
|
|
||||||
ExecStop=/bin/kill -s TERM $MAINPID
|
|
||||||
ExecReload=/bin/kill -s HUP $MAINPID
|
|
||||||
|
|
||||||
MemoryLimit=512M
|
|
||||||
CPUQuota=75%
|
|
||||||
StandardOutput=journal
|
|
||||||
StandardError=journal
|
|
||||||
SyslogIdentifier=ttvdrops-celery-worker
|
|
||||||
|
|
||||||
Restart=on-failure
|
|
||||||
RestartSec=10s
|
|
||||||
|
|
||||||
[Install]
|
|
||||||
WantedBy=multi-user.target
|
|
||||||
34
tools/systemd/ttvdrops-import-drops.service
Normal file
34
tools/systemd/ttvdrops-import-drops.service
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
[Unit]
|
||||||
|
Description=TTVDrops watch and import drops from pending directory
|
||||||
|
After=network-online.target
|
||||||
|
Wants=network-online.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
User=ttvdrops
|
||||||
|
Group=ttvdrops
|
||||||
|
SupplementaryGroups=http
|
||||||
|
UMask=0002
|
||||||
|
WorkingDirectory=/home/ttvdrops/ttvdrops
|
||||||
|
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
|
||||||
|
ExecStart=/usr/bin/uv run python manage.py watch_imports /mnt/fourteen/Data/Responses/pending --verbose
|
||||||
|
|
||||||
|
# Restart policy
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=5s
|
||||||
|
|
||||||
|
# Process management
|
||||||
|
KillMode=mixed
|
||||||
|
KillSignal=SIGTERM
|
||||||
|
|
||||||
|
# Resource limits
|
||||||
|
MemoryLimit=512M
|
||||||
|
CPUQuota=50%
|
||||||
|
|
||||||
|
# Logging
|
||||||
|
StandardOutput=journal
|
||||||
|
StandardError=journal
|
||||||
|
SyslogIdentifier=ttvdrops-watch
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
26
tools/systemd/ttvdrops-import-kick-drops.service
Normal file
26
tools/systemd/ttvdrops-import-kick-drops.service
Normal file
|
|
@ -0,0 +1,26 @@
|
||||||
|
[Unit]
|
||||||
|
Description=TTVDrops import Kick drops
|
||||||
|
After=network-online.target
|
||||||
|
Wants=network-online.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=oneshot
|
||||||
|
User=ttvdrops
|
||||||
|
Group=ttvdrops
|
||||||
|
SupplementaryGroups=http
|
||||||
|
UMask=0002
|
||||||
|
WorkingDirectory=/home/ttvdrops/ttvdrops
|
||||||
|
EnvironmentFile=/home/ttvdrops/ttvdrops/.env
|
||||||
|
ExecStart=/usr/bin/uv run python manage.py import_kick_drops
|
||||||
|
|
||||||
|
# Logging
|
||||||
|
StandardOutput=journal
|
||||||
|
StandardError=journal
|
||||||
|
SyslogIdentifier=ttvdrops-import-kick
|
||||||
|
|
||||||
|
# Resource limits
|
||||||
|
MemoryLimit=512M
|
||||||
|
CPUQuota=50%
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
9
tools/systemd/ttvdrops-import-kick-drops.timer
Normal file
9
tools/systemd/ttvdrops-import-kick-drops.timer
Normal file
|
|
@ -0,0 +1,9 @@
|
||||||
|
[Unit]
|
||||||
|
Description=TTVDrops import Kick drops at :01, :16, :31, and :46
|
||||||
|
|
||||||
|
[Timer]
|
||||||
|
OnCalendar=*-*-* *:01,16,31,46:00
|
||||||
|
Persistent=true
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=timers.target
|
||||||
|
|
@ -36,21 +36,3 @@ class TwitchConfig(AppConfig):
|
||||||
FieldFile.open = _safe_open
|
FieldFile.open = _safe_open
|
||||||
except (AttributeError, TypeError) as exc:
|
except (AttributeError, TypeError) as exc:
|
||||||
logger.debug("Failed to patch FieldFile.open: %s", exc)
|
logger.debug("Failed to patch FieldFile.open: %s", exc)
|
||||||
|
|
||||||
# Register post_save signal handlers that dispatch image download tasks
|
|
||||||
# when new Twitch records are created.
|
|
||||||
from django.db.models.signals import post_save # noqa: PLC0415
|
|
||||||
|
|
||||||
from twitch.models import DropBenefit # noqa: PLC0415
|
|
||||||
from twitch.models import DropCampaign # noqa: PLC0415
|
|
||||||
from twitch.models import Game # noqa: PLC0415
|
|
||||||
from twitch.models import RewardCampaign # noqa: PLC0415
|
|
||||||
from twitch.signals import on_drop_benefit_saved # noqa: PLC0415
|
|
||||||
from twitch.signals import on_drop_campaign_saved # noqa: PLC0415
|
|
||||||
from twitch.signals import on_game_saved # noqa: PLC0415
|
|
||||||
from twitch.signals import on_reward_campaign_saved # noqa: PLC0415
|
|
||||||
|
|
||||||
post_save.connect(on_game_saved, sender=Game)
|
|
||||||
post_save.connect(on_drop_campaign_saved, sender=DropCampaign)
|
|
||||||
post_save.connect(on_drop_benefit_saved, sender=DropBenefit)
|
|
||||||
post_save.connect(on_reward_campaign_saved, sender=RewardCampaign)
|
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,7 @@ from twitch.models import Channel
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
|
|
||||||
from django.db.models import QuerySet
|
from debug_toolbar.panels.templates.panel import QuerySet
|
||||||
|
|
||||||
|
|
||||||
SAMPLE_PREVIEW_COUNT = 10
|
SAMPLE_PREVIEW_COUNT = 10
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -193,8 +193,8 @@ class Command(BaseCommand):
|
||||||
img.mode == "P" and "transparency" in img.info
|
img.mode == "P" and "transparency" in img.info
|
||||||
):
|
):
|
||||||
# Create white background for transparency
|
# Create white background for transparency
|
||||||
background: Image.Image = Image.new("RGB", img.size, (255, 255, 255))
|
background = Image.new("RGB", img.size, (255, 255, 255))
|
||||||
rgba_img: Image.Image = img.convert("RGBA") if img.mode == "P" else img
|
rgba_img = img.convert("RGBA") if img.mode == "P" else img
|
||||||
background.paste(
|
background.paste(
|
||||||
rgba_img,
|
rgba_img,
|
||||||
mask=rgba_img.split()[-1] if rgba_img.mode in {"RGBA", "LA"} else None,
|
mask=rgba_img.split()[-1] if rgba_img.mode in {"RGBA", "LA"} else None,
|
||||||
|
|
|
||||||
|
|
@ -17,16 +17,9 @@ logger: logging.Logger = logging.getLogger("ttvdrops.watch_imports")
|
||||||
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
"""Watch for JSON files in a directory and import them automatically.
|
"""Watch for JSON files in a directory and import them automatically."""
|
||||||
|
|
||||||
.. deprecated::
|
help = "Watch a directory for JSON files and import them automatically"
|
||||||
This command is superseded by the Celery Beat task
|
|
||||||
``twitch.tasks.scan_pending_twitch_files`` (runs every 10 s via
|
|
||||||
``ttvdrops-celery-beat.service``). Keep this command for ad-hoc use
|
|
||||||
or in environments that run without a Celery worker.
|
|
||||||
"""
|
|
||||||
|
|
||||||
help = "Watch a directory for JSON files and import them automatically (superseded by Celery Beat)"
|
|
||||||
requires_migrations_checks = True
|
requires_migrations_checks = True
|
||||||
|
|
||||||
def add_arguments(self, parser: CommandParser) -> None:
|
def add_arguments(self, parser: CommandParser) -> None:
|
||||||
|
|
|
||||||
|
|
@ -1,65 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
logger = logging.getLogger("ttvdrops.signals")
|
|
||||||
|
|
||||||
|
|
||||||
def _dispatch(task_fn: Any, pk: int) -> None: # noqa: ANN401
|
|
||||||
"""Dispatch a Celery task, logging rather than raising when the broker is unavailable."""
|
|
||||||
try:
|
|
||||||
task_fn.delay(pk)
|
|
||||||
except Exception: # noqa: BLE001
|
|
||||||
logger.debug(
|
|
||||||
"Could not dispatch %s(%d) — broker may be unavailable.",
|
|
||||||
task_fn.name,
|
|
||||||
pk,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def on_game_saved(sender: Any, instance: Any, created: bool, **kwargs: Any) -> None: # noqa: ANN401, FBT001
|
|
||||||
"""Dispatch a box-art download task when a new Game is created."""
|
|
||||||
if created:
|
|
||||||
from twitch.tasks import download_game_image # noqa: PLC0415
|
|
||||||
|
|
||||||
_dispatch(download_game_image, instance.pk)
|
|
||||||
|
|
||||||
|
|
||||||
def on_drop_campaign_saved(
|
|
||||||
sender: Any, # noqa: ANN401
|
|
||||||
instance: Any, # noqa: ANN401
|
|
||||||
created: bool, # noqa: FBT001
|
|
||||||
**kwargs: Any, # noqa: ANN401
|
|
||||||
) -> None:
|
|
||||||
"""Dispatch an image download task when a new DropCampaign is created."""
|
|
||||||
if created:
|
|
||||||
from twitch.tasks import download_campaign_image # noqa: PLC0415
|
|
||||||
|
|
||||||
_dispatch(download_campaign_image, instance.pk)
|
|
||||||
|
|
||||||
|
|
||||||
def on_drop_benefit_saved(
|
|
||||||
sender: Any, # noqa: ANN401
|
|
||||||
instance: Any, # noqa: ANN401
|
|
||||||
created: bool, # noqa: FBT001
|
|
||||||
**kwargs: Any, # noqa: ANN401
|
|
||||||
) -> None:
|
|
||||||
"""Dispatch an image download task when a new DropBenefit is created."""
|
|
||||||
if created:
|
|
||||||
from twitch.tasks import download_benefit_image # noqa: PLC0415
|
|
||||||
|
|
||||||
_dispatch(download_benefit_image, instance.pk)
|
|
||||||
|
|
||||||
|
|
||||||
def on_reward_campaign_saved(
|
|
||||||
sender: Any, # noqa: ANN401
|
|
||||||
instance: Any, # noqa: ANN401
|
|
||||||
created: bool, # noqa: FBT001
|
|
||||||
**kwargs: Any, # noqa: ANN401
|
|
||||||
) -> None:
|
|
||||||
"""Dispatch an image download task when a new RewardCampaign is created."""
|
|
||||||
if created:
|
|
||||||
from twitch.tasks import download_reward_campaign_image # noqa: PLC0415
|
|
||||||
|
|
||||||
_dispatch(download_reward_campaign_image, instance.pk)
|
|
||||||
257
twitch/tasks.py
257
twitch/tasks.py
|
|
@ -1,257 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
from urllib.parse import urlparse
|
|
||||||
|
|
||||||
import httpx
|
|
||||||
from celery import shared_task
|
|
||||||
from django.core.files.base import ContentFile
|
|
||||||
from django.core.management import call_command
|
|
||||||
from PIL.Image import Image
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from urllib.parse import ParseResult
|
|
||||||
|
|
||||||
from django.db.models.fields.files import ImageFieldFile
|
|
||||||
from PIL.Image import Image
|
|
||||||
from PIL.ImageFile import ImageFile
|
|
||||||
|
|
||||||
logger: logging.Logger = logging.getLogger("ttvdrops.tasks")
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60)
|
|
||||||
def scan_pending_twitch_files(self) -> None: # noqa: ANN001
|
|
||||||
"""Scan TTVDROPS_PENDING_DIR for JSON files and dispatch an import task for each."""
|
|
||||||
pending_dir: str = os.getenv("TTVDROPS_PENDING_DIR", "")
|
|
||||||
if not pending_dir:
|
|
||||||
logger.debug("TTVDROPS_PENDING_DIR not configured; skipping scan.")
|
|
||||||
return
|
|
||||||
|
|
||||||
path = Path(pending_dir)
|
|
||||||
if not path.is_dir():
|
|
||||||
logger.warning("TTVDROPS_PENDING_DIR %r is not a directory.", pending_dir)
|
|
||||||
return
|
|
||||||
|
|
||||||
json_files: list[Path] = [
|
|
||||||
f for f in path.iterdir() if f.suffix == ".json" and f.is_file()
|
|
||||||
]
|
|
||||||
for json_file in json_files:
|
|
||||||
import_twitch_file.delay(str(json_file))
|
|
||||||
|
|
||||||
if json_files:
|
|
||||||
logger.info("Dispatched %d Twitch file import task(s).", len(json_files))
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="imports", max_retries=3, default_retry_delay=60)
|
|
||||||
def import_twitch_file(self, file_path: str) -> None: # noqa: ANN001
|
|
||||||
"""Import a single Twitch JSON drop file via BetterImportDrops logic."""
|
|
||||||
from twitch.management.commands.better_import_drops import Command as Importer # noqa: I001, PLC0415
|
|
||||||
|
|
||||||
path = Path(file_path)
|
|
||||||
if not path.is_file():
|
|
||||||
# Already moved to imported/ or broken/ by a prior run.
|
|
||||||
logger.debug("File %s no longer exists; skipping.", file_path)
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
Importer().handle(path=path)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.exception("Failed to import %s.", file_path)
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
||||||
|
|
||||||
def _download_and_save(url: str, name: str, file_field: ImageFieldFile) -> bool:
|
|
||||||
"""Download url and save the content to file_field (Django ImageField).
|
|
||||||
|
|
||||||
Files that are already cached (non-empty .name) are skipped.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True when the image was saved, False when skipped or on error.
|
|
||||||
"""
|
|
||||||
if not url or file_field is None:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if getattr(file_field, "name", None):
|
|
||||||
return False # already cached
|
|
||||||
|
|
||||||
parsed: ParseResult = urlparse(url)
|
|
||||||
suffix: str = Path(parsed.path).suffix or ".jpg"
|
|
||||||
file_name: str = f"{name}{suffix}"
|
|
||||||
|
|
||||||
try:
|
|
||||||
with httpx.Client(timeout=20, follow_redirects=True) as client:
|
|
||||||
response = client.get(url)
|
|
||||||
response.raise_for_status()
|
|
||||||
except httpx.HTTPError:
|
|
||||||
logger.warning("HTTP error downloading image for %r.", name)
|
|
||||||
return False
|
|
||||||
|
|
||||||
file_field.save(file_name, ContentFile(response.content), save=True) # type: ignore[union-attr]
|
|
||||||
|
|
||||||
image_path: str | None = getattr(file_field, "path", None)
|
|
||||||
if image_path:
|
|
||||||
_convert_to_modern_formats(Path(image_path))
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def _convert_to_modern_formats(source: Path) -> None:
|
|
||||||
"""Convert *source* image to WebP and AVIF alongside the original."""
|
|
||||||
if not source.exists() or source.suffix.lower() not in {".jpg", ".jpeg", ".png"}:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
from PIL import Image # noqa: PLC0415
|
|
||||||
except ImportError:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
with Image.open(source) as raw:
|
|
||||||
if raw.mode in {"RGBA", "LA"} or (
|
|
||||||
raw.mode == "P" and "transparency" in raw.info
|
|
||||||
):
|
|
||||||
background: Image = Image.new("RGB", raw.size, (255, 255, 255))
|
|
||||||
rgba: Image | ImageFile = (
|
|
||||||
raw.convert("RGBA") if raw.mode == "P" else raw
|
|
||||||
)
|
|
||||||
mask: Image | None = (
|
|
||||||
rgba.split()[-1] if rgba.mode in {"RGBA", "LA"} else None
|
|
||||||
)
|
|
||||||
background.paste(rgba, mask=mask)
|
|
||||||
img: Image = background
|
|
||||||
elif raw.mode != "RGB":
|
|
||||||
img = raw.convert("RGB")
|
|
||||||
else:
|
|
||||||
img: Image = raw.copy()
|
|
||||||
|
|
||||||
for fmt, ext in (("WEBP", ".webp"), ("AVIF", ".avif")):
|
|
||||||
out: Path = source.with_suffix(ext)
|
|
||||||
try:
|
|
||||||
img.save(out, fmt, quality=80)
|
|
||||||
except Exception: # noqa: BLE001
|
|
||||||
logger.debug("Could not convert %s to %s.", source, fmt)
|
|
||||||
except Exception: # noqa: BLE001
|
|
||||||
logger.debug("Format conversion failed for %s.", source)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Per-model image tasks — triggered by post_save signals on new records
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300)
|
|
||||||
def download_game_image(self, game_pk: int) -> None: # noqa: ANN001
|
|
||||||
"""Download and cache the box art image for a single Game."""
|
|
||||||
from twitch.models import Game # noqa: PLC0415
|
|
||||||
from twitch.utils import is_twitch_box_art_url # noqa: PLC0415
|
|
||||||
from twitch.utils import normalize_twitch_box_art_url # noqa: PLC0415
|
|
||||||
|
|
||||||
try:
|
|
||||||
game = Game.objects.get(pk=game_pk)
|
|
||||||
except Game.DoesNotExist:
|
|
||||||
return
|
|
||||||
|
|
||||||
if not game.box_art or not is_twitch_box_art_url(game.box_art):
|
|
||||||
return
|
|
||||||
|
|
||||||
url = normalize_twitch_box_art_url(game.box_art)
|
|
||||||
try:
|
|
||||||
_download_and_save(url, game.twitch_id, game.box_art_file)
|
|
||||||
except Exception as exc:
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300)
|
|
||||||
def download_campaign_image(self, campaign_pk: int) -> None: # noqa: ANN001
|
|
||||||
"""Download and cache the image for a single DropCampaign."""
|
|
||||||
from twitch.models import DropCampaign # noqa: PLC0415
|
|
||||||
|
|
||||||
try:
|
|
||||||
campaign = DropCampaign.objects.get(pk=campaign_pk)
|
|
||||||
except DropCampaign.DoesNotExist:
|
|
||||||
return
|
|
||||||
|
|
||||||
if not campaign.image_url:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
_download_and_save(campaign.image_url, campaign.twitch_id, campaign.image_file)
|
|
||||||
except Exception as exc:
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300)
|
|
||||||
def download_benefit_image(self, benefit_pk: int) -> None: # noqa: ANN001
|
|
||||||
"""Download and cache the image for a single DropBenefit."""
|
|
||||||
from twitch.models import DropBenefit # noqa: PLC0415
|
|
||||||
|
|
||||||
try:
|
|
||||||
benefit = DropBenefit.objects.get(pk=benefit_pk)
|
|
||||||
except DropBenefit.DoesNotExist:
|
|
||||||
return
|
|
||||||
|
|
||||||
if not benefit.image_asset_url:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
_download_and_save(
|
|
||||||
url=benefit.image_asset_url,
|
|
||||||
name=benefit.twitch_id,
|
|
||||||
file_field=benefit.image_file,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="image-downloads", max_retries=3, default_retry_delay=300)
|
|
||||||
def download_reward_campaign_image(self, reward_pk: int) -> None: # noqa: ANN001
|
|
||||||
"""Download and cache the image for a single RewardCampaign."""
|
|
||||||
from twitch.models import RewardCampaign # noqa: PLC0415
|
|
||||||
|
|
||||||
try:
|
|
||||||
reward = RewardCampaign.objects.get(pk=reward_pk)
|
|
||||||
except RewardCampaign.DoesNotExist:
|
|
||||||
return
|
|
||||||
|
|
||||||
if not reward.image_url:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
_download_and_save(reward.image_url, reward.twitch_id, reward.image_file)
|
|
||||||
except Exception as exc:
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(queue="image-downloads")
|
|
||||||
def download_all_images() -> None:
|
|
||||||
"""Weekly full-refresh: download images for all Twitch models."""
|
|
||||||
call_command("download_box_art")
|
|
||||||
call_command("download_campaign_images", model="all")
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Twitch API tasks
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue="api-fetches", max_retries=3, default_retry_delay=120)
|
|
||||||
def import_chat_badges(self) -> None: # noqa: ANN001
|
|
||||||
"""Fetch and upsert Twitch global chat badges via the Helix API."""
|
|
||||||
try:
|
|
||||||
call_command("import_chat_badges")
|
|
||||||
except Exception as exc:
|
|
||||||
raise self.retry(exc=exc) from exc
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Maintenance
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(queue="default")
|
|
||||||
def backup_database() -> None:
|
|
||||||
"""Create a zstd-compressed SQL backup of the dataset tables."""
|
|
||||||
call_command("backup_db")
|
|
||||||
|
|
@ -1,22 +1,13 @@
|
||||||
import json
|
import json
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
from unittest import skipIf
|
from unittest import skipIf
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
from django.db import connection
|
from django.db import connection
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
|
|
||||||
from twitch.management.commands.better_import_drops import Command
|
from twitch.management.commands.better_import_drops import Command
|
||||||
from twitch.management.commands.better_import_drops import detect_error_only_response
|
from twitch.management.commands.better_import_drops import detect_error_only_response
|
||||||
from twitch.management.commands.better_import_drops import detect_non_campaign_keyword
|
|
||||||
from twitch.management.commands.better_import_drops import (
|
|
||||||
extract_operation_name_from_parsed,
|
|
||||||
)
|
|
||||||
from twitch.management.commands.better_import_drops import move_completed_file
|
|
||||||
from twitch.management.commands.better_import_drops import move_file_to_broken_subdir
|
|
||||||
from twitch.management.commands.better_import_drops import repair_partially_broken_json
|
|
||||||
from twitch.models import DropBenefit
|
from twitch.models import DropBenefit
|
||||||
from twitch.models import DropCampaign
|
from twitch.models import DropCampaign
|
||||||
from twitch.models import Game
|
from twitch.models import Game
|
||||||
|
|
@ -435,92 +426,6 @@ class CampaignStructureDetectionTests(TestCase):
|
||||||
structure: str | None = command._detect_campaign_structure(response)
|
structure: str | None = command._detect_campaign_structure(response)
|
||||||
assert structure == "current_user_drop_campaigns"
|
assert structure == "current_user_drop_campaigns"
|
||||||
|
|
||||||
def test_detects_user_drop_campaign_structure(self) -> None:
|
|
||||||
"""Ensure user.dropCampaign structure is correctly detected."""
|
|
||||||
command = Command()
|
|
||||||
|
|
||||||
response: dict[str, object] = {
|
|
||||||
"data": {
|
|
||||||
"user": {
|
|
||||||
"id": "123",
|
|
||||||
"dropCampaign": {"id": "c1", "name": "Test Campaign"},
|
|
||||||
"__typename": "User",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
structure: str | None = command._detect_campaign_structure(response)
|
|
||||||
assert structure == "user_drop_campaign"
|
|
||||||
|
|
||||||
def test_detects_channel_viewer_campaigns_structure(self) -> None:
|
|
||||||
"""Ensure channel.viewerDropCampaigns structure is correctly detected."""
|
|
||||||
command = Command()
|
|
||||||
|
|
||||||
response: dict[str, object] = {
|
|
||||||
"data": {
|
|
||||||
"channel": {
|
|
||||||
"id": "123",
|
|
||||||
"viewerDropCampaigns": [{"id": "c1", "name": "Test Campaign"}],
|
|
||||||
"__typename": "Channel",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
structure: str | None = command._detect_campaign_structure(response)
|
|
||||||
assert structure == "channel_viewer_campaigns"
|
|
||||||
|
|
||||||
|
|
||||||
class FileMoveUtilityTests(TestCase):
|
|
||||||
"""Tests for imported/broken file move utility helpers."""
|
|
||||||
|
|
||||||
def test_move_completed_file_sanitizes_operation_directory_name(self) -> None:
|
|
||||||
"""Ensure operation names are sanitized and campaign structure subdir is respected."""
|
|
||||||
with TemporaryDirectory() as tmp_dir:
|
|
||||||
root_path = Path(tmp_dir)
|
|
||||||
imported_root = root_path / "imported"
|
|
||||||
source_file = root_path / "payload.json"
|
|
||||||
source_file.write_text("{}", encoding="utf-8")
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"twitch.management.commands.better_import_drops.get_imported_directory_root",
|
|
||||||
return_value=imported_root,
|
|
||||||
):
|
|
||||||
target_dir = move_completed_file(
|
|
||||||
file_path=source_file,
|
|
||||||
operation_name="My Op/Name\\v1",
|
|
||||||
campaign_structure="inventory_campaigns",
|
|
||||||
)
|
|
||||||
|
|
||||||
expected_dir = imported_root / "My_Op_Name_v1" / "inventory_campaigns"
|
|
||||||
assert target_dir == expected_dir
|
|
||||||
assert not source_file.exists()
|
|
||||||
assert (expected_dir / "payload.json").exists()
|
|
||||||
|
|
||||||
def test_move_file_to_broken_subdir_avoids_duplicate_operation_segment(
|
|
||||||
self,
|
|
||||||
) -> None:
|
|
||||||
"""Ensure matching reason and operation names do not create duplicate directories."""
|
|
||||||
with TemporaryDirectory() as tmp_dir:
|
|
||||||
root_path = Path(tmp_dir)
|
|
||||||
broken_root = root_path / "broken"
|
|
||||||
source_file = root_path / "broken_payload.json"
|
|
||||||
source_file.write_text("{}", encoding="utf-8")
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"twitch.management.commands.better_import_drops.get_broken_directory_root",
|
|
||||||
return_value=broken_root,
|
|
||||||
):
|
|
||||||
broken_dir = move_file_to_broken_subdir(
|
|
||||||
file_path=source_file,
|
|
||||||
subdir="validation_failed",
|
|
||||||
operation_name="validation_failed",
|
|
||||||
)
|
|
||||||
|
|
||||||
path_segments = broken_dir.as_posix().split("/")
|
|
||||||
assert path_segments.count("validation_failed") == 1
|
|
||||||
assert not source_file.exists()
|
|
||||||
assert (broken_dir / "broken_payload.json").exists()
|
|
||||||
|
|
||||||
|
|
||||||
class OperationNameFilteringTests(TestCase):
|
class OperationNameFilteringTests(TestCase):
|
||||||
"""Tests for filtering campaigns by operation_name field."""
|
"""Tests for filtering campaigns by operation_name field."""
|
||||||
|
|
@ -959,179 +864,3 @@ class ErrorOnlyResponseDetectionTests(TestCase):
|
||||||
|
|
||||||
result = detect_error_only_response(parsed_json)
|
result = detect_error_only_response(parsed_json)
|
||||||
assert result == "error_only: unknown error"
|
assert result == "error_only: unknown error"
|
||||||
|
|
||||||
|
|
||||||
class NonCampaignKeywordDetectionTests(TestCase):
|
|
||||||
"""Tests for non-campaign operation keyword detection."""
|
|
||||||
|
|
||||||
def test_detects_known_non_campaign_operation(self) -> None:
|
|
||||||
"""Ensure known operationName values are detected as non-campaign payloads."""
|
|
||||||
raw_text = json.dumps({"extensions": {"operationName": "PlaybackAccessToken"}})
|
|
||||||
|
|
||||||
result = detect_non_campaign_keyword(raw_text)
|
|
||||||
assert result == "PlaybackAccessToken"
|
|
||||||
|
|
||||||
def test_returns_none_for_unknown_operation(self) -> None:
|
|
||||||
"""Ensure unrelated operation names are not flagged."""
|
|
||||||
raw_text = json.dumps({"extensions": {"operationName": "DropCampaignDetails"}})
|
|
||||||
|
|
||||||
result = detect_non_campaign_keyword(raw_text)
|
|
||||||
assert result is None
|
|
||||||
|
|
||||||
|
|
||||||
class OperationNameExtractionTests(TestCase):
|
|
||||||
"""Tests for operation name extraction across supported payload shapes."""
|
|
||||||
|
|
||||||
def test_extracts_operation_name_from_json_repair_tuple(self) -> None:
|
|
||||||
"""Ensure extraction supports tuple payloads returned by json_repair."""
|
|
||||||
payload = (
|
|
||||||
{"extensions": {"operationName": "ViewerDropsDashboard"}},
|
|
||||||
[{"json_repair": "log"}],
|
|
||||||
)
|
|
||||||
|
|
||||||
result = extract_operation_name_from_parsed(payload)
|
|
||||||
assert result == "ViewerDropsDashboard"
|
|
||||||
|
|
||||||
def test_extracts_operation_name_from_list_payload(self) -> None:
|
|
||||||
"""Ensure extraction inspects the first response in list payloads."""
|
|
||||||
payload = [
|
|
||||||
{"extensions": {"operationName": "Inventory"}},
|
|
||||||
{"extensions": {"operationName": "IgnoredSecondItem"}},
|
|
||||||
]
|
|
||||||
|
|
||||||
result = extract_operation_name_from_parsed(payload)
|
|
||||||
assert result == "Inventory"
|
|
||||||
|
|
||||||
def test_returns_none_for_empty_list_payload(self) -> None:
|
|
||||||
"""Ensure extraction returns None for empty list payloads."""
|
|
||||||
result = extract_operation_name_from_parsed([])
|
|
||||||
assert result is None
|
|
||||||
|
|
||||||
|
|
||||||
class JsonRepairTests(TestCase):
|
|
||||||
"""Tests for partial JSON repair fallback behavior."""
|
|
||||||
|
|
||||||
def test_repair_filters_non_graphql_items_from_list(self) -> None:
|
|
||||||
"""Ensure repaired list output only keeps GraphQL-like response objects."""
|
|
||||||
raw_text = '[{"foo": 1}, {"data": {"currentUser": {"id": "1"}}}]'
|
|
||||||
|
|
||||||
repaired = repair_partially_broken_json(raw_text)
|
|
||||||
parsed = json.loads(repaired)
|
|
||||||
|
|
||||||
assert parsed == [{"data": {"currentUser": {"id": "1"}}}]
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessFileWorkerTests(TestCase):
|
|
||||||
"""Tests for process_file_worker early-return behaviors."""
|
|
||||||
|
|
||||||
def test_returns_reason_when_drop_campaign_key_missing(self) -> None:
|
|
||||||
"""Ensure files without dropCampaign are marked failed with clear reason."""
|
|
||||||
command = Command()
|
|
||||||
|
|
||||||
repo_root: Path = Path(__file__).resolve().parents[2]
|
|
||||||
temp_path: Path = repo_root / "twitch" / "tests" / "tmp_no_drop_campaign.json"
|
|
||||||
temp_path.write_text(
|
|
||||||
json.dumps({"data": {"currentUser": {"id": "123"}}}),
|
|
||||||
encoding="utf-8",
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
result = command.process_file_worker(
|
|
||||||
file_path=temp_path,
|
|
||||||
options={"crash_on_error": False, "skip_broken_moves": True},
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
if temp_path.exists():
|
|
||||||
temp_path.unlink()
|
|
||||||
|
|
||||||
assert result["success"] is False
|
|
||||||
assert result["broken_dir"] == "(skipped)"
|
|
||||||
assert result["reason"] == "no dropCampaign present"
|
|
||||||
|
|
||||||
def test_returns_reason_for_error_only_response(self) -> None:
|
|
||||||
"""Ensure error-only responses are marked failed with extracted reason."""
|
|
||||||
command = Command()
|
|
||||||
|
|
||||||
repo_root: Path = Path(__file__).resolve().parents[2]
|
|
||||||
temp_path: Path = repo_root / "twitch" / "tests" / "tmp_error_only.json"
|
|
||||||
temp_path.write_text(
|
|
||||||
json.dumps(
|
|
||||||
{
|
|
||||||
"errors": [{"message": "service timeout"}],
|
|
||||||
"data": None,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
encoding="utf-8",
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
result = command.process_file_worker(
|
|
||||||
file_path=temp_path,
|
|
||||||
options={"crash_on_error": False, "skip_broken_moves": True},
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
if temp_path.exists():
|
|
||||||
temp_path.unlink()
|
|
||||||
|
|
||||||
assert result["success"] is False
|
|
||||||
assert result["broken_dir"] == "(skipped)"
|
|
||||||
assert result["reason"] == "error_only: service timeout"
|
|
||||||
|
|
||||||
def test_returns_reason_for_known_non_campaign_keyword(self) -> None:
|
|
||||||
"""Ensure known non-campaign operation payloads are rejected with reason."""
|
|
||||||
command = Command()
|
|
||||||
|
|
||||||
repo_root: Path = Path(__file__).resolve().parents[2]
|
|
||||||
temp_path: Path = (
|
|
||||||
repo_root / "twitch" / "tests" / "tmp_non_campaign_keyword.json"
|
|
||||||
)
|
|
||||||
temp_path.write_text(
|
|
||||||
json.dumps(
|
|
||||||
{
|
|
||||||
"data": {"currentUser": {"id": "123"}},
|
|
||||||
"extensions": {"operationName": "PlaybackAccessToken"},
|
|
||||||
},
|
|
||||||
),
|
|
||||||
encoding="utf-8",
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
result = command.process_file_worker(
|
|
||||||
file_path=temp_path,
|
|
||||||
options={"crash_on_error": False, "skip_broken_moves": True},
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
if temp_path.exists():
|
|
||||||
temp_path.unlink()
|
|
||||||
|
|
||||||
assert result["success"] is False
|
|
||||||
assert result["broken_dir"] == "(skipped)"
|
|
||||||
assert result["reason"] == "matched 'PlaybackAccessToken'"
|
|
||||||
|
|
||||||
|
|
||||||
class NormalizeResponsesTests(TestCase):
|
|
||||||
"""Tests for response normalization across supported payload formats."""
|
|
||||||
|
|
||||||
def test_normalizes_batched_responses_wrapper(self) -> None:
|
|
||||||
"""Ensure batched payloads under responses key are unwrapped and filtered."""
|
|
||||||
command = Command()
|
|
||||||
|
|
||||||
parsed_json: dict[str, object] = {
|
|
||||||
"responses": [
|
|
||||||
{"data": {"currentUser": {"id": "1"}}},
|
|
||||||
"invalid-item",
|
|
||||||
{"extensions": {"operationName": "Inventory"}},
|
|
||||||
],
|
|
||||||
}
|
|
||||||
|
|
||||||
normalized = command._normalize_responses(parsed_json)
|
|
||||||
assert len(normalized) == 2
|
|
||||||
assert normalized[0]["data"]["currentUser"]["id"] == "1"
|
|
||||||
assert normalized[1]["extensions"]["operationName"] == "Inventory"
|
|
||||||
|
|
||||||
def test_returns_empty_list_for_empty_tuple_payload(self) -> None:
|
|
||||||
"""Ensure empty tuple payloads from json_repair produce no responses."""
|
|
||||||
command = Command()
|
|
||||||
|
|
||||||
normalized = command._normalize_responses(()) # type: ignore[arg-type]
|
|
||||||
assert normalized == []
|
|
||||||
|
|
|
||||||
|
|
@ -1,150 +0,0 @@
|
||||||
from io import StringIO
|
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
import httpx
|
|
||||||
import pytest
|
|
||||||
from django.core.management import CommandError
|
|
||||||
from django.core.management import call_command
|
|
||||||
|
|
||||||
from twitch.models import ChatBadge
|
|
||||||
from twitch.models import ChatBadgeSet
|
|
||||||
from twitch.schemas import GlobalChatBadgesResponse
|
|
||||||
|
|
||||||
pytestmark: pytest.MarkDecorator = pytest.mark.django_db
|
|
||||||
|
|
||||||
|
|
||||||
def _build_response(title: str = "VIP") -> GlobalChatBadgesResponse:
|
|
||||||
"""Build a valid GlobalChatBadgesResponse payload for tests.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A validated Twitch global chat badges response object.
|
|
||||||
"""
|
|
||||||
return GlobalChatBadgesResponse.model_validate({
|
|
||||||
"data": [
|
|
||||||
{
|
|
||||||
"set_id": "vip",
|
|
||||||
"versions": [
|
|
||||||
{
|
|
||||||
"id": "1",
|
|
||||||
"image_url_1x": "https://example.com/vip-1x.png",
|
|
||||||
"image_url_2x": "https://example.com/vip-2x.png",
|
|
||||||
"image_url_4x": "https://example.com/vip-4x.png",
|
|
||||||
"title": title,
|
|
||||||
"description": "VIP Badge",
|
|
||||||
"click_action": "visit_url",
|
|
||||||
"click_url": "https://help.twitch.tv",
|
|
||||||
},
|
|
||||||
],
|
|
||||||
},
|
|
||||||
],
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
def test_raises_when_client_id_missing(monkeypatch: pytest.MonkeyPatch) -> None:
|
|
||||||
"""Command should fail when client ID is not provided."""
|
|
||||||
monkeypatch.delenv("TWITCH_CLIENT_ID", raising=False)
|
|
||||||
monkeypatch.delenv("TWITCH_CLIENT_SECRET", raising=False)
|
|
||||||
monkeypatch.delenv("TWITCH_ACCESS_TOKEN", raising=False)
|
|
||||||
|
|
||||||
with pytest.raises(CommandError, match="Twitch Client ID is required"):
|
|
||||||
call_command("import_chat_badges", stdout=StringIO())
|
|
||||||
|
|
||||||
|
|
||||||
def test_raises_when_token_and_secret_missing(monkeypatch: pytest.MonkeyPatch) -> None:
|
|
||||||
"""Command should fail when no token and no client secret are available."""
|
|
||||||
monkeypatch.delenv("TWITCH_CLIENT_SECRET", raising=False)
|
|
||||||
monkeypatch.delenv("TWITCH_ACCESS_TOKEN", raising=False)
|
|
||||||
|
|
||||||
with pytest.raises(CommandError, match="Either --access-token or --client-secret"):
|
|
||||||
call_command(
|
|
||||||
"import_chat_badges",
|
|
||||||
"--client-id",
|
|
||||||
"client-id",
|
|
||||||
stdout=StringIO(),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_import_creates_then_updates_existing_badge() -> None:
|
|
||||||
"""Running import twice should update existing badges rather than duplicate them."""
|
|
||||||
with patch(
|
|
||||||
"twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges",
|
|
||||||
side_effect=[_build_response("VIP"), _build_response("VIP Updated")],
|
|
||||||
) as fetch_mock:
|
|
||||||
call_command(
|
|
||||||
"import_chat_badges",
|
|
||||||
"--client-id",
|
|
||||||
"client-id",
|
|
||||||
"--access-token",
|
|
||||||
"access-token",
|
|
||||||
stdout=StringIO(),
|
|
||||||
)
|
|
||||||
call_command(
|
|
||||||
"import_chat_badges",
|
|
||||||
"--client-id",
|
|
||||||
"client-id",
|
|
||||||
"--access-token",
|
|
||||||
"access-token",
|
|
||||||
stdout=StringIO(),
|
|
||||||
)
|
|
||||||
|
|
||||||
assert fetch_mock.call_count == 2
|
|
||||||
assert ChatBadgeSet.objects.count() == 1
|
|
||||||
assert ChatBadge.objects.count() == 1
|
|
||||||
|
|
||||||
badge_set = ChatBadgeSet.objects.get(set_id="vip")
|
|
||||||
badge = ChatBadge.objects.get(badge_set=badge_set, badge_id="1")
|
|
||||||
assert badge.title == "VIP Updated"
|
|
||||||
assert badge.click_action == "visit_url"
|
|
||||||
assert badge.click_url == "https://help.twitch.tv"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_uses_client_credentials_when_access_token_missing() -> None:
|
|
||||||
"""Command should obtain token from Twitch when no access token is provided."""
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"twitch.management.commands.import_chat_badges.Command._get_app_access_token",
|
|
||||||
return_value="generated-token",
|
|
||||||
) as token_mock,
|
|
||||||
patch(
|
|
||||||
"twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges",
|
|
||||||
return_value=GlobalChatBadgesResponse.model_validate({"data": []}),
|
|
||||||
) as fetch_mock,
|
|
||||||
):
|
|
||||||
call_command(
|
|
||||||
"import_chat_badges",
|
|
||||||
"--client-id",
|
|
||||||
"client-id",
|
|
||||||
"--client-secret",
|
|
||||||
"client-secret",
|
|
||||||
stdout=StringIO(),
|
|
||||||
)
|
|
||||||
|
|
||||||
token_mock.assert_called_once_with("client-id", "client-secret")
|
|
||||||
fetch_mock.assert_called_once_with(
|
|
||||||
client_id="client-id",
|
|
||||||
access_token="generated-token",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_wraps_http_errors_from_badges_fetch() -> None:
|
|
||||||
"""Command should convert HTTP client errors to CommandError."""
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"twitch.management.commands.import_chat_badges.Command._fetch_global_chat_badges",
|
|
||||||
side_effect=httpx.HTTPError("boom"),
|
|
||||||
),
|
|
||||||
pytest.raises(
|
|
||||||
CommandError,
|
|
||||||
match="Failed to fetch chat badges from Twitch API",
|
|
||||||
),
|
|
||||||
):
|
|
||||||
call_command(
|
|
||||||
"import_chat_badges",
|
|
||||||
"--client-id",
|
|
||||||
"client-id",
|
|
||||||
"--access-token",
|
|
||||||
"access-token",
|
|
||||||
stdout=StringIO(),
|
|
||||||
)
|
|
||||||
|
|
@ -1,575 +0,0 @@
|
||||||
from pathlib import Path
|
|
||||||
from types import SimpleNamespace
|
|
||||||
from unittest.mock import MagicMock
|
|
||||||
from unittest.mock import call
|
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
import httpx
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from twitch.models import DropBenefit
|
|
||||||
from twitch.models import DropCampaign
|
|
||||||
from twitch.models import RewardCampaign
|
|
||||||
from twitch.tasks import _convert_to_modern_formats
|
|
||||||
from twitch.tasks import _download_and_save
|
|
||||||
from twitch.tasks import backup_database
|
|
||||||
from twitch.tasks import download_all_images
|
|
||||||
from twitch.tasks import download_benefit_image
|
|
||||||
from twitch.tasks import download_campaign_image
|
|
||||||
from twitch.tasks import download_game_image
|
|
||||||
from twitch.tasks import download_reward_campaign_image
|
|
||||||
from twitch.tasks import import_chat_badges
|
|
||||||
from twitch.tasks import import_twitch_file
|
|
||||||
from twitch.tasks import scan_pending_twitch_files
|
|
||||||
|
|
||||||
|
|
||||||
def test_scan_pending_twitch_files_dispatches_json_files(
|
|
||||||
monkeypatch: pytest.MonkeyPatch,
|
|
||||||
tmp_path: Path,
|
|
||||||
) -> None:
|
|
||||||
"""It should dispatch an import task for each JSON file in the pending directory."""
|
|
||||||
first = tmp_path / "one.json"
|
|
||||||
second = tmp_path / "two.json"
|
|
||||||
ignored = tmp_path / "notes.txt"
|
|
||||||
first.write_text("{}", encoding="utf-8")
|
|
||||||
second.write_text("{}", encoding="utf-8")
|
|
||||||
ignored.write_text("ignored", encoding="utf-8")
|
|
||||||
monkeypatch.setenv("TTVDROPS_PENDING_DIR", str(tmp_path))
|
|
||||||
|
|
||||||
with patch("twitch.tasks.import_twitch_file.delay") as delay_mock:
|
|
||||||
scan_pending_twitch_files.run()
|
|
||||||
|
|
||||||
delay_mock.assert_has_calls([call(str(first)), call(str(second))], any_order=True)
|
|
||||||
assert delay_mock.call_count == 2
|
|
||||||
|
|
||||||
|
|
||||||
def test_scan_pending_twitch_files_skips_when_env_missing(
|
|
||||||
monkeypatch: pytest.MonkeyPatch,
|
|
||||||
) -> None:
|
|
||||||
"""It should do nothing when TTVDROPS_PENDING_DIR is not configured."""
|
|
||||||
monkeypatch.delenv("TTVDROPS_PENDING_DIR", raising=False)
|
|
||||||
|
|
||||||
with patch("twitch.tasks.import_twitch_file.delay") as delay_mock:
|
|
||||||
scan_pending_twitch_files.run()
|
|
||||||
|
|
||||||
delay_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_import_twitch_file_calls_importer_for_existing_file(tmp_path: Path) -> None:
|
|
||||||
"""It should run BetterImportDrops with the provided path when the file exists."""
|
|
||||||
source = tmp_path / "drops.json"
|
|
||||||
source.write_text("{}", encoding="utf-8")
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"twitch.management.commands.better_import_drops.Command.handle",
|
|
||||||
) as handle_mock:
|
|
||||||
import_twitch_file.run(str(source))
|
|
||||||
|
|
||||||
assert handle_mock.call_count == 1
|
|
||||||
assert handle_mock.call_args.kwargs["path"] == source
|
|
||||||
|
|
||||||
|
|
||||||
def test_import_twitch_file_retries_on_import_error(tmp_path: Path) -> None:
|
|
||||||
"""It should retry when the importer raises an exception."""
|
|
||||||
source = tmp_path / "drops.json"
|
|
||||||
source.write_text("{}", encoding="utf-8")
|
|
||||||
error = RuntimeError("boom")
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"twitch.management.commands.better_import_drops.Command.handle",
|
|
||||||
side_effect=error,
|
|
||||||
),
|
|
||||||
patch.object(
|
|
||||||
import_twitch_file,
|
|
||||||
"retry",
|
|
||||||
side_effect=RuntimeError("retried"),
|
|
||||||
) as retry_mock,
|
|
||||||
pytest.raises(RuntimeError, match="retried"),
|
|
||||||
):
|
|
||||||
import_twitch_file.run(str(source))
|
|
||||||
|
|
||||||
retry_mock.assert_called_once_with(exc=error)
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_and_save_skips_when_image_already_cached() -> None:
|
|
||||||
"""It should not download when the target ImageField already has a file name."""
|
|
||||||
file_field = MagicMock()
|
|
||||||
file_field.name = "already-there.jpg"
|
|
||||||
|
|
||||||
with patch("twitch.tasks.httpx.Client") as client_mock:
|
|
||||||
result = _download_and_save(
|
|
||||||
url="https://example.com/test.jpg",
|
|
||||||
name="game-1",
|
|
||||||
file_field=file_field,
|
|
||||||
)
|
|
||||||
|
|
||||||
assert result is False
|
|
||||||
client_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_and_save_saves_downloaded_content() -> None:
|
|
||||||
"""It should save downloaded bytes and trigger modern format conversion."""
|
|
||||||
file_field = MagicMock()
|
|
||||||
file_field.name = ""
|
|
||||||
file_field.path = "C:/cache/game-1.png"
|
|
||||||
|
|
||||||
response = MagicMock()
|
|
||||||
response.content = b"img-bytes"
|
|
||||||
response.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
client = MagicMock()
|
|
||||||
client.get.return_value = response
|
|
||||||
client_cm = MagicMock()
|
|
||||||
client_cm.__enter__.return_value = client
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.tasks.httpx.Client", return_value=client_cm),
|
|
||||||
patch("twitch.tasks._convert_to_modern_formats") as convert_mock,
|
|
||||||
):
|
|
||||||
result = _download_and_save(
|
|
||||||
url="https://example.com/path/picture.png",
|
|
||||||
name="game-1",
|
|
||||||
file_field=file_field,
|
|
||||||
)
|
|
||||||
|
|
||||||
assert result is True
|
|
||||||
file_field.save.assert_called_once()
|
|
||||||
assert file_field.save.call_args.args[0] == "game-1.png"
|
|
||||||
assert file_field.save.call_args.kwargs["save"] is True
|
|
||||||
convert_mock.assert_called_once_with(Path("C:/cache/game-1.png"))
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_and_save_returns_false_on_http_error() -> None:
|
|
||||||
"""It should return False when HTTP requests fail."""
|
|
||||||
file_field = MagicMock()
|
|
||||||
file_field.name = ""
|
|
||||||
|
|
||||||
client = MagicMock()
|
|
||||||
client.get.side_effect = httpx.HTTPError("network down")
|
|
||||||
client_cm = MagicMock()
|
|
||||||
client_cm.__enter__.return_value = client
|
|
||||||
|
|
||||||
with patch("twitch.tasks.httpx.Client", return_value=client_cm):
|
|
||||||
result = _download_and_save(
|
|
||||||
url="https://example.com/path/picture.png",
|
|
||||||
name="game-1",
|
|
||||||
file_field=file_field,
|
|
||||||
)
|
|
||||||
|
|
||||||
assert result is False
|
|
||||||
file_field.save.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_game_image_downloads_normalized_twitch_url() -> None:
|
|
||||||
"""It should normalize Twitch box art URLs before downloading."""
|
|
||||||
game = SimpleNamespace(
|
|
||||||
box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg",
|
|
||||||
twitch_id="123",
|
|
||||||
box_art_file=MagicMock(),
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.Game.objects.get", return_value=game),
|
|
||||||
patch(
|
|
||||||
"twitch.utils.is_twitch_box_art_url",
|
|
||||||
return_value=True,
|
|
||||||
) as is_twitch_mock,
|
|
||||||
patch(
|
|
||||||
"twitch.utils.normalize_twitch_box_art_url",
|
|
||||||
return_value="https://cdn.example.com/box.jpg",
|
|
||||||
) as normalize_mock,
|
|
||||||
patch("twitch.tasks._download_and_save") as save_mock,
|
|
||||||
):
|
|
||||||
download_game_image.run(1)
|
|
||||||
|
|
||||||
is_twitch_mock.assert_called_once_with(game.box_art)
|
|
||||||
normalize_mock.assert_called_once_with(game.box_art)
|
|
||||||
save_mock.assert_called_once_with(
|
|
||||||
"https://cdn.example.com/box.jpg",
|
|
||||||
"123",
|
|
||||||
game.box_art_file,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_game_image_retries_on_download_error() -> None:
|
|
||||||
"""It should retry when the image download helper fails unexpectedly."""
|
|
||||||
game = SimpleNamespace(
|
|
||||||
box_art="https://static-cdn.jtvnw.net/ttv-boxart/1-{width}x{height}.jpg",
|
|
||||||
twitch_id="123",
|
|
||||||
box_art_file=MagicMock(),
|
|
||||||
)
|
|
||||||
error = RuntimeError("boom")
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.Game.objects.get", return_value=game),
|
|
||||||
patch("twitch.utils.is_twitch_box_art_url", return_value=True),
|
|
||||||
patch(
|
|
||||||
"twitch.utils.normalize_twitch_box_art_url",
|
|
||||||
return_value="https://cdn.example.com/box.jpg",
|
|
||||||
),
|
|
||||||
patch("twitch.tasks._download_and_save", side_effect=error),
|
|
||||||
patch.object(
|
|
||||||
download_game_image,
|
|
||||||
"retry",
|
|
||||||
side_effect=RuntimeError("retried"),
|
|
||||||
) as retry_mock,
|
|
||||||
pytest.raises(RuntimeError, match="retried"),
|
|
||||||
):
|
|
||||||
download_game_image.run(1)
|
|
||||||
|
|
||||||
retry_mock.assert_called_once_with(exc=error)
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_all_images_calls_expected_commands() -> None:
|
|
||||||
"""It should invoke both image import management commands."""
|
|
||||||
with patch("twitch.tasks.call_command") as call_command_mock:
|
|
||||||
download_all_images.run()
|
|
||||||
|
|
||||||
call_command_mock.assert_has_calls([
|
|
||||||
call("download_box_art"),
|
|
||||||
call("download_campaign_images", model="all"),
|
|
||||||
])
|
|
||||||
|
|
||||||
|
|
||||||
def test_import_chat_badges_retries_on_error() -> None:
|
|
||||||
"""It should retry when import_chat_badges command execution fails."""
|
|
||||||
error = RuntimeError("boom")
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.tasks.call_command", side_effect=error),
|
|
||||||
patch.object(
|
|
||||||
import_chat_badges,
|
|
||||||
"retry",
|
|
||||||
side_effect=RuntimeError("retried"),
|
|
||||||
) as retry_mock,
|
|
||||||
pytest.raises(RuntimeError, match="retried"),
|
|
||||||
):
|
|
||||||
import_chat_badges.run()
|
|
||||||
|
|
||||||
retry_mock.assert_called_once_with(exc=error)
|
|
||||||
|
|
||||||
|
|
||||||
def test_backup_database_calls_backup_command() -> None:
|
|
||||||
"""It should invoke the backup_db management command."""
|
|
||||||
with patch("twitch.tasks.call_command") as call_command_mock:
|
|
||||||
backup_database.run()
|
|
||||||
|
|
||||||
call_command_mock.assert_called_once_with("backup_db")
|
|
||||||
|
|
||||||
|
|
||||||
def test_convert_to_modern_formats_skips_non_image_files(tmp_path: Path) -> None:
|
|
||||||
"""It should skip files that are not jpg, jpeg, or png."""
|
|
||||||
text_file = tmp_path / "document.txt"
|
|
||||||
text_file.write_text("Not an image", encoding="utf-8")
|
|
||||||
|
|
||||||
with patch("PIL.Image.open") as open_mock:
|
|
||||||
_convert_to_modern_formats(text_file)
|
|
||||||
|
|
||||||
open_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_convert_to_modern_formats_skips_missing_files(tmp_path: Path) -> None:
|
|
||||||
"""It should skip files that do not exist."""
|
|
||||||
missing_file = tmp_path / "nonexistent.jpg"
|
|
||||||
|
|
||||||
with patch("PIL.Image.open") as open_mock:
|
|
||||||
_convert_to_modern_formats(missing_file)
|
|
||||||
|
|
||||||
open_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_convert_to_modern_formats_converts_rgb_image(tmp_path: Path) -> None:
|
|
||||||
"""It should save image in WebP and AVIF formats for RGB images."""
|
|
||||||
original = tmp_path / "image.jpg"
|
|
||||||
original.write_bytes(b"fake-jpeg")
|
|
||||||
|
|
||||||
mock_image = MagicMock()
|
|
||||||
mock_image.mode = "RGB"
|
|
||||||
mock_copy = MagicMock()
|
|
||||||
mock_image.copy.return_value = mock_copy
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("PIL.Image.open") as open_mock,
|
|
||||||
):
|
|
||||||
open_mock.return_value.__enter__.return_value = mock_image
|
|
||||||
_convert_to_modern_formats(original)
|
|
||||||
|
|
||||||
open_mock.assert_called_once_with(original)
|
|
||||||
assert mock_copy.save.call_count == 2
|
|
||||||
webp_call = [c for c in mock_copy.save.call_args_list if "webp" in str(c)]
|
|
||||||
avif_call = [c for c in mock_copy.save.call_args_list if "avif" in str(c)]
|
|
||||||
assert len(webp_call) == 1
|
|
||||||
assert len(avif_call) == 1
|
|
||||||
|
|
||||||
|
|
||||||
def test_convert_to_modern_formats_converts_rgba_image_with_background(
|
|
||||||
tmp_path: Path,
|
|
||||||
) -> None:
|
|
||||||
"""It should create RGB background and paste RGBA image onto it."""
|
|
||||||
original = tmp_path / "image.png"
|
|
||||||
original.write_bytes(b"fake-png")
|
|
||||||
|
|
||||||
mock_rgba = MagicMock()
|
|
||||||
mock_rgba.mode = "RGBA"
|
|
||||||
mock_rgba.split.return_value = [MagicMock(), MagicMock(), MagicMock(), MagicMock()]
|
|
||||||
mock_rgba.size = (100, 100)
|
|
||||||
mock_rgba.convert.return_value = mock_rgba
|
|
||||||
|
|
||||||
mock_bg = MagicMock()
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("PIL.Image.open") as open_mock,
|
|
||||||
patch("PIL.Image.new", return_value=mock_bg) as new_mock,
|
|
||||||
):
|
|
||||||
open_mock.return_value.__enter__.return_value = mock_rgba
|
|
||||||
_convert_to_modern_formats(original)
|
|
||||||
|
|
||||||
new_mock.assert_called_once_with("RGB", (100, 100), (255, 255, 255))
|
|
||||||
mock_bg.paste.assert_called_once()
|
|
||||||
assert mock_bg.save.call_count == 2
|
|
||||||
|
|
||||||
|
|
||||||
def test_convert_to_modern_formats_handles_conversion_error(tmp_path: Path) -> None:
|
|
||||||
"""It should gracefully handle format conversion failures."""
|
|
||||||
original = tmp_path / "image.jpg"
|
|
||||||
original.write_bytes(b"fake-jpeg")
|
|
||||||
|
|
||||||
mock_image = MagicMock()
|
|
||||||
mock_image.mode = "RGB"
|
|
||||||
mock_image.copy.return_value = MagicMock()
|
|
||||||
mock_image.copy.return_value.save.side_effect = Exception("PIL error")
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("PIL.Image.open") as open_mock,
|
|
||||||
):
|
|
||||||
open_mock.return_value.__enter__.return_value = mock_image
|
|
||||||
_convert_to_modern_formats(original) # Should not raise
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_campaign_image_downloads_when_url_exists() -> None:
|
|
||||||
"""It should download and save campaign image when image_url is present."""
|
|
||||||
campaign = SimpleNamespace(
|
|
||||||
image_url="https://example.com/campaign.jpg",
|
|
||||||
twitch_id="camp123",
|
|
||||||
image_file=MagicMock(),
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.DropCampaign.objects.get", return_value=campaign),
|
|
||||||
patch("twitch.tasks._download_and_save", return_value=True) as save_mock,
|
|
||||||
):
|
|
||||||
download_campaign_image.run(1)
|
|
||||||
|
|
||||||
save_mock.assert_called_once_with(
|
|
||||||
"https://example.com/campaign.jpg",
|
|
||||||
"camp123",
|
|
||||||
campaign.image_file,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_campaign_image_skips_when_no_url() -> None:
|
|
||||||
"""It should skip when campaign has no image_url."""
|
|
||||||
campaign = SimpleNamespace(
|
|
||||||
image_url=None,
|
|
||||||
twitch_id="camp123",
|
|
||||||
image_file=MagicMock(),
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.DropCampaign.objects.get", return_value=campaign),
|
|
||||||
patch("twitch.tasks._download_and_save") as save_mock,
|
|
||||||
):
|
|
||||||
download_campaign_image.run(1)
|
|
||||||
|
|
||||||
save_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_campaign_image_skips_when_not_found() -> None:
|
|
||||||
"""It should skip when campaign does not exist."""
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"twitch.models.DropCampaign.objects.get",
|
|
||||||
side_effect=DropCampaign.DoesNotExist(),
|
|
||||||
),
|
|
||||||
patch("twitch.tasks._download_and_save") as save_mock,
|
|
||||||
):
|
|
||||||
download_campaign_image.run(1)
|
|
||||||
|
|
||||||
save_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_campaign_image_retries_on_error() -> None:
|
|
||||||
"""It should retry when image download fails unexpectedly."""
|
|
||||||
campaign = SimpleNamespace(
|
|
||||||
image_url="https://example.com/campaign.jpg",
|
|
||||||
twitch_id="camp123",
|
|
||||||
image_file=MagicMock(),
|
|
||||||
)
|
|
||||||
error = RuntimeError("boom")
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.DropCampaign.objects.get", return_value=campaign),
|
|
||||||
patch("twitch.tasks._download_and_save", side_effect=error),
|
|
||||||
patch.object(
|
|
||||||
download_campaign_image,
|
|
||||||
"retry",
|
|
||||||
side_effect=RuntimeError("retried"),
|
|
||||||
) as retry_mock,
|
|
||||||
pytest.raises(RuntimeError, match="retried"),
|
|
||||||
):
|
|
||||||
download_campaign_image.run(1)
|
|
||||||
|
|
||||||
retry_mock.assert_called_once_with(exc=error)
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_benefit_image_downloads_when_url_exists() -> None:
|
|
||||||
"""It should download and save benefit image when image_asset_url is present."""
|
|
||||||
benefit = SimpleNamespace(
|
|
||||||
image_asset_url="https://example.com/benefit.png",
|
|
||||||
twitch_id="benefit123",
|
|
||||||
image_file=MagicMock(),
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.DropBenefit.objects.get", return_value=benefit),
|
|
||||||
patch("twitch.tasks._download_and_save", return_value=True) as save_mock,
|
|
||||||
):
|
|
||||||
download_benefit_image.run(1)
|
|
||||||
|
|
||||||
save_mock.assert_called_once_with(
|
|
||||||
url="https://example.com/benefit.png",
|
|
||||||
name="benefit123",
|
|
||||||
file_field=benefit.image_file,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_benefit_image_skips_when_no_url() -> None:
|
|
||||||
"""It should skip when benefit has no image_asset_url."""
|
|
||||||
benefit = SimpleNamespace(
|
|
||||||
image_asset_url=None,
|
|
||||||
twitch_id="benefit123",
|
|
||||||
image_file=MagicMock(),
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.DropBenefit.objects.get", return_value=benefit),
|
|
||||||
patch("twitch.tasks._download_and_save") as save_mock,
|
|
||||||
):
|
|
||||||
download_benefit_image.run(1)
|
|
||||||
|
|
||||||
save_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_benefit_image_skips_when_not_found() -> None:
|
|
||||||
"""It should skip when benefit does not exist."""
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"twitch.models.DropBenefit.objects.get",
|
|
||||||
side_effect=DropBenefit.DoesNotExist(),
|
|
||||||
),
|
|
||||||
patch("twitch.tasks._download_and_save") as save_mock,
|
|
||||||
):
|
|
||||||
download_benefit_image.run(1)
|
|
||||||
|
|
||||||
save_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_benefit_image_retries_on_error() -> None:
|
|
||||||
"""It should retry when image download fails unexpectedly."""
|
|
||||||
benefit = SimpleNamespace(
|
|
||||||
image_asset_url="https://example.com/benefit.png",
|
|
||||||
twitch_id="benefit123",
|
|
||||||
image_file=MagicMock(),
|
|
||||||
)
|
|
||||||
error = RuntimeError("boom")
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.DropBenefit.objects.get", return_value=benefit),
|
|
||||||
patch("twitch.tasks._download_and_save", side_effect=error),
|
|
||||||
patch.object(
|
|
||||||
download_benefit_image,
|
|
||||||
"retry",
|
|
||||||
side_effect=RuntimeError("retried"),
|
|
||||||
) as retry_mock,
|
|
||||||
pytest.raises(RuntimeError, match="retried"),
|
|
||||||
):
|
|
||||||
download_benefit_image.run(1)
|
|
||||||
|
|
||||||
retry_mock.assert_called_once_with(exc=error)
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_reward_campaign_image_downloads_when_url_exists() -> None:
|
|
||||||
"""It should download and save reward campaign image when image_url is present."""
|
|
||||||
reward = SimpleNamespace(
|
|
||||||
image_url="https://example.com/reward.jpg",
|
|
||||||
twitch_id="reward123",
|
|
||||||
image_file=MagicMock(),
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.RewardCampaign.objects.get", return_value=reward),
|
|
||||||
patch("twitch.tasks._download_and_save", return_value=True) as save_mock,
|
|
||||||
):
|
|
||||||
download_reward_campaign_image.run(1)
|
|
||||||
|
|
||||||
save_mock.assert_called_once_with(
|
|
||||||
"https://example.com/reward.jpg",
|
|
||||||
"reward123",
|
|
||||||
reward.image_file,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_reward_campaign_image_skips_when_no_url() -> None:
|
|
||||||
"""It should skip when reward has no image_url."""
|
|
||||||
reward = SimpleNamespace(
|
|
||||||
image_url=None,
|
|
||||||
twitch_id="reward123",
|
|
||||||
image_file=MagicMock(),
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.RewardCampaign.objects.get", return_value=reward),
|
|
||||||
patch("twitch.tasks._download_and_save") as save_mock,
|
|
||||||
):
|
|
||||||
download_reward_campaign_image.run(1)
|
|
||||||
|
|
||||||
save_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_reward_campaign_image_skips_when_not_found() -> None:
|
|
||||||
"""It should skip when reward does not exist."""
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"twitch.models.RewardCampaign.objects.get",
|
|
||||||
side_effect=RewardCampaign.DoesNotExist(),
|
|
||||||
),
|
|
||||||
patch("twitch.tasks._download_and_save") as save_mock,
|
|
||||||
):
|
|
||||||
download_reward_campaign_image.run(1)
|
|
||||||
|
|
||||||
save_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_reward_campaign_image_retries_on_error() -> None:
|
|
||||||
"""It should retry when image download fails unexpectedly."""
|
|
||||||
reward = SimpleNamespace(
|
|
||||||
image_url="https://example.com/reward.jpg",
|
|
||||||
twitch_id="reward123",
|
|
||||||
image_file=MagicMock(),
|
|
||||||
)
|
|
||||||
error = RuntimeError("boom")
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("twitch.models.RewardCampaign.objects.get", return_value=reward),
|
|
||||||
patch("twitch.tasks._download_and_save", side_effect=error),
|
|
||||||
patch.object(
|
|
||||||
download_reward_campaign_image,
|
|
||||||
"retry",
|
|
||||||
side_effect=RuntimeError("retried"),
|
|
||||||
) as retry_mock,
|
|
||||||
pytest.raises(RuntimeError, match="retried"),
|
|
||||||
):
|
|
||||||
download_reward_campaign_image.run(1)
|
|
||||||
|
|
||||||
retry_mock.assert_called_once_with(exc=error)
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue