Improve importer, add channels and import both drop_campaigns and drop_campaign

This commit is contained in:
Joakim Hellsén 2026-01-06 20:36:18 +01:00
commit adc6deb314
No known key found for this signature in database
8 changed files with 401 additions and 78 deletions

View file

@ -88,6 +88,7 @@ line-length = 120
"S106", "S106",
"PLR6301", "PLR6301",
"S105", "S105",
"SLF001",
] ]
"**/migrations/**" = ["RUF012"] "**/migrations/**" = ["RUF012"]

View file

@ -33,24 +33,24 @@ Hover over the end time to see the exact date and time.
alt="Box art for {{ game_data.name }}" alt="Box art for {{ game_data.name }}"
width="200" width="200"
height="267" height="267"
style="border-radius: 8px"> style="border-radius: 8px" />
</div> </div>
<div style="flex: 1; overflow-x: auto;"> <div style="flex: 1; overflow-x: auto;">
<div style="display: flex; gap: 1rem; min-width: max-content;"> <div style="display: flex; gap: 1rem; min-width: max-content;">
{% for campaign in game_data.campaigns %} {% for campaign in game_data.campaigns %}
<article id="campaign-article-{{ campaign.id }}" <article id="campaign-article-{{ campaign.twitch_id }}"
style="display: flex; style="display: flex;
flex-direction: column; flex-direction: column;
align-items: center; align-items: center;
padding: 0.5rem; padding: 0.5rem;
flex-shrink: 0"> flex-shrink: 0">
<div> <div>
<a href="{% url 'twitch:campaign_detail' campaign.id %}"> <a href="{% url 'twitch:campaign_detail' campaign.twitch_id %}">
<img src="{{ campaign.image_url }}" <img src="{{ campaign.image_url }}"
alt="Image for {{ campaign.name }}" alt="Image for {{ campaign.name }}"
width="120" width="120"
height="120" height="120"
style="border-radius: 4px"> style="border-radius: 4px" />
<h4 style="margin: 0.5rem 0; text-align: left;">{{ campaign.clean_name }}</h4> <h4 style="margin: 0.5rem 0; text-align: left;">{{ campaign.clean_name }}</h4>
</a> </a>
<time datetime="{{ campaign.end_at|date:'c' }}" <time datetime="{{ campaign.end_at|date:'c' }}"

View file

@ -26,8 +26,12 @@ from twitch.models import DropCampaign
from twitch.models import Game from twitch.models import Game
from twitch.models import Organization from twitch.models import Organization
from twitch.models import TimeBasedDrop from twitch.models import TimeBasedDrop
from twitch.schemas import ChannelInfoSchema
from twitch.schemas import CurrentUserSchema
from twitch.schemas import DropBenefitEdgeSchema from twitch.schemas import DropBenefitEdgeSchema
from twitch.schemas import DropBenefitSchema from twitch.schemas import DropBenefitSchema
from twitch.schemas import DropCampaignACLSchema
from twitch.schemas import DropCampaignSchema
from twitch.schemas import GameSchema from twitch.schemas import GameSchema
from twitch.schemas import GraphQLResponse from twitch.schemas import GraphQLResponse
from twitch.schemas import OrganizationSchema from twitch.schemas import OrganizationSchema
@ -148,20 +152,30 @@ def move_file_to_broken_subdir(
return broken_dir return broken_dir
def move_completed_file(file_path: Path, operation_name: str | None = None) -> Path: def move_completed_file(
file_path: Path,
operation_name: str | None = None,
campaign_structure: str | None = None,
) -> Path:
"""Move a successfully processed file into an operation-named directory. """Move a successfully processed file into an operation-named directory.
Moves to <imported_root>/<operation_name>/ Moves to <imported_root>/<operation_name>/ or
<imported_root>/<operation_name>/<campaign_structure>/ if campaign_structure is provided.
Args: Args:
file_path: Path to the processed JSON file. file_path: Path to the processed JSON file.
operation_name: GraphQL operationName extracted from the payload. operation_name: GraphQL operationName extracted from the payload.
campaign_structure: Optional campaign structure type (e.g., "user_drop_campaign").
Returns: Returns:
Path to the directory where the file was moved. Path to the directory where the file was moved.
""" """
safe_op: str = (operation_name or "unknown_op").replace(" ", "_").replace("/", "_").replace("\\", "_") safe_op: str = (operation_name or "unknown_op").replace(" ", "_").replace("/", "_").replace("\\", "_")
target_dir: Path = get_imported_directory_root() / safe_op target_dir: Path = get_imported_directory_root() / safe_op
if campaign_structure:
target_dir /= campaign_structure
target_dir.mkdir(parents=True, exist_ok=True) target_dir.mkdir(parents=True, exist_ok=True)
target_file: Path = target_dir / file_path.name target_file: Path = target_dir / file_path.name
@ -331,16 +345,16 @@ class Command(BaseCommand):
setattr(self, cache_attr, {}) setattr(self, cache_attr, {})
def _validate_campaigns( def _validate_responses(
self, self,
campaigns_found: list[dict[str, Any]], responses: list[dict[str, Any]],
file_path: Path, file_path: Path,
options: dict[str, Any], options: dict[str, Any],
) -> tuple[list[GraphQLResponse], Path | None]: ) -> tuple[list[GraphQLResponse], Path | None]:
"""Validate campaign data using Pydantic schema. """Validate GraphQL response data using Pydantic schema.
Args: Args:
campaigns_found: List of raw campaign dictionaries. responses: List of raw GraphQL response dictionaries.
file_path: Path to the file being processed. file_path: Path to the file being processed.
options: Command options. options: Command options.
@ -349,19 +363,18 @@ class Command(BaseCommand):
broken directory path when the file was moved during validation. broken directory path when the file was moved during validation.
Raises: Raises:
ValidationError: If campaign data fails Pydantic validation ValidationError: If response data fails Pydantic validation
and crash-on-error is enabled. and crash-on-error is enabled.
""" """
valid_campaigns: list[GraphQLResponse] = [] valid_responses: list[GraphQLResponse] = []
broken_dir: Path | None = None broken_dir: Path | None = None
if isinstance(campaigns_found, list): if isinstance(responses, list):
for campaign in campaigns_found: for response_data in responses:
if isinstance(campaign, dict): if isinstance(response_data, dict):
try: try:
response: GraphQLResponse = GraphQLResponse.model_validate(campaign) response: GraphQLResponse = GraphQLResponse.model_validate(response_data)
if response.data.current_user and response.data.current_user.drop_campaigns: valid_responses.append(response)
valid_campaigns.append(response)
except ValidationError as e: except ValidationError as e:
tqdm.write( tqdm.write(
@ -370,7 +383,7 @@ class Command(BaseCommand):
# Move invalid inputs out of the hot path so future runs can progress. # Move invalid inputs out of the hot path so future runs can progress.
if not options.get("skip_broken_moves"): if not options.get("skip_broken_moves"):
op_name: str | None = extract_operation_name_from_parsed(campaign) op_name: str | None = extract_operation_name_from_parsed(response_data)
broken_dir = move_failed_validation_file(file_path, operation_name=op_name) broken_dir = move_failed_validation_file(file_path, operation_name=op_name)
# Once the file has been moved, bail out so we don't try to move it again later. # Once the file has been moved, bail out so we don't try to move it again later.
@ -382,7 +395,7 @@ class Command(BaseCommand):
continue continue
return valid_campaigns, broken_dir return valid_responses, broken_dir
def _get_or_create_organization( def _get_or_create_organization(
self, self,
@ -456,6 +469,37 @@ class Command(BaseCommand):
self.game_cache[game_data.twitch_id] = game_obj self.game_cache[game_data.twitch_id] = game_obj
return game_obj return game_obj
def _get_or_create_channel(self, channel_info: ChannelInfoSchema) -> Channel:
"""Get or create a channel from cache or database.
Args:
channel_info: Channel info from Pydantic model.
Returns:
Channel instance.
"""
# Prefer cache hits to avoid hitting the DB on every campaign item.
if channel_info.twitch_id in self.channel_cache:
return self.channel_cache[channel_info.twitch_id]
# Use name as display_name fallback if displayName is None
display_name: str = channel_info.display_name or channel_info.name
channel_obj, created = Channel.objects.update_or_create(
twitch_id=channel_info.twitch_id,
defaults={
"name": channel_info.name,
"display_name": display_name,
},
)
if created:
tqdm.write(f"{Fore.GREEN}{Style.RESET_ALL} Created new channel: {display_name}")
# Cache the channel for future lookups.
self.channel_cache[channel_info.twitch_id] = channel_obj
return channel_obj
def _should_skip_campaign_update( def _should_skip_campaign_update(
self, self,
cached_obj: DropCampaign, cached_obj: DropCampaign,
@ -491,22 +535,21 @@ class Command(BaseCommand):
and cached_obj.end_at == defaults["end_at"] and cached_obj.end_at == defaults["end_at"]
and cached_obj.details_url == defaults["details_url"] and cached_obj.details_url == defaults["details_url"]
and cached_obj.account_link_url == defaults["account_link_url"] and cached_obj.account_link_url == defaults["account_link_url"]
and cached_game_id == game_id and cached_game_id == game_id,
and cached_obj.is_account_connected == defaults["is_account_connected"],
) )
def process_campaigns( def process_responses( # noqa: PLR0914
self, self,
campaigns_found: list[dict[str, Any]], responses: list[dict[str, Any]],
file_path: Path, file_path: Path,
options: dict[str, Any], options: dict[str, Any],
) -> tuple[bool, Path | None]: ) -> tuple[bool, Path | None]:
"""Process, validate, and import campaign data. """Process, validate, and import campaign data from GraphQL responses.
With dependency resolution and caching. With dependency resolution and caching.
Args: Args:
campaigns_found: List of raw campaign dictionaries to process. responses: List of raw GraphQL response dictionaries to process.
file_path: Path to the file being processed. file_path: Path to the file being processed.
options: Command options dictionary. options: Command options dictionary.
@ -517,8 +560,8 @@ class Command(BaseCommand):
Returns: Returns:
Tuple of (success flag, broken directory path if moved). Tuple of (success flag, broken directory path if moved).
""" """
valid_campaigns, broken_dir = self._validate_campaigns( valid_responses, broken_dir = self._validate_responses(
campaigns_found=campaigns_found, responses=responses,
file_path=file_path, file_path=file_path,
options=options, options=options,
) )
@ -527,18 +570,31 @@ class Command(BaseCommand):
# File already moved due to validation failure; signal caller to skip further handling. # File already moved due to validation failure; signal caller to skip further handling.
return False, broken_dir return False, broken_dir
for response in valid_campaigns: for response in valid_responses:
if not response.data.current_user: campaigns_to_process: list[DropCampaignSchema] = []
# Source 1: User or CurrentUser field (handles plural, singular, inventory)
user_obj: CurrentUserSchema | None = response.data.current_user or response.data.user
if user_obj and user_obj.drop_campaigns:
campaigns_to_process.extend(user_obj.drop_campaigns)
# Source 2: Channel field (viewer drop campaigns)
channel_obj = response.data.channel
if channel_obj and channel_obj.viewer_drop_campaigns:
if isinstance(channel_obj.viewer_drop_campaigns, list):
campaigns_to_process.extend(channel_obj.viewer_drop_campaigns)
else:
campaigns_to_process.append(channel_obj.viewer_drop_campaigns)
if not campaigns_to_process:
continue continue
if not response.data.current_user.drop_campaigns: for drop_campaign in campaigns_to_process:
continue
for drop_campaign in response.data.current_user.drop_campaigns:
# Handle campaigns without owner (e.g., from Inventory operation) # Handle campaigns without owner (e.g., from Inventory operation)
if drop_campaign.owner: owner_data: OrganizationSchema | None = getattr(drop_campaign, "owner", None)
if owner_data:
org_obj: Organization = self._get_or_create_organization( org_obj: Organization = self._get_or_create_organization(
org_data=drop_campaign.owner, org_data=owner_data,
) )
else: else:
# Create a default organization for campaigns without owner # Create a default organization for campaigns without owner
@ -566,13 +622,12 @@ class Command(BaseCommand):
defaults: dict[str, str | datetime | Game | bool] = { defaults: dict[str, str | datetime | Game | bool] = {
"name": drop_campaign.name, "name": drop_campaign.name,
"description": drop_campaign.description, "description": drop_campaign.description,
"image_url": getattr(drop_campaign, "image_url", ""), "image_url": drop_campaign.image_url,
"game": game_obj, "game": game_obj,
"start_at": start_at_dt, "start_at": start_at_dt,
"end_at": end_at_dt, "end_at": end_at_dt,
"details_url": drop_campaign.details_url, "details_url": drop_campaign.details_url,
"account_link_url": drop_campaign.account_link_url, "account_link_url": drop_campaign.account_link_url,
"is_account_connected": (drop_campaign.self.is_account_connected),
} }
if drop_campaign.twitch_id in self.drop_campaign_cache: if drop_campaign.twitch_id in self.drop_campaign_cache:
@ -608,6 +663,13 @@ class Command(BaseCommand):
campaign_obj=campaign_obj, campaign_obj=campaign_obj,
) )
# Process allowed channels from the campaign's ACL
if drop_campaign.allow and drop_campaign.allow.channels:
self._process_allowed_channels(
campaign_obj=campaign_obj,
allow_schema=drop_campaign.allow,
)
return True, None return True, None
def _process_time_based_drops( def _process_time_based_drops(
@ -714,6 +776,35 @@ class Command(BaseCommand):
if created: if created:
tqdm.write(f"{Fore.GREEN}{Style.RESET_ALL} Linked benefit: {benefit_schema.name}{drop_obj.name}") tqdm.write(f"{Fore.GREEN}{Style.RESET_ALL} Linked benefit: {benefit_schema.name}{drop_obj.name}")
def _process_allowed_channels(
self,
campaign_obj: DropCampaign,
allow_schema: DropCampaignACLSchema,
) -> None:
"""Process allowed channels for a drop campaign.
Updates the campaign's allow_is_enabled flag and M2M relationship
with allowed channels from the ACL schema.
Args:
campaign_obj: The DropCampaign database object.
allow_schema: The DropCampaignACL Pydantic schema.
"""
# Update the allow_is_enabled flag if changed
if campaign_obj.allow_is_enabled != allow_schema.is_enabled:
campaign_obj.allow_is_enabled = allow_schema.is_enabled
campaign_obj.save(update_fields=["allow_is_enabled"])
# Get or create all channels and collect them
channel_objects: list[Channel] = []
if allow_schema.channels:
for channel_schema in allow_schema.channels:
channel_obj: Channel = self._get_or_create_channel(channel_info=channel_schema)
channel_objects.append(channel_obj)
# Update the M2M relationship with the allowed channels
campaign_obj.allow_channels.set(channel_objects)
def handle(self, *args, **options) -> None: # noqa: ARG002 def handle(self, *args, **options) -> None: # noqa: ARG002
"""Main entry point for the command. """Main entry point for the command.
@ -826,6 +917,63 @@ class Command(BaseCommand):
tqdm.write(f"Total: {len(json_files)}") tqdm.write(f"Total: {len(json_files)}")
tqdm.write("=" * 50) tqdm.write("=" * 50)
def _detect_campaign_structure(self, response: dict[str, Any]) -> str | None:
"""Detect which campaign structure is present in the response.
Used for organizing/categorizing files by their response type.
Supported structures:
- "user_drop_campaign": {"data": {"user": {"dropCampaign": {...}}}}
- "current_user_drop_campaigns": {"data": {"currentUser": {"dropCampaigns": [...]}}}
- "inventory_campaigns": {"data": {"currentUser": {"inventory": {"dropCampaignsInProgress": [...]}}}}
- "channel_viewer_campaigns": {"data": {"channel": {"viewerDropCampaigns": [...] or {...}}}}
Args:
response: The parsed JSON response from Twitch API.
Returns:
String identifier of the structure type, or None if no campaign structure found.
"""
if not isinstance(response, dict) or "data" not in response:
return None
data: dict[str, Any] = response["data"]
# Check structures in order of specificity
# Structure: {"data": {"user": {"dropCampaign": {...}}}}
if (
"user" in data
and isinstance(data["user"], dict)
and "dropCampaign" in data["user"]
and data["user"]["dropCampaign"]
):
return "user_drop_campaign"
# Structure: {"data": {"currentUser": {...}}}
if "currentUser" in data and isinstance(data["currentUser"], dict):
current_user: dict[str, Any] = data["currentUser"]
# Structure: {"data": {"currentUser": {"inventory": {"dropCampaignsInProgress": [...]}}}}
if (
"inventory" in current_user
and isinstance(current_user["inventory"], dict)
and "dropCampaignsInProgress" in current_user["inventory"]
and current_user["inventory"]["dropCampaignsInProgress"]
):
return "inventory_campaigns"
# Structure: {"data": {"currentUser": {"dropCampaigns": [...]}}}
if "dropCampaigns" in current_user and isinstance(current_user["dropCampaigns"], list):
return "current_user_drop_campaigns"
# Structure: {"data": {"channel": {"viewerDropCampaigns": [...] or {...}}}}
if "channel" in data and isinstance(data["channel"], dict):
channel: dict[str, Any] = data["channel"]
if channel.get("viewerDropCampaigns"):
return "channel_viewer_campaigns"
return None
def collect_json_files( def collect_json_files(
self, self,
options: dict, options: dict,
@ -886,16 +1034,18 @@ class Command(BaseCommand):
return {"success": False, "broken_dir": "(skipped)", "reason": f"matched '{matched}'"} return {"success": False, "broken_dir": "(skipped)", "reason": f"matched '{matched}'"}
if "dropCampaign" not in raw_text: if "dropCampaign" not in raw_text:
if not options.get("skip_broken_moves"): if not options.get("skip_broken_moves"):
broken_dir = move_file_to_broken_subdir( broken_dir: Path | None = move_file_to_broken_subdir(
file_path, file_path,
"no_dropCampaign", "no_dropCampaign",
operation_name=operation_name, operation_name=operation_name,
) )
return {"success": False, "broken_dir": str(broken_dir), "reason": "no dropCampaign present"} return {"success": False, "broken_dir": str(broken_dir), "reason": "no dropCampaign present"}
return {"success": False, "broken_dir": "(skipped)", "reason": "no dropCampaign present"} return {"success": False, "broken_dir": "(skipped)", "reason": "no dropCampaign present"}
campaigns_found: list[dict[str, Any]] = [parsed_json]
processed, broken_dir = self.process_campaigns( # Wrap single response in list for consistent processing
campaigns_found=campaigns_found, responses: list[dict[str, Any]] = parsed_json if isinstance(parsed_json, list) else [parsed_json]
processed, broken_dir = self.process_responses(
responses=responses,
file_path=file_path, file_path=file_path,
options=options, options=options,
) )
@ -908,7 +1058,14 @@ class Command(BaseCommand):
"reason": "validation failed", "reason": "validation failed",
} }
move_completed_file(file_path=file_path, operation_name=operation_name) campaign_structure: str | None = self._detect_campaign_structure(
parsed_json if isinstance(parsed_json, dict) else (parsed_json[0] if parsed_json else {}),
)
move_completed_file(
file_path=file_path,
operation_name=operation_name,
campaign_structure=campaign_structure,
)
except (ValidationError, json.JSONDecodeError): except (ValidationError, json.JSONDecodeError):
if options["crash_on_error"]: if options["crash_on_error"]:
@ -989,10 +1146,11 @@ class Command(BaseCommand):
) )
return return
campaigns_found: list[dict[str, Any]] = [parsed_json] # Wrap single response in list for consistent processing
responses: list[dict[str, Any]] = parsed_json if isinstance(parsed_json, list) else [parsed_json]
processed, broken_dir = self.process_campaigns( processed, broken_dir = self.process_responses(
campaigns_found=campaigns_found, responses=responses,
file_path=file_path, file_path=file_path,
options=options, options=options,
) )
@ -1005,7 +1163,14 @@ class Command(BaseCommand):
) )
return return
move_completed_file(file_path=file_path, operation_name=operation_name) campaign_structure: str | None = self._detect_campaign_structure(
parsed_json if isinstance(parsed_json, dict) else (parsed_json[0] if parsed_json else {}),
)
move_completed_file(
file_path=file_path,
operation_name=operation_name,
campaign_structure=campaign_structure,
)
progress_bar.update(1) progress_bar.update(1)
progress_bar.write(f"{Fore.GREEN}{Style.RESET_ALL} {file_path.name}") progress_bar.write(f"{Fore.GREEN}{Style.RESET_ALL} {file_path.name}")

View file

@ -0,0 +1,23 @@
# Generated by Django 6.0 on 2026-01-05 22:29
from __future__ import annotations
from django.db import migrations
class Migration(migrations.Migration):
"""Remove is_account_connected field and its index from DropCampaign."""
dependencies = [
("twitch", "0002_alter_game_box_art"),
]
operations = [
migrations.RemoveIndex(
model_name="dropcampaign",
name="twitch_drop_is_acco_7e9078_idx",
),
migrations.RemoveField(
model_name="dropcampaign",
name="is_account_connected",
),
]

View file

@ -328,10 +328,6 @@ class DropCampaign(models.Model):
blank=True, blank=True,
help_text="Datetime when the campaign ends.", help_text="Datetime when the campaign ends.",
) )
is_account_connected = models.BooleanField(
default=False,
help_text="Indicates if the user account is linked.",
)
allow_is_enabled = models.BooleanField( allow_is_enabled = models.BooleanField(
default=True, default=True,
help_text="Whether the campaign allows participation.", help_text="Whether the campaign allows participation.",
@ -375,7 +371,6 @@ class DropCampaign(models.Model):
models.Index(fields=["twitch_id"]), models.Index(fields=["twitch_id"]),
models.Index(fields=["name"]), models.Index(fields=["name"]),
models.Index(fields=["description"]), models.Index(fields=["description"]),
models.Index(fields=["is_account_connected"]),
models.Index(fields=["allow_is_enabled"]), models.Index(fields=["allow_is_enabled"]),
models.Index(fields=["operation_name"]), models.Index(fields=["operation_name"]),
models.Index(fields=["added_at"]), models.Index(fields=["added_at"]),

View file

@ -62,7 +62,7 @@ class GameSchema(BaseModel):
return data return data
class DropCampaignSelfEdge(BaseModel): class DropCampaignSelfEdgeSchema(BaseModel):
"""Schema for the 'self' edge on DropCampaign objects.""" """Schema for the 'self' edge on DropCampaign objects."""
is_account_connected: bool = Field(alias="isAccountConnected") is_account_connected: bool = Field(alias="isAccountConnected")
@ -76,6 +76,43 @@ class DropCampaignSelfEdge(BaseModel):
} }
class ChannelInfoSchema(BaseModel):
"""Schema for allowed channel info in DropCampaignACL.
Represents individual channels that are allowed to participate in a campaign.
"""
twitch_id: str = Field(alias="id")
display_name: str | None = Field(default=None, alias="displayName")
name: str # Channel login name
type_name: Literal["Channel"] = Field(alias="__typename")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
class DropCampaignACLSchema(BaseModel):
"""Schema for DropCampaign ACL (Access Control List).
Defines which channels are allowed to participate in a campaign.
"""
channels: list[ChannelInfoSchema] | None = None
is_enabled: bool = Field(alias="isEnabled")
type_name: Literal["DropCampaignACL"] = Field(alias="__typename")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
class DropBenefitSchema(BaseModel): class DropBenefitSchema(BaseModel):
"""Schema for a benefit in a DropBenefitEdge. """Schema for a benefit in a DropBenefitEdge.
@ -149,28 +186,29 @@ class TimeBasedDropSchema(BaseModel):
} }
class DropCampaign(BaseModel): class DropCampaignSchema(BaseModel):
"""Schema for Twitch DropCampaign objects. """Schema for Twitch DropCampaign objects.
Handles both ViewerDropsDashboard and Inventory operation formats. Handles both ViewerDropsDashboard and Inventory operation formats.
""" """
twitch_id: str = Field(alias="id")
name: str
owner: OrganizationSchema | None = None
game: GameSchema
status: Literal["ACTIVE", "EXPIRED", "UPCOMING"]
start_at: str = Field(alias="startAt")
end_at: str = Field(alias="endAt")
details_url: str = Field(alias="detailsURL")
account_link_url: str = Field(alias="accountLinkURL") account_link_url: str = Field(alias="accountLinkURL")
description: str = Field(default="", alias="description") description: str = Field(default="", alias="description")
self: DropCampaignSelfEdge details_url: str = Field(alias="detailsURL")
end_at: str = Field(alias="endAt")
game: GameSchema
image_url: str = Field(default="", alias="imageURL")
name: str
owner: OrganizationSchema | None = None
self: DropCampaignSelfEdgeSchema
start_at: str = Field(alias="startAt")
status: Literal["ACTIVE", "EXPIRED", "UPCOMING"]
time_based_drops: list[TimeBasedDropSchema] = Field(default=[], alias="timeBasedDrops") time_based_drops: list[TimeBasedDropSchema] = Field(default=[], alias="timeBasedDrops")
twitch_id: str = Field(alias="id")
type_name: Literal["DropCampaign"] = Field(alias="__typename") type_name: Literal["DropCampaign"] = Field(alias="__typename")
# Campaign access control list - defines which channels can participate
allow: DropCampaignACLSchema | None = None
# Inventory-specific fields # Inventory-specific fields
image_url: str | None = Field(default=None, alias="imageURL")
allow: dict | None = None
event_based_drops: list | None = Field(default=None, alias="eventBasedDrops") event_based_drops: list | None = Field(default=None, alias="eventBasedDrops")
model_config = { model_config = {
@ -184,7 +222,7 @@ class DropCampaign(BaseModel):
class InventorySchema(BaseModel): class InventorySchema(BaseModel):
"""Schema for the inventory field in Inventory operation responses.""" """Schema for the inventory field in Inventory operation responses."""
drop_campaigns_in_progress: list[DropCampaign] = Field(alias="dropCampaignsInProgress") drop_campaigns_in_progress: list[DropCampaignSchema] = Field(alias="dropCampaignsInProgress")
type_name: Literal["Inventory"] = Field(alias="__typename") type_name: Literal["Inventory"] = Field(alias="__typename")
# gameEventDrops field is present in Inventory but we don't process it yet # gameEventDrops field is present in Inventory but we don't process it yet
game_event_drops: list | None = Field(default=None, alias="gameEventDrops") game_event_drops: list | None = Field(default=None, alias="gameEventDrops")
@ -197,7 +235,7 @@ class InventorySchema(BaseModel):
} }
class CurrentUser(BaseModel): class CurrentUserSchema(BaseModel):
"""Schema for Twitch User objects. """Schema for Twitch User objects.
Handles both ViewerDropsDashboard and Inventory operation formats. Handles both ViewerDropsDashboard and Inventory operation formats.
@ -206,8 +244,8 @@ class CurrentUser(BaseModel):
twitch_id: str = Field(alias="id") twitch_id: str = Field(alias="id")
login: str | None = None login: str | None = None
drop_campaigns: list[DropCampaign] | None = Field(default=None, alias="dropCampaigns") drop_campaigns: list[DropCampaignSchema] | None = Field(default=None, alias="dropCampaigns")
drop_campaign: DropCampaign | None = Field(default=None, alias="dropCampaign") drop_campaign: DropCampaignSchema | None = Field(default=None, alias="dropCampaign")
inventory: InventorySchema | None = None inventory: InventorySchema | None = None
type_name: Literal["User"] = Field(alias="__typename") type_name: Literal["User"] = Field(alias="__typename")
@ -219,7 +257,7 @@ class CurrentUser(BaseModel):
} }
@model_validator(mode="after") @model_validator(mode="after")
def normalize_from_inventory(self) -> CurrentUser: def normalize_from_inventory(self) -> CurrentUserSchema:
"""Normalize drop_campaigns from inventory or singular dropCampaign. """Normalize drop_campaigns from inventory or singular dropCampaign.
Handles three sources in order of priority: Handles three sources in order of priority:
@ -239,14 +277,57 @@ class CurrentUser(BaseModel):
return self return self
class Data(BaseModel): class ChannelSchema(BaseModel):
"""Schema for the data field in Twitch API responses. """Schema for Twitch Channel objects with viewer drop campaigns.
Handles both currentUser (standard) and user (legacy) field names. Handles the channel.viewerDropCampaigns structure from viewer-focused API responses.
""" """
current_user: CurrentUser | None = Field(default=None, alias="currentUser") twitch_id: str = Field(alias="id")
user: CurrentUser | None = Field(default=None, alias="user") login: str | None = None
viewer_drop_campaigns: list[DropCampaignSchema] | DropCampaignSchema | None = Field(
default=None,
alias="viewerDropCampaigns",
)
type_name: Literal["Channel"] = Field(alias="__typename")
model_config = {
"extra": "forbid",
"validate_assignment": True,
"strict": True,
"populate_by_name": True,
}
@field_validator("viewer_drop_campaigns", mode="before")
@classmethod
def normalize_viewer_campaigns(cls, v: list | dict | None) -> list[dict] | None:
"""Normalize viewer_drop_campaigns to a list.
Handles both list and dict formats from API responses.
Args:
v: The raw viewer_drop_campaigns value (list, dict, or None).
Returns:
Normalized list of campaign dicts, or None if empty.
"""
if isinstance(v, dict):
return [v]
if isinstance(v, list):
return v or None
return None
class DataSchema(BaseModel):
"""Schema for the data field in Twitch API responses.
Handles both currentUser (standard) and user (legacy) field names,
as well as channel-based campaign structures.
"""
current_user: CurrentUserSchema | None = Field(default=None, alias="currentUser")
user: CurrentUserSchema | None = Field(default=None, alias="user")
channel: ChannelSchema | None = Field(default=None, alias="channel")
model_config = { model_config = {
"extra": "forbid", "extra": "forbid",
@ -271,7 +352,7 @@ class Data(BaseModel):
return v return v
@model_validator(mode="after") @model_validator(mode="after")
def normalize_user_field(self) -> Data: def normalize_user_field(self) -> DataSchema:
"""Normalize user field to current_user if needed. """Normalize user field to current_user if needed.
If current_user is None but user exists, use user as current_user. If current_user is None but user exists, use user as current_user.
@ -315,7 +396,7 @@ class GraphQLError(BaseModel):
class GraphQLResponse(BaseModel): class GraphQLResponse(BaseModel):
"""Schema for the complete GraphQL response from Twitch API.""" """Schema for the complete GraphQL response from Twitch API."""
data: Data data: DataSchema
extensions: Extensions | None = None extensions: Extensions | None = None
errors: list[GraphQLError] | None = None errors: list[GraphQLError] | None = None

View file

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from django.test import TestCase from django.test import TestCase
@ -30,7 +31,64 @@ class GetOrUpdateBenefitTests(TestCase):
}, },
) )
benefit: DropBenefit = command._get_or_update_benefit(benefit_schema) # noqa: SLF001 benefit: DropBenefit = command._get_or_update_benefit(benefit_schema)
benefit.refresh_from_db() benefit.refresh_from_db()
assert not benefit.distribution_type assert not benefit.distribution_type
class ExtractCampaignsTests(TestCase):
"""Tests for response validation and campaign extraction."""
def test_validates_top_level_response_with_nested_campaign(self) -> None:
"""Ensure validation handles full responses correctly."""
command = Command()
command.pre_fill_cache()
payload = {
"data": {
"user": {
"id": "123",
"dropCampaign": {
"id": "c1",
"name": "Test Campaign",
"description": "",
"startAt": "2025-01-01T00:00:00Z",
"endAt": "2025-01-02T00:00:00Z",
"accountLinkURL": "http://example.com",
"detailsURL": "http://example.com",
"imageURL": "",
"status": "ACTIVE",
"self": {"isAccountConnected": False, "__typename": "DropCampaignSelfEdge"},
"game": {
"id": "g1",
"displayName": "Test Game",
"boxArtURL": "http://example.com/art.png",
"__typename": "Game",
},
"owner": {
"id": "o1",
"name": "Test Org",
"__typename": "Organization",
},
"timeBasedDrops": [],
"__typename": "DropCampaign",
},
"__typename": "User",
},
},
"extensions": {
"operationName": "TestOp",
},
}
# Validate response
valid_responses, broken_dir = command._validate_responses(
responses=[payload],
file_path=Path("test.json"),
options={},
)
assert len(valid_responses) == 1
assert broken_dir is None
assert valid_responses[0].data.user is not None

View file

@ -295,6 +295,7 @@ def drop_campaign_detail_view(request: HttpRequest, twitch_id: str) -> HttpRespo
"json", "json",
[campaign], [campaign],
fields=( fields=(
"twitch_id",
"name", "name",
"description", "description",
"details_url", "details_url",
@ -302,7 +303,6 @@ def drop_campaign_detail_view(request: HttpRequest, twitch_id: str) -> HttpRespo
"image_url", "image_url",
"start_at", "start_at",
"end_at", "end_at",
"is_account_connected",
"game", "game",
"created_at", "created_at",
"updated_at", "updated_at",
@ -555,6 +555,7 @@ class GameDetailView(DetailView):
"json", "json",
all_campaigns, all_campaigns,
fields=( fields=(
"twitch_id",
"name", "name",
"description", "description",
"details_url", "details_url",
@ -562,7 +563,6 @@ class GameDetailView(DetailView):
"image_url", "image_url",
"start_at", "start_at",
"end_at", "end_at",
"is_account_connected",
), ),
) )
campaigns_data: list[dict[str, Any]] = json.loads( campaigns_data: list[dict[str, Any]] = json.loads(
@ -933,6 +933,7 @@ class ChannelDetailView(DetailView):
"json", "json",
all_campaigns, all_campaigns,
fields=( fields=(
"twitch_id",
"name", "name",
"description", "description",
"details_url", "details_url",
@ -940,7 +941,6 @@ class ChannelDetailView(DetailView):
"image_url", "image_url",
"start_at", "start_at",
"end_at", "end_at",
"is_account_connected",
), ),
) )
campaigns_data = json.loads(serialized_campaigns) campaigns_data = json.loads(serialized_campaigns)