Ignore more linting stuff

This commit is contained in:
Joakim Hellsén 2025-08-12 20:11:49 +02:00
commit 0de04dc8e7
3 changed files with 52 additions and 60 deletions

View file

@ -39,6 +39,7 @@ lint.isort.required-imports = ["from __future__ import annotations"]
lint.ignore = [ lint.ignore = [
"ANN002", # Checks that function *args arguments have type annotations. "ANN002", # Checks that function *args arguments have type annotations.
"ANN003", # Checks that function **kwargs arguments have type annotations. "ANN003", # Checks that function **kwargs arguments have type annotations.
"C901", # Checks for functions with a high McCabe complexity.
"CPY001", # Checks for the absence of copyright notices within Python files. "CPY001", # Checks for the absence of copyright notices within Python files.
"D100", # Checks for undocumented public module definitions. "D100", # Checks for undocumented public module definitions.
"D104", # Checks for undocumented public package definitions. "D104", # Checks for undocumented public package definitions.
@ -46,6 +47,8 @@ lint.ignore = [
"D106", # Checks for undocumented public class definitions, for nested classes. "D106", # Checks for undocumented public class definitions, for nested classes.
"ERA001", # Checks for commented-out Python code. "ERA001", # Checks for commented-out Python code.
"FIX002", # Checks for "TODO" comments. "FIX002", # Checks for "TODO" comments.
"PLR0911", # Checks for functions or methods with too many return statements.
"PLR0912", # Checks for functions or methods with too many branches, including (nested) if, elif, and else branches, for loops, try-except clauses, and match and case statements.
"PLR6301", # Checks for the presence of unused self parameter in methods definitions. "PLR6301", # Checks for the presence of unused self parameter in methods definitions.
"RUF012", # Checks for mutable default values in class attributes. "RUF012", # Checks for mutable default values in class attributes.
@ -69,7 +72,7 @@ lint.ignore = [
preview = true preview = true
unsafe-fixes = true unsafe-fixes = true
fix = true fix = true
line-length = 140 line-length = 160
[tool.ruff.lint.per-file-ignores] [tool.ruff.lint.per-file-ignores]
"**/tests/**" = [ "**/tests/**" = [

View file

@ -35,8 +35,17 @@ class Command(BaseCommand):
Args: Args:
parser: The command argument parser. parser: The command argument parser.
""" """
parser.add_argument("path", type=str, help="Path to the JSON file or directory containing JSON files.") parser.add_argument(
parser.add_argument("--processed-dir", type=str, default="processed", help="Subdirectory to move processed files to") "path",
type=str,
help="Path to the JSON file or directory containing JSON files.",
)
parser.add_argument(
"--processed-dir",
type=str,
default="processed",
help="Subdirectory to move processed files to",
)
def handle(self, **options) -> None: def handle(self, **options) -> None:
"""Execute the command. """Execute the command.
@ -98,9 +107,7 @@ class Command(BaseCommand):
# Still invalid after cleanup, move to broken_json # Still invalid after cleanup, move to broken_json
broken_json_dir: Path = processed_path / "broken_json" broken_json_dir: Path = processed_path / "broken_json"
broken_json_dir.mkdir(parents=True, exist_ok=True) broken_json_dir.mkdir(parents=True, exist_ok=True)
self.stdout.write( self.stdout.write(self.style.WARNING(f"Invalid JSON in '{json_file}', even after cleanup. Moving to '{broken_json_dir}'."))
self.style.WARNING(f"Invalid JSON in '{json_file}', even after cleanup. Moving to '{broken_json_dir}'.")
)
self.move_file(json_file, broken_json_dir / json_file.name) self.move_file(json_file, broken_json_dir / json_file.name)
except (ValueError, TypeError, AttributeError, KeyError, IndexError): except (ValueError, TypeError, AttributeError, KeyError, IndexError):
self.stdout.write(self.style.ERROR(f"Data error processing {json_file}")) self.stdout.write(self.style.ERROR(f"Data error processing {json_file}"))
@ -109,7 +116,7 @@ class Command(BaseCommand):
msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}." msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}."
self.stdout.write(self.style.SUCCESS(msg)) self.stdout.write(self.style.SUCCESS(msg))
def _process_file(self, file_path: Path, processed_path: Path) -> None: # noqa: C901, PLR0911, PLR0912 def _process_file(self, file_path: Path, processed_path: Path) -> None:
"""Process a single JSON file. """Process a single JSON file.
Args: Args:
@ -191,10 +198,7 @@ class Command(BaseCommand):
return return
# If file has "__typename": "DropCurrentSession" move it to the "drop_current_session" directory so we can process it separately. # If file has "__typename": "DropCurrentSession" move it to the "drop_current_session" directory so we can process it separately.
if ( if isinstance(data, dict) and data.get("data", {}).get("currentUser", {}).get("dropCurrentSession", {}).get("__typename") == "DropCurrentSession":
isinstance(data, dict)
and data.get("data", {}).get("currentUser", {}).get("dropCurrentSession", {}).get("__typename") == "DropCurrentSession"
):
drop_current_session_dir: Path = processed_path / "drop_current_session" drop_current_session_dir: Path = processed_path / "drop_current_session"
drop_current_session_dir.mkdir(parents=True, exist_ok=True) drop_current_session_dir.mkdir(parents=True, exist_ok=True)
self.move_file(file_path, drop_current_session_dir / file_path.name) self.move_file(file_path, drop_current_session_dir / file_path.name)
@ -226,7 +230,10 @@ class Command(BaseCommand):
shutil.move(str(file_path), str(processed_path)) shutil.move(str(file_path), str(processed_path))
except FileExistsError: except FileExistsError:
# Rename the file if contents is different than the existing one # Rename the file if contents is different than the existing one
with file_path.open("rb") as f1, (processed_path / file_path.name).open("rb") as f2: with (
file_path.open("rb") as f1,
(processed_path / file_path.name).open("rb") as f2,
):
if f1.read() != f2.read(): if f1.read() != f2.read():
new_name: Path = processed_path / f"{file_path.stem}_duplicate{file_path.suffix}" new_name: Path = processed_path / f"{file_path.stem}_duplicate{file_path.suffix}"
shutil.move(str(file_path), str(new_name)) shutil.move(str(file_path), str(new_name))
@ -348,7 +355,7 @@ class Command(BaseCommand):
) )
self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign {drop_campaign.name} (ID: {drop_campaign.id})")) self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign {drop_campaign.name} (ID: {drop_campaign.id})"))
def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop: # noqa: C901 def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop:
"""Creates or updates a TimeBasedDrop instance based on the provided drop data. """Creates or updates a TimeBasedDrop instance based on the provided drop data.
Args: Args:
@ -409,13 +416,16 @@ class Command(BaseCommand):
time_based_drop, created = TimeBasedDrop.objects.update_or_create(id=drop_data["id"], defaults=defaults) time_based_drop, created = TimeBasedDrop.objects.update_or_create(id=drop_data["id"], defaults=defaults)
if created: if created:
self.stdout.write( self.stdout.write(self.style.SUCCESS(f"Successfully imported time-based drop {time_based_drop.name} (ID: {time_based_drop.id})"))
self.style.SUCCESS(f"Successfully imported time-based drop {time_based_drop.name} (ID: {time_based_drop.id})")
)
return time_based_drop return time_based_drop
def drop_campaign_update_or_get(self, campaign_data: dict[str, Any], game: Game, organization: Organization | None) -> DropCampaign: # noqa: C901 def drop_campaign_update_or_get(
self,
campaign_data: dict[str, Any],
game: Game,
organization: Organization | None,
) -> DropCampaign:
"""Update or create a drop campaign. """Update or create a drop campaign.
Args: Args:

View file

@ -15,14 +15,7 @@ from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone from django.utils import timezone
from django.views.generic import DetailView, ListView from django.views.generic import DetailView, ListView
from twitch.models import ( from twitch.models import DropBenefit, DropCampaign, Game, NotificationSubscription, Organization, TimeBasedDrop
DropBenefit,
DropCampaign,
Game,
NotificationSubscription,
Organization,
TimeBasedDrop,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from django.db.models import QuerySet from django.db.models import QuerySet
@ -155,10 +148,7 @@ class DropCampaignDetailView(DetailView):
context["now"] = timezone.now() context["now"] = timezone.now()
context["drops"] = ( context["drops"] = (
TimeBasedDrop.objects.filter(campaign=campaign) TimeBasedDrop.objects.filter(campaign=campaign).select_related("campaign").prefetch_related("benefits").order_by("required_minutes_watched")
.select_related("campaign")
.prefetch_related("benefits")
.order_by("required_minutes_watched")
) )
return context return context
@ -215,13 +205,9 @@ class GameListView(ListView):
games_by_org: dict[OrganizationData, list[dict[str, Game | dict[str, int]]]] = {} games_by_org: dict[OrganizationData, list[dict[str, Game | dict[str, int]]]] = {}
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
organizations_with_games: QuerySet[Organization, Organization] = ( organizations_with_games: QuerySet[Organization, Organization] = Organization.objects.filter(drop_campaigns__isnull=False).distinct().order_by("name")
Organization.objects.filter(drop_campaigns__isnull=False).distinct().order_by("name")
)
game_org_relations: QuerySet[DropCampaign, dict[str, Any]] = DropCampaign.objects.values( game_org_relations: QuerySet[DropCampaign, dict[str, Any]] = DropCampaign.objects.values("game_id", "owner_id", "owner__name").annotate(
"game_id", "owner_id", "owner__name"
).annotate(
campaign_count=Count("id", distinct=True), campaign_count=Count("id", distinct=True),
active_count=Count("id", filter=Q(start_at__lte=now, end_at__gte=now), distinct=True), active_count=Count("id", filter=Q(start_at__lte=now, end_at__gte=now), distinct=True),
) )
@ -289,9 +275,7 @@ class GameDetailView(DetailView):
subscription = NotificationSubscription.objects.filter(user=user, game=game).first() subscription = NotificationSubscription.objects.filter(user=user, game=game).first()
now: datetime.datetime = timezone.now() now: datetime.datetime = timezone.now()
all_campaigns: QuerySet[DropCampaign, DropCampaign] = ( all_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(game=game).select_related("owner").order_by("-end_at")
DropCampaign.objects.filter(game=game).select_related("owner").order_by("-end_at")
)
active_campaigns: list[DropCampaign] = [ active_campaigns: list[DropCampaign] = [
campaign campaign
@ -300,15 +284,11 @@ class GameDetailView(DetailView):
] ]
active_campaigns.sort(key=lambda c: c.end_at if c.end_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC)) active_campaigns.sort(key=lambda c: c.end_at if c.end_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC))
upcoming_campaigns: list[DropCampaign] = [ upcoming_campaigns: list[DropCampaign] = [campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at > now]
campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at > now
]
upcoming_campaigns.sort(key=lambda c: c.start_at if c.start_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC)) upcoming_campaigns.sort(key=lambda c: c.start_at if c.start_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC))
expired_campaigns: list[DropCampaign] = [ expired_campaigns: list[DropCampaign] = [campaign for campaign in all_campaigns if campaign.end_at is not None and campaign.end_at < now]
campaign for campaign in all_campaigns if campaign.end_at is not None and campaign.end_at < now
]
context.update({ context.update({
"active_campaigns": active_campaigns, "active_campaigns": active_campaigns,
@ -335,7 +315,12 @@ def dashboard(request: HttpRequest) -> HttpResponse:
active_campaigns: QuerySet[DropCampaign, DropCampaign] = ( active_campaigns: QuerySet[DropCampaign, DropCampaign] = (
DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now) DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now)
.select_related("game", "owner") .select_related("game", "owner")
.prefetch_related(Prefetch("time_based_drops", queryset=TimeBasedDrop.objects.prefetch_related("benefits"))) .prefetch_related(
Prefetch(
"time_based_drops",
queryset=TimeBasedDrop.objects.prefetch_related("benefits"),
)
)
) )
campaigns_by_org_game: dict[str, Any] = {} campaigns_by_org_game: dict[str, Any] = {}
@ -350,19 +335,19 @@ def dashboard(request: HttpRequest) -> HttpResponse:
campaigns_by_org_game[org_id] = {"name": org_name, "games": {}} campaigns_by_org_game[org_id] = {"name": org_name, "games": {}}
if game_id not in campaigns_by_org_game[org_id]["games"]: if game_id not in campaigns_by_org_game[org_id]["games"]:
campaigns_by_org_game[org_id]["games"][game_id] = {"name": game_name, "campaigns": []} campaigns_by_org_game[org_id]["games"][game_id] = {
"name": game_name,
"campaigns": [],
}
campaigns_by_org_game[org_id]["games"][game_id]["campaigns"].append(campaign) campaigns_by_org_game[org_id]["games"][game_id]["campaigns"].append(campaign)
sorted_campaigns_by_org_game: dict[str, Any] = { sorted_campaigns_by_org_game: dict[str, Any] = {
org_id: campaigns_by_org_game[org_id] org_id: campaigns_by_org_game[org_id] for org_id in sorted(campaigns_by_org_game.keys(), key=lambda k: campaigns_by_org_game[k]["name"])
for org_id in sorted(campaigns_by_org_game.keys(), key=lambda k: campaigns_by_org_game[k]["name"])
} }
for org_data in sorted_campaigns_by_org_game.values(): for org_data in sorted_campaigns_by_org_game.values():
org_data["games"] = { org_data["games"] = {game_id: org_data["games"][game_id] for game_id in sorted(org_data["games"].keys(), key=lambda k: org_data["games"][k]["name"])}
game_id: org_data["games"][game_id] for game_id in sorted(org_data["games"].keys(), key=lambda k: org_data["games"][k]["name"])
}
return render( return render(
request, request,
@ -402,9 +387,7 @@ def debug_view(request: HttpRequest) -> HttpResponse:
).select_related("game", "owner_organization") ).select_related("game", "owner_organization")
# Time-based drops without any benefits # Time-based drops without any benefits
drops_without_benefits: QuerySet[TimeBasedDrop, TimeBasedDrop] = TimeBasedDrop.objects.filter(benefits__isnull=True).select_related( drops_without_benefits: QuerySet[TimeBasedDrop, TimeBasedDrop] = TimeBasedDrop.objects.filter(benefits__isnull=True).select_related("campaign")
"campaign"
)
# Campaigns with invalid dates (start after end or missing either) # Campaigns with invalid dates (start after end or missing either)
invalid_date_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter( invalid_date_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(
@ -412,14 +395,10 @@ def debug_view(request: HttpRequest) -> HttpResponse:
).select_related("game", "owner") ).select_related("game", "owner")
# Duplicate campaign names per game # Duplicate campaign names per game
duplicate_name_campaigns = ( duplicate_name_campaigns = DropCampaign.objects.values("game_id", "name").annotate(name_count=Count("id")).filter(name_count__gt=1).order_by("-name_count")
DropCampaign.objects.values("game_id", "name").annotate(name_count=Count("id")).filter(name_count__gt=1).order_by("-name_count")
)
# Campaigns currently active but image missing # Campaigns currently active but image missing
active_missing_image = DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now).filter( active_missing_image = DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now).filter(Q(image_url__isnull=True) | Q(image_url__exact=""))
Q(image_url__isnull=True) | Q(image_url__exact="")
)
context: dict[str, Any] = { context: dict[str, Any] = {
"now": now, "now": now,