diff --git a/accounts/migrations/0001_initial.py b/accounts/migrations/0001_initial.py index 84f8d52..d39a7e4 100644 --- a/accounts/migrations/0001_initial.py +++ b/accounts/migrations/0001_initial.py @@ -1,5 +1,4 @@ -# Generated by Django 5.2.4 on 2025-07-21 00:54 -from __future__ import annotations +# Generated by Django 5.2.4 on 2025-07-23 23:51 import django.contrib.auth.models import django.contrib.auth.validators @@ -8,86 +7,38 @@ from django.db import migrations, models class Migration(migrations.Migration): + initial = True dependencies = [ - ("auth", "0012_alter_user_first_name_max_length"), + ('auth', '0012_alter_user_first_name_max_length'), ] operations = [ migrations.CreateModel( - name="User", + name='User', fields=[ - ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), - ("password", models.CharField(max_length=128, verbose_name="password")), - ("last_login", models.DateTimeField(blank=True, null=True, verbose_name="last login")), - ( - "is_superuser", - models.BooleanField( - default=False, - help_text="Designates that this user has all permissions without explicitly assigning them.", - verbose_name="superuser status", - ), - ), - ( - "username", - models.CharField( - error_messages={"unique": "A user with that username already exists."}, - help_text="Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.", - max_length=150, - unique=True, - validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], - verbose_name="username", - ), - ), - ("first_name", models.CharField(blank=True, max_length=150, verbose_name="first name")), - ("last_name", models.CharField(blank=True, max_length=150, verbose_name="last name")), - ("email", models.EmailField(blank=True, max_length=254, verbose_name="email address")), - ( - "is_staff", - models.BooleanField( - default=False, help_text="Designates whether the user can log into this admin site.", verbose_name="staff status" - ), - ), - ( - "is_active", - models.BooleanField( - default=True, - help_text="Designates whether this user should be treated as active. Unselect this instead of deleting accounts.", - verbose_name="active", - ), - ), - ("date_joined", models.DateTimeField(default=django.utils.timezone.now, verbose_name="date joined")), - ( - "groups", - models.ManyToManyField( - blank=True, - help_text="The groups this user belongs to. A user will get all permissions granted to each of their groups.", - related_name="user_set", - related_query_name="user", - to="auth.group", - verbose_name="groups", - ), - ), - ( - "user_permissions", - models.ManyToManyField( - blank=True, - help_text="Specific permissions for this user.", - related_name="user_set", - related_query_name="user", - to="auth.permission", - verbose_name="user permissions", - ), - ), + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('password', models.CharField(max_length=128, verbose_name='password')), + ('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')), + ('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')), + ('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')), + ('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')), + ('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')), + ('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')), + ('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')), + ('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')), + ('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')), + ('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')), + ('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')), ], options={ - "verbose_name": "User", - "verbose_name_plural": "Users", - "db_table": "auth_user", + 'verbose_name': 'User', + 'verbose_name_plural': 'Users', + 'db_table': 'auth_user', }, managers=[ - ("objects", django.contrib.auth.models.UserManager()), + ('objects', django.contrib.auth.models.UserManager()), ], ), ] diff --git a/accounts/tests.py b/accounts/tests.py deleted file mode 100644 index 26c8c43..0000000 --- a/accounts/tests.py +++ /dev/null @@ -1,66 +0,0 @@ -from __future__ import annotations - -from django.contrib.auth import get_user_model -from django.test import TestCase -from django.urls import reverse - -User = get_user_model() - - -class AuthenticationTestCase(TestCase): - """Test authentication functionality.""" - - def setUp(self) -> None: - """Set up test data.""" - self.username = "testuser" - self.password = "testpass123" - self.user = User.objects.create_user(username=self.username, password=self.password) - - def test_login_view_get(self) -> None: - """Test login view GET request.""" - response = self.client.get(reverse("accounts:login")) - assert response.status_code == 200 - self.assertContains(response, "Login") - - def test_signup_view_get(self) -> None: - """Test signup view GET request.""" - response = self.client.get(reverse("accounts:signup")) - assert response.status_code == 200 - self.assertContains(response, "Sign Up") - - def test_login_valid_user(self) -> None: - """Test login with valid credentials.""" - response = self.client.post(reverse("accounts:login"), {"username": self.username, "password": self.password}) - assert response.status_code == 302 # Redirect after login - - def test_login_invalid_user(self) -> None: - """Test login with invalid credentials.""" - response = self.client.post(reverse("accounts:login"), {"username": self.username, "password": "wrongpassword"}) - assert response.status_code == 200 # Stay on login page - self.assertContains(response, "Please enter a correct username and password") - - def test_profile_view_authenticated(self) -> None: - """Test profile view for authenticated user.""" - self.client.login(username=self.username, password=self.password) - response = self.client.get(reverse("accounts:profile")) - assert response.status_code == 200 - self.assertContains(response, self.username) - - def test_profile_view_unauthenticated(self) -> None: - """Test profile view redirects unauthenticated user.""" - response = self.client.get(reverse("accounts:profile")) - assert response.status_code == 302 # Redirect to login - - def test_user_signup(self) -> None: - """Test user signup functionality.""" - response = self.client.post( - reverse("accounts:signup"), {"username": "newuser", "password1": "complexpass123", "password2": "complexpass123"} - ) - assert response.status_code == 302 # Redirect after signup - assert User.objects.filter(username="newuser").exists() - - def test_logout(self) -> None: - """Test user logout functionality.""" - self.client.login(username=self.username, password=self.password) - response = self.client.get(reverse("accounts:logout")) - assert response.status_code == 302 # Redirect after logout diff --git a/accounts/tests/__init__.py b/accounts/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/accounts/tests/test_admin.py b/accounts/tests/test_admin.py deleted file mode 100644 index 3eb70ee..0000000 --- a/accounts/tests/test_admin.py +++ /dev/null @@ -1,86 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Any - -import pytest -from django.contrib import admin -from django.contrib.auth import get_user_model -from django.contrib.auth.admin import UserAdmin -from django.contrib.auth.models import AbstractUser -from django.db.models.base import Model -from django.test import RequestFactory - -if TYPE_CHECKING: - from django.contrib.admin import ModelAdmin - from django.contrib.auth.models import AbstractUser - from django.db.models.base import Model - -User: type[AbstractUser] = get_user_model() - - -@pytest.mark.django_db -class TestUserAdmin: - """Test cases for User admin configuration.""" - - def setup_method(self) -> None: - """Set up test data for each test method.""" - self.factory = RequestFactory() - self.superuser: AbstractUser = User.objects.create_superuser( - username="admin", - email="admin@example.com", - password="test_admin_password_123", - ) - - def test_user_model_registered_in_admin(self) -> None: - """Test that User model is registered in Django admin.""" - registry: dict[type[Model], ModelAdmin[Any]] = admin.site._registry # noqa: SLF001 - assert User in registry - - def test_user_admin_class(self) -> None: - """Test that User is registered with UserAdmin.""" - registry: dict[type[Model], ModelAdmin[Any]] = admin.site._registry # noqa: SLF001 - admin_class: admin.ModelAdmin[Any] = registry[User] - assert isinstance(admin_class, UserAdmin) - - def test_user_admin_can_create_user(self) -> None: - """Test that admin can create users through the interface.""" - # Test that the admin form can handle user creation - registry: dict[type[Model], ModelAdmin[Any]] = admin.site._registry # noqa: SLF001 - user_admin: admin.ModelAdmin[Any] = registry[User] - - # Check that the admin has the expected methods - assert hasattr(user_admin, "get_form") - assert hasattr(user_admin, "save_model") - - def test_user_admin_list_display(self) -> None: - """Test admin list display configuration.""" - registry: dict[type[Model], ModelAdmin[Any]] = admin.site._registry # noqa: SLF001 - user_admin: admin.ModelAdmin[Any] = registry[User] - - # UserAdmin should have default list_display - expected_fields: tuple[str, ...] = ( - "username", - "email", - "first_name", - "last_name", - "is_staff", - ) - assert user_admin.list_display == expected_fields - - def test_user_admin_search_fields(self) -> None: - """Test admin search fields configuration.""" - registry: dict[type[Model], ModelAdmin[Any]] = admin.site._registry # noqa: SLF001 - user_admin: admin.ModelAdmin[Any] = registry[User] - - # UserAdmin should have default search fields - expected_search_fields: tuple[str, ...] = ("username", "first_name", "last_name", "email") - assert user_admin.search_fields == expected_search_fields - - def test_user_admin_fieldsets(self) -> None: - """Test admin fieldsets configuration.""" - registry: dict[type[Model], ModelAdmin[Any]] = admin.site._registry # noqa: SLF001 - user_admin: admin.ModelAdmin[Any] = registry[User] - - # UserAdmin should have fieldsets defined - assert hasattr(user_admin, "fieldsets") - assert user_admin.fieldsets is not None diff --git a/accounts/tests/test_models.py b/accounts/tests/test_models.py deleted file mode 100644 index 970f465..0000000 --- a/accounts/tests/test_models.py +++ /dev/null @@ -1,178 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -import pytest -from django.contrib.auth import get_user_model -from django.contrib.auth.models import AbstractUser -from django.core.exceptions import ValidationError -from django.db import IntegrityError - -if TYPE_CHECKING: - from django.contrib.auth.models import AbstractUser - -User: type[AbstractUser] = get_user_model() - - -@pytest.mark.django_db -class TestUserModel: - """Test cases for the custom User model.""" - - def test_create_user(self) -> None: - """Test creating a regular user.""" - user: AbstractUser = User.objects.create_user(username="testuser", email="test@example.com", password="testpass123") - - assert user.username == "testuser" - assert user.email == "test@example.com" - assert user.check_password("testpass123") - assert not user.is_staff - assert not user.is_superuser - assert user.is_active - - def test_create_superuser(self) -> None: - """Test creating a superuser.""" - superuser: AbstractUser = User.objects.create_superuser(username="admin", email="admin@example.com", password="adminpass123") - - assert superuser.username == "admin" - assert superuser.email == "admin@example.com" - assert superuser.check_password("adminpass123") - assert superuser.is_staff - assert superuser.is_superuser - assert superuser.is_active - - def test_user_str_representation(self) -> None: - """Test the string representation of the user.""" - user: AbstractUser = User.objects.create_user(username="testuser", email="test@example.com") - - assert str(user) == "testuser" - - def test_user_email_field(self) -> None: - """Test that email field works correctly.""" - user: AbstractUser = User.objects.create_user(username="testuser", email="test@example.com", password="testpass123") - - assert user.email == "test@example.com" - - # Test email update - user.email = "newemail@example.com" - user.save() - user.refresh_from_db() - - assert user.email == "newemail@example.com" - - def test_user_unique_username(self) -> None: - """Test that username must be unique.""" - User.objects.create_user(username="testuser", email="test1@example.com", password="testpass123") - - # Attempting to create another user with the same username should raise an error - with pytest.raises(IntegrityError): - User.objects.create_user(username="testuser", email="test2@example.com", password="testpass123") - - def test_user_password_hashing(self) -> None: - """Test that passwords are properly hashed.""" - password = "testpass123" - user: AbstractUser = User.objects.create_user(username="testuser", email="test@example.com", password=password) - - # Password should be hashed, not stored in plain text - assert user.password != password - assert user.check_password(password) - assert not user.check_password("wrongpassword") - - def test_user_without_email(self) -> None: - """Test creating a user without email.""" - user: AbstractUser = User.objects.create_user(username="testuser", password="testpass123") - - assert user.username == "testuser" - assert not user.email - assert user.check_password("testpass123") - - def test_user_model_meta_options(self) -> None: - """Test the model meta options.""" - assert User._meta.db_table == "auth_user" # noqa: SLF001 - assert User._meta.verbose_name == "User" # noqa: SLF001 - assert User._meta.verbose_name_plural == "Users" # noqa: SLF001 - - def test_user_manager_methods(self) -> None: - """Test User manager methods.""" - # Test create_user method - user: AbstractUser = User.objects.create_user(username="regularuser", email="regular@example.com", password="pass123") - assert not user.is_staff - assert not user.is_superuser - - # Test create_superuser method - superuser: AbstractUser = User.objects.create_superuser(username="superuser", email="super@example.com", password="superpass123") - assert superuser.is_staff - assert superuser.is_superuser - - def test_user_permissions(self) -> None: - """Test user permissions and groups.""" - user: AbstractUser = User.objects.create_user(username="testuser", email="test@example.com", password="testpass123") - - # Initially user should have no permissions - assert not user.user_permissions.exists() - assert not user.groups.exists() - - # Test has_perm method (should be False for regular user) - assert not user.has_perm("auth.add_user") - - def test_user_active_status(self) -> None: - """Test user active status functionality.""" - user: AbstractUser = User.objects.create_user(username="testuser", email="test@example.com", password="testpass123") - - # User should be active by default - assert user.is_active - - # Test deactivating user - user.is_active = False - user.save() - user.refresh_from_db() - - assert not user.is_active - - def test_user_date_joined(self) -> None: - """Test that date_joined is automatically set.""" - user: AbstractUser = User.objects.create_user(username="testuser", email="test@example.com", password="testpass123") - - assert user.date_joined is not None - - def test_user_last_login_initially_none(self) -> None: - """Test that last_login is initially None.""" - user: AbstractUser = User.objects.create_user(username="testuser", email="test@example.com", password="testpass123") - - assert user.last_login is None - - -@pytest.mark.django_db -class TestUserModelEdgeCases: - """Test edge cases and error conditions for the User model.""" - - def test_create_user_without_username(self) -> None: - """Test that creating a user without username raises an error.""" - with pytest.raises(ValueError, match="The given username must be set"): - User.objects.create_user(username="", email="test@example.com", password="testpass123") - - def test_create_superuser_without_is_staff(self) -> None: - """Test that create_superuser enforces is_staff=True.""" - with pytest.raises(ValueError, match=r"Superuser must have is_staff=True."): - User.objects.create_superuser(username="admin", email="admin@example.com", password="adminpass123", is_staff=False) - - def test_create_superuser_without_is_superuser(self) -> None: - """Test that create_superuser enforces is_superuser=True.""" - with pytest.raises(ValueError, match=r"Superuser must have is_superuser=True."): - User.objects.create_superuser(username="admin", email="admin@example.com", password="adminpass123", is_superuser=False) - - def test_user_with_very_long_username(self) -> None: - """Test username length validation.""" - # Django's default max_length for username is 150 - long_username: str = "a" * 151 - - user = User(username=long_username, email="test@example.com") - - with pytest.raises(ValidationError): - user.full_clean() - - def test_user_with_invalid_email_format(self) -> None: - """Test email format validation.""" - user = User(username="testuser", email="invalid-email-format") - - with pytest.raises(ValidationError): - user.full_clean() diff --git a/templates/twitch/campaign_detail.html b/templates/twitch/campaign_detail.html index f1c400f..e26d424 100644 --- a/templates/twitch/campaign_detail.html +++ b/templates/twitch/campaign_detail.html @@ -17,7 +17,7 @@ src="{{ campaign.image_url }}" alt="{{ campaign.name }}"> {% endif %} -
{{ campaign.description }}
+{{ campaign.description|linebreaksbr }}
Start: {{ campaign.start_at }} diff --git a/twitch/admin.py b/twitch/admin.py index e2c7460..b6e026d 100644 --- a/twitch/admin.py +++ b/twitch/admin.py @@ -32,10 +32,10 @@ class TimeBasedDropInline(admin.TabularInline): class DropCampaignAdmin(admin.ModelAdmin): """Admin configuration for DropCampaign model.""" - list_display = ("id", "name", "game", "owner", "status", "start_at", "end_at", "is_active") - list_filter = ("status", "game", "owner") + list_display = ("id", "name", "game", "owner", "start_at", "end_at", "is_active") + list_filter = ("game", "owner") search_fields = ("id", "name", "description") - inlines = [TimeBasedDropInline] + inlines = [TimeBasedDropInline] # noqa: RUF012 readonly_fields = ("created_at", "updated_at") @@ -61,7 +61,7 @@ class TimeBasedDropAdmin(admin.ModelAdmin): ) list_filter = ("campaign__game", "campaign") search_fields = ("id", "name") - inlines = [DropBenefitEdgeInline] + inlines = [DropBenefitEdgeInline] # noqa: RUF012 @admin.register(DropBenefit) diff --git a/twitch/management/commands/clean_playback_token_files.py b/twitch/management/commands/clean_playback_token_files.py index 66504ce..4b06711 100644 --- a/twitch/management/commands/clean_playback_token_files.py +++ b/twitch/management/commands/clean_playback_token_files.py @@ -142,11 +142,7 @@ class Command(BaseCommand): return # Set up the deleted directory - if deleted_dir_path: - deleted_dir = Path(str(deleted_dir_path)) - else: - # Default to a 'deleted' subdirectory in the source directory - deleted_dir = base_dir / "deleted" + deleted_dir: Path = Path(str(deleted_dir_path)) if deleted_dir_path else base_dir / "deleted" if not dry_run and not deleted_dir.exists(): deleted_dir.mkdir(parents=True, exist_ok=True) diff --git a/twitch/management/commands/import_drop_campaign.py b/twitch/management/commands/import_drop_campaign.py index e8ebaf0..d5c2fe7 100644 --- a/twitch/management/commands/import_drop_campaign.py +++ b/twitch/management/commands/import_drop_campaign.py @@ -38,14 +38,14 @@ class Command(BaseCommand): parser.add_argument( "--max-workers", type=int, - default=4, - help="Maximum number of worker processes to use for parallel importing (default: 4)", + default=100, + help="Maximum number of worker processes to use for parallel importing (default: 100)", ) parser.add_argument( "--batch-size", type=int, - default=100, - help="Number of files to process in each batch (default: 100)", + default=500, + help="Number of files to process in each batch (default: 500)", ) parser.add_argument( "--max-retries", @@ -96,7 +96,7 @@ class Command(BaseCommand): msg = f"Path {path} is neither a file nor a directory" raise CommandError(msg) - def _process_directory(self, directory: Path, processed_dir: str, max_workers: int = 4, batch_size: int = 100) -> None: + def _process_directory(self, directory: Path, processed_dir: str, max_workers: int = 100, batch_size: int = 1000) -> None: """Process all JSON files in a directory using parallel processing. Args: @@ -189,9 +189,13 @@ class Command(BaseCommand): try: with file_path.open(encoding="utf-8") as f: data = json.load(f) - except json.JSONDecodeError as e: - msg = f"Error decoding JSON: {e}" - raise CommandError(msg) from e + except json.JSONDecodeError: + error_dir_name = "error" + error_dir: Path = file_path.parent / error_dir_name + error_dir.mkdir(exist_ok=True) + self.stdout.write(self.style.WARNING(f"Invalid JSON in '{file_path.name}'. Moving to '{error_dir_name}'.")) + shutil.move(str(file_path), str(error_dir / file_path.name)) + return 0 # Counter for imported campaigns campaigns_imported = 0 @@ -274,7 +278,6 @@ class Command(BaseCommand): "image_url": campaign_data.get("imageURL", ""), "start_at": campaign_data["startAt"], "end_at": campaign_data["endAt"], - "status": campaign_data["status"], "is_account_connected": campaign_data["self"]["isAccountConnected"], "game": game, "owner": organization, diff --git a/twitch/migrations/0001_initial.py b/twitch/migrations/0001_initial.py index 8133056..cfdaf6a 100644 --- a/twitch/migrations/0001_initial.py +++ b/twitch/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.2.4 on 2025-07-09 20:44 +# Generated by Django 5.2.4 on 2025-07-23 23:51 import django.db.models.deletion from django.db import migrations, models @@ -16,27 +16,12 @@ class Migration(migrations.Migration): name='DropBenefit', fields=[ ('id', models.TextField(primary_key=True, serialize=False)), - ('name', models.TextField()), + ('name', models.TextField(db_index=True)), ('image_asset_url', models.URLField(blank=True, default='', max_length=500)), - ('created_at', models.DateTimeField()), + ('created_at', models.DateTimeField(db_index=True)), ('entitlement_limit', models.PositiveIntegerField(default=1)), ('is_ios_available', models.BooleanField(default=False)), - ('distribution_type', models.TextField(choices=[('DIRECT_ENTITLEMENT', 'Direct Entitlement'), ('CODE', 'Code')])), - ], - ), - migrations.CreateModel( - name='Game', - fields=[ - ('id', models.TextField(primary_key=True, serialize=False)), - ('slug', models.TextField(blank=True, default='')), - ('display_name', models.TextField()), - ], - ), - migrations.CreateModel( - name='Organization', - fields=[ - ('id', models.TextField(primary_key=True, serialize=False)), - ('name', models.TextField()), + ('distribution_type', models.TextField(choices=[('DIRECT_ENTITLEMENT', 'Direct Entitlement'), ('CODE', 'Code')], db_index=True)), ], ), migrations.CreateModel( @@ -47,23 +32,43 @@ class Migration(migrations.Migration): ('benefit', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='twitch.dropbenefit')), ], ), + migrations.CreateModel( + name='Game', + fields=[ + ('id', models.TextField(primary_key=True, serialize=False)), + ('slug', models.TextField(blank=True, db_index=True, default='')), + ('display_name', models.TextField(db_index=True)), + ], + options={ + 'indexes': [models.Index(fields=['slug'], name='twitch_game_slug_a02d3c_idx'), models.Index(fields=['display_name'], name='twitch_game_display_a35ba3_idx')], + }, + ), migrations.AddField( model_name='dropbenefit', name='game', field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='drop_benefits', to='twitch.game'), ), + migrations.CreateModel( + name='Organization', + fields=[ + ('id', models.TextField(primary_key=True, serialize=False)), + ('name', models.TextField(db_index=True)), + ], + options={ + 'indexes': [models.Index(fields=['name'], name='twitch_orga_name_febe72_idx')], + }, + ), migrations.CreateModel( name='DropCampaign', fields=[ ('id', models.TextField(primary_key=True, serialize=False)), - ('name', models.TextField()), + ('name', models.TextField(db_index=True)), ('description', models.TextField(blank=True)), ('details_url', models.URLField(blank=True, default='', max_length=500)), ('account_link_url', models.URLField(blank=True, default='', max_length=500)), ('image_url', models.URLField(blank=True, default='', max_length=500)), - ('start_at', models.DateTimeField()), - ('end_at', models.DateTimeField()), - ('status', models.TextField(choices=[('ACTIVE', 'Active'), ('UPCOMING', 'Upcoming'), ('EXPIRED', 'Expired')])), + ('start_at', models.DateTimeField(db_index=True)), + ('end_at', models.DateTimeField(db_index=True)), ('is_account_connected', models.BooleanField(default=False)), ('created_at', models.DateTimeField(auto_now_add=True)), ('updated_at', models.DateTimeField(auto_now=True)), @@ -80,11 +85,11 @@ class Migration(migrations.Migration): name='TimeBasedDrop', fields=[ ('id', models.TextField(primary_key=True, serialize=False)), - ('name', models.TextField()), - ('required_minutes_watched', models.PositiveIntegerField()), + ('name', models.TextField(db_index=True)), + ('required_minutes_watched', models.PositiveIntegerField(db_index=True)), ('required_subs', models.PositiveIntegerField(default=0)), - ('start_at', models.DateTimeField()), - ('end_at', models.DateTimeField()), + ('start_at', models.DateTimeField(db_index=True)), + ('end_at', models.DateTimeField(db_index=True)), ('benefits', models.ManyToManyField(related_name='drops', through='twitch.DropBenefitEdge', to='twitch.dropbenefit')), ('campaign', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='time_based_drops', to='twitch.dropcampaign')), ], @@ -94,6 +99,62 @@ class Migration(migrations.Migration): name='drop', field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='twitch.timebaseddrop'), ), + migrations.AddIndex( + model_name='dropcampaign', + index=models.Index(fields=['name'], name='twitch_drop_name_3b70b3_idx'), + ), + migrations.AddIndex( + model_name='dropcampaign', + index=models.Index(fields=['start_at', 'end_at'], name='twitch_drop_start_a_6e5fb6_idx'), + ), + migrations.AddIndex( + model_name='dropcampaign', + index=models.Index(fields=['game'], name='twitch_drop_game_id_868e70_idx'), + ), + migrations.AddIndex( + model_name='dropcampaign', + index=models.Index(fields=['owner'], name='twitch_drop_owner_i_37241d_idx'), + ), + migrations.AddIndex( + model_name='dropbenefit', + index=models.Index(fields=['name'], name='twitch_drop_name_7125ff_idx'), + ), + migrations.AddIndex( + model_name='dropbenefit', + index=models.Index(fields=['created_at'], name='twitch_drop_created_a3563e_idx'), + ), + migrations.AddIndex( + model_name='dropbenefit', + index=models.Index(fields=['distribution_type'], name='twitch_drop_distrib_08b224_idx'), + ), + migrations.AddIndex( + model_name='dropbenefit', + index=models.Index(fields=['game'], name='twitch_drop_game_id_a9209e_idx'), + ), + migrations.AddIndex( + model_name='dropbenefit', + index=models.Index(fields=['owner_organization'], name='twitch_drop_owner_o_45b4cc_idx'), + ), + migrations.AddIndex( + model_name='timebaseddrop', + index=models.Index(fields=['name'], name='twitch_time_name_47c0f4_idx'), + ), + migrations.AddIndex( + model_name='timebaseddrop', + index=models.Index(fields=['start_at', 'end_at'], name='twitch_time_start_a_c481f1_idx'), + ), + migrations.AddIndex( + model_name='timebaseddrop', + index=models.Index(fields=['campaign'], name='twitch_time_campaig_bbe349_idx'), + ), + migrations.AddIndex( + model_name='timebaseddrop', + index=models.Index(fields=['required_minutes_watched'], name='twitch_time_require_82c30c_idx'), + ), + migrations.AddIndex( + model_name='dropbenefitedge', + index=models.Index(fields=['drop', 'benefit'], name='twitch_drop_drop_id_5a574c_idx'), + ), migrations.AlterUniqueTogether( name='dropbenefitedge', unique_together={('drop', 'benefit')}, diff --git a/twitch/models.py b/twitch/models.py index a8c6e94..da07aec 100644 --- a/twitch/models.py +++ b/twitch/models.py @@ -10,8 +10,14 @@ class Game(models.Model): """Represents a game on Twitch.""" id = models.TextField(primary_key=True) - slug = models.TextField(blank=True, default="") - display_name = models.TextField() + slug = models.TextField(blank=True, default="", db_index=True) + display_name = models.TextField(db_index=True) + + class Meta: + indexes: ClassVar[list] = [ + models.Index(fields=["slug"]), + models.Index(fields=["display_name"]), + ] def __str__(self) -> str: """Return a string representation of the game.""" @@ -22,7 +28,12 @@ class Organization(models.Model): """Represents an organization on Twitch that can own drop campaigns.""" id = models.TextField(primary_key=True) - name = models.TextField() + name = models.TextField(db_index=True) + + class Meta: + indexes: ClassVar[list] = [ + models.Index(fields=["name"]), + ] def __str__(self) -> str: """Return a string representation of the organization.""" @@ -32,31 +43,32 @@ class Organization(models.Model): class DropCampaign(models.Model): """Represents a Twitch drop campaign.""" - STATUS_CHOICES: ClassVar[list[tuple[str, str]]] = [ - ("ACTIVE", "Active"), - ("UPCOMING", "Upcoming"), - ("EXPIRED", "Expired"), - ] - id = models.TextField(primary_key=True) - name = models.TextField() + name = models.TextField(db_index=True) description = models.TextField(blank=True) details_url = models.URLField(max_length=500, blank=True, default="") account_link_url = models.URLField(max_length=500, blank=True, default="") image_url = models.URLField(max_length=500, blank=True, default="") - start_at = models.DateTimeField() - end_at = models.DateTimeField() - status = models.TextField(choices=STATUS_CHOICES) + start_at = models.DateTimeField(db_index=True) + end_at = models.DateTimeField(db_index=True) is_account_connected = models.BooleanField(default=False) # Foreign keys - game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="drop_campaigns") - owner = models.ForeignKey(Organization, on_delete=models.CASCADE, related_name="drop_campaigns") + game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="drop_campaigns", db_index=True) + owner = models.ForeignKey(Organization, on_delete=models.CASCADE, related_name="drop_campaigns", db_index=True) # Tracking fields created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) + class Meta: + indexes: ClassVar[list] = [ + models.Index(fields=["name"]), + models.Index(fields=["start_at", "end_at"]), + models.Index(fields=["game"]), + models.Index(fields=["owner"]), + ] + def __str__(self) -> str: """Return a string representation of the drop campaign.""" return self.name @@ -65,7 +77,7 @@ class DropCampaign(models.Model): def is_active(self) -> bool: """Check if the campaign is currently active.""" now = timezone.now() - return self.start_at <= now <= self.end_at and self.status == "ACTIVE" + return self.start_at <= now <= self.end_at @property def clean_name(self) -> str: @@ -115,16 +127,25 @@ class DropBenefit(models.Model): ] id = models.TextField(primary_key=True) - name = models.TextField() + name = models.TextField(db_index=True) image_asset_url = models.URLField(max_length=500, blank=True, default="") - created_at = models.DateTimeField() + created_at = models.DateTimeField(db_index=True) entitlement_limit = models.PositiveIntegerField(default=1) is_ios_available = models.BooleanField(default=False) - distribution_type = models.TextField(choices=DISTRIBUTION_TYPES) + distribution_type = models.TextField(choices=DISTRIBUTION_TYPES, db_index=True) # Foreign keys - game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="drop_benefits") - owner_organization = models.ForeignKey(Organization, on_delete=models.CASCADE, related_name="drop_benefits") + game = models.ForeignKey(Game, on_delete=models.CASCADE, related_name="drop_benefits", db_index=True) + owner_organization = models.ForeignKey(Organization, on_delete=models.CASCADE, related_name="drop_benefits", db_index=True) + + class Meta: + indexes: ClassVar[list] = [ + models.Index(fields=["name"]), + models.Index(fields=["created_at"]), + models.Index(fields=["distribution_type"]), + models.Index(fields=["game"]), + models.Index(fields=["owner_organization"]), + ] def __str__(self) -> str: """Return a string representation of the drop benefit.""" @@ -135,16 +156,24 @@ class TimeBasedDrop(models.Model): """Represents a time-based drop in a drop campaign.""" id = models.TextField(primary_key=True) - name = models.TextField() - required_minutes_watched = models.PositiveIntegerField() + name = models.TextField(db_index=True) + required_minutes_watched = models.PositiveIntegerField(db_index=True) required_subs = models.PositiveIntegerField(default=0) - start_at = models.DateTimeField() - end_at = models.DateTimeField() + start_at = models.DateTimeField(db_index=True) + end_at = models.DateTimeField(db_index=True) # Foreign keys - campaign = models.ForeignKey(DropCampaign, on_delete=models.CASCADE, related_name="time_based_drops") + campaign = models.ForeignKey(DropCampaign, on_delete=models.CASCADE, related_name="time_based_drops", db_index=True) benefits = models.ManyToManyField(DropBenefit, through="DropBenefitEdge", related_name="drops") + class Meta: + indexes: ClassVar[list] = [ + models.Index(fields=["name"]), + models.Index(fields=["start_at", "end_at"]), + models.Index(fields=["campaign"]), + models.Index(fields=["required_minutes_watched"]), + ] + def __str__(self) -> str: """Return a string representation of the time-based drop.""" return self.name @@ -153,12 +182,15 @@ class TimeBasedDrop(models.Model): class DropBenefitEdge(models.Model): """Represents the relationship between a TimeBasedDrop and a DropBenefit.""" - drop = models.ForeignKey(TimeBasedDrop, on_delete=models.CASCADE) - benefit = models.ForeignKey(DropBenefit, on_delete=models.CASCADE) + drop = models.ForeignKey(TimeBasedDrop, on_delete=models.CASCADE, db_index=True) + benefit = models.ForeignKey(DropBenefit, on_delete=models.CASCADE, db_index=True) entitlement_limit = models.PositiveIntegerField(default=1) class Meta: unique_together = ("drop", "benefit") + indexes: ClassVar[list] = [ + models.Index(fields=["drop", "benefit"]), + ] def __str__(self) -> str: """Return a string representation of the drop benefit edge.""" diff --git a/twitch/tests.py b/twitch/tests.py deleted file mode 100644 index a39b155..0000000 --- a/twitch/tests.py +++ /dev/null @@ -1 +0,0 @@ -# Create your tests here. diff --git a/twitch/tests/__init__.py b/twitch/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/twitch/tests/test_pagination.py b/twitch/tests/test_pagination.py deleted file mode 100644 index 0bacaed..0000000 --- a/twitch/tests/test_pagination.py +++ /dev/null @@ -1,168 +0,0 @@ -"""Tests for pagination functionality in the campaign list view.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -import pytest -from django.urls import reverse -from django.utils import timezone - -from twitch.models import DropCampaign, Game, Organization - -if TYPE_CHECKING: - from django.test import Client - - -@pytest.mark.django_db -class TestCampaignListPagination: - """Test cases for campaign list pagination.""" - - def test_default_pagination(self, client: Client) -> None: - """Test that pagination works with default settings.""" - # Create test data - enough to require pagination - game = Game.objects.create(id="test-game", slug="test-game", display_name="Test Game") - org = Organization.objects.create(id="test-org", name="Test Org") - - # Create 30 campaigns to test pagination - now = timezone.now() - campaigns = [] - for i in range(30): - campaign = DropCampaign.objects.create( - id=f"campaign-{i}", - name=f"Campaign {i}", - game=game, - owner=org, - start_at=now, - end_at=now + timezone.timedelta(days=1), - status="ACTIVE", - ) - campaigns.append(campaign) - - # Test first page - response = client.get(reverse("twitch:campaign_list")) - assert response.status_code == 200 - assert "page_obj" in response.context - assert response.context["page_obj"].number == 1 - assert len(response.context["campaigns"]) == 24 # Default paginate_by - assert response.context["page_obj"].paginator.count == 30 - - # Test second page - response = client.get(reverse("twitch:campaign_list") + "?page=2") - assert response.status_code == 200 - assert response.context["page_obj"].number == 2 - assert len(response.context["campaigns"]) == 6 # Remaining campaigns - - def test_custom_per_page(self, client: Client) -> None: - """Test that custom per_page parameter works.""" - # Create test data - game = Game.objects.create(id="test-game", slug="test-game", display_name="Test Game") - org = Organization.objects.create(id="test-org", name="Test Org") - - now = timezone.now() - for i in range(25): - DropCampaign.objects.create( - id=f"campaign-{i}", - name=f"Campaign {i}", - game=game, - owner=org, - start_at=now, - end_at=now + timezone.timedelta(days=1), - status="ACTIVE", - ) - - # Test with per_page=12 - response = client.get(reverse("twitch:campaign_list") + "?per_page=12") - assert response.status_code == 200 - assert len(response.context["campaigns"]) == 12 - assert response.context["selected_per_page"] == 12 - - # Test with per_page=48 - response = client.get(reverse("twitch:campaign_list") + "?per_page=48") - assert response.status_code == 200 - assert len(response.context["campaigns"]) == 25 # All campaigns fit on one page - - def test_invalid_per_page_fallback(self, client: Client) -> None: - """Test that invalid per_page values fall back to default.""" - # Create test data - game = Game.objects.create(id="test-game", slug="test-game", display_name="Test Game") - org = Organization.objects.create(id="test-org", name="Test Org") - - now = timezone.now() - for i in range(30): - DropCampaign.objects.create( - id=f"campaign-{i}", - name=f"Campaign {i}", - game=game, - owner=org, - start_at=now, - end_at=now + timezone.timedelta(days=1), - status="ACTIVE", - ) - - # Test with invalid per_page value - response = client.get(reverse("twitch:campaign_list") + "?per_page=999") - assert response.status_code == 200 - assert len(response.context["campaigns"]) == 24 # Falls back to default - assert response.context["selected_per_page"] == 24 - - # Test with non-numeric per_page value - response = client.get(reverse("twitch:campaign_list") + "?per_page=invalid") - assert response.status_code == 200 - assert len(response.context["campaigns"]) == 24 # Falls back to default - - def test_pagination_with_filters(self, client: Client) -> None: - """Test that pagination works correctly with filters.""" - # Create test data with different statuses - game = Game.objects.create(id="test-game", slug="test-game", display_name="Test Game") - org = Organization.objects.create(id="test-org", name="Test Org") - - now = timezone.now() - # Create 20 active campaigns - for i in range(20): - DropCampaign.objects.create( - id=f"active-{i}", - name=f"Active Campaign {i}", - game=game, - owner=org, - start_at=now, - end_at=now + timezone.timedelta(days=1), - status="ACTIVE", - ) - - # Create 10 expired campaigns - for i in range(10): - DropCampaign.objects.create( - id=f"expired-{i}", - name=f"Expired Campaign {i}", - game=game, - owner=org, - start_at=now - timezone.timedelta(days=2), - end_at=now - timezone.timedelta(days=1), - status="EXPIRED", - ) - - # Test filtering by active status with pagination - response = client.get(reverse("twitch:campaign_list") + "?status=ACTIVE&per_page=12") - assert response.status_code == 200 - assert len(response.context["campaigns"]) == 12 - assert response.context["page_obj"].paginator.count == 20 # Only active campaigns - assert all(c.status == "ACTIVE" for c in response.context["campaigns"]) - - # Test second page of active campaigns - response = client.get(reverse("twitch:campaign_list") + "?status=ACTIVE&per_page=12&page=2") - assert response.status_code == 200 - assert len(response.context["campaigns"]) == 8 # Remaining active campaigns - assert response.context["page_obj"].number == 2 - - def test_context_variables(self, client: Client) -> None: - """Test that all necessary context variables are present.""" - response = client.get(reverse("twitch:campaign_list")) - assert response.status_code == 200 - - # Check for pagination-related context - context = response.context - assert "per_page_options" in context - assert "selected_per_page" in context - assert context["per_page_options"] == [12, 24, 48, 96] - assert context["selected_per_page"] == 24 # Default value diff --git a/twitch/tests/test_views.py b/twitch/tests/test_views.py deleted file mode 100644 index ecdc5b6..0000000 --- a/twitch/tests/test_views.py +++ /dev/null @@ -1,83 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -import pytest -from django.urls import reverse -from django.utils import timezone - -from twitch.models import DropCampaign, Game, Organization - -if TYPE_CHECKING: - from django.test import Client - - -@pytest.mark.django_db -class TestGameDetailView: - """Test cases for GameDetailView.""" - - def test_expired_campaigns_filtering(self, client: Client) -> None: - """Test that expired campaigns are correctly filtered.""" - # Create test data - game = Game.objects.create( - id="123", - slug="test-game", - display_name="Test Game", - ) - - organization = Organization.objects.create( - id="456", - name="Test Organization", - ) - - now = timezone.now() - - # Create an active campaign - active_campaign = DropCampaign.objects.create( - id="active-campaign", - name="Active Campaign", - game=game, - owner=organization, - start_at=now - timezone.timedelta(days=1), - end_at=now + timezone.timedelta(days=1), - status="ACTIVE", - ) - - # Create an expired campaign (end date in the past) - expired_by_date = DropCampaign.objects.create( - id="expired-by-date", - name="Expired By Date", - game=game, - owner=organization, - start_at=now - timezone.timedelta(days=3), - end_at=now - timezone.timedelta(days=1), - status="ACTIVE", # Still marked as active but date is expired - ) - - # Create an expired campaign (status is EXPIRED) - expired_by_status = DropCampaign.objects.create( - id="expired-by-status", - name="Expired By Status", - game=game, - owner=organization, - start_at=now - timezone.timedelta(days=3), - end_at=now + timezone.timedelta(days=1), - status="EXPIRED", # Explicitly expired - ) - - # Get the view context - url = reverse("twitch:game_detail", kwargs={"pk": game.id}) - response = client.get(url) - - # Check that active_campaigns only contains the active campaign - active_campaigns = response.context["active_campaigns"] - assert len(active_campaigns) == 1 - assert active_campaigns[0].id == active_campaign.id - - # Check that expired_campaigns contains only the expired campaigns - expired_campaigns = response.context["expired_campaigns"] - assert len(expired_campaigns) == 2 - expired_campaign_ids = [c.id for c in expired_campaigns] - assert expired_by_date.id in expired_campaign_ids - assert expired_by_status.id in expired_campaign_ids - assert active_campaign.id not in expired_campaign_ids diff --git a/twitch/views.py b/twitch/views.py index 5ed16ae..ed2a1fb 100644 --- a/twitch/views.py +++ b/twitch/views.py @@ -26,7 +26,7 @@ class DropCampaignListView(ListView): context_object_name = "campaigns" paginate_by = 96 # Default pagination size - def get_paginate_by(self, queryset) -> int: + def get_paginate_by(self, queryset) -> int: # noqa: ANN001, ARG002 """Get the pagination size, allowing override via URL parameter. Args: @@ -50,13 +50,8 @@ class DropCampaignListView(ListView): QuerySet: Filtered drop campaigns. """ queryset: QuerySet[DropCampaign] = super().get_queryset() - status_filter: str | None = self.request.GET.get("status") game_filter: str | None = self.request.GET.get("game") - # Apply filters - if status_filter: - queryset = queryset.filter(status=status_filter) - if game_filter: queryset = queryset.filter(game__id=game_filter) @@ -78,8 +73,7 @@ class DropCampaignListView(ListView): context["games"] = Game.objects.all().order_by("display_name") # Add status options for filtering - context["status_options"] = [status[0] for status in DropCampaign.STATUS_CHOICES] - + context["status_options"] = [] # Add selected filters context["selected_status"] = self.request.GET.get("status", "") context["selected_game"] = self.request.GET.get("game", "") @@ -177,7 +171,6 @@ class GameListView(ListView): filter=Q( drop_campaigns__start_at__lte=now, drop_campaigns__end_at__gte=now, - drop_campaigns__status="ACTIVE", ), distinct=True, ), @@ -208,7 +201,7 @@ class GameListView(ListView): # This query gets all games with their campaign counts and organization info game_org_relations = DropCampaign.objects.values("game_id", "owner_id", "owner__name").annotate( campaign_count=Count("id", distinct=True), - active_count=Count("id", filter=Q(start_at__lte=now, end_at__gte=now, status="ACTIVE"), distinct=True), + active_count=Count("id", filter=Q(start_at__lte=now, end_at__gte=now), distinct=True), ) # Step 3: Get all games in a single query with their display names @@ -279,16 +272,14 @@ class GameDetailView(DetailView): all_campaigns = DropCampaign.objects.filter(game=game).select_related("owner").order_by("-end_at") # Filter the campaigns in Python instead of making multiple queries - active_campaigns = [ - campaign for campaign in all_campaigns if campaign.start_at <= now and campaign.end_at >= now and campaign.status == "ACTIVE" - ] + active_campaigns = [campaign for campaign in all_campaigns if campaign.start_at <= now and campaign.end_at >= now] active_campaigns.sort(key=lambda c: c.end_at) # Sort by end_at ascending - upcoming_campaigns = [campaign for campaign in all_campaigns if campaign.start_at > now and campaign.status == "UPCOMING"] + upcoming_campaigns = [campaign for campaign in all_campaigns if campaign.start_at > now] upcoming_campaigns.sort(key=lambda c: c.start_at) # Sort by start_at ascending # Filter for expired campaigns - expired_campaigns = [campaign for campaign in all_campaigns if campaign.end_at < now or campaign.status == "EXPIRED"] + expired_campaigns = [campaign for campaign in all_campaigns if campaign.end_at < now] context.update({ "active_campaigns": active_campaigns, @@ -312,7 +303,7 @@ def dashboard(request: HttpRequest) -> HttpResponse: # Get active campaigns with prefetching to reduce queries now = timezone.now() active_campaigns = ( - DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now, status="ACTIVE") + DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now) .select_related("game", "owner") # Prefetch the time-based drops with their benefits to avoid N+1 queries .prefetch_related(Prefetch("time_based_drops", queryset=TimeBasedDrop.objects.prefetch_related("benefits")))