diff --git a/pyproject.toml b/pyproject.toml index cb0892f..3fd3c18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ lint.isort.required-imports = ["from __future__ import annotations"] lint.ignore = [ "ANN002", # Checks that function *args arguments have type annotations. "ANN003", # Checks that function **kwargs arguments have type annotations. + "C901", # Checks for functions with a high McCabe complexity. "CPY001", # Checks for the absence of copyright notices within Python files. "D100", # Checks for undocumented public module definitions. "D104", # Checks for undocumented public package definitions. @@ -46,6 +47,8 @@ lint.ignore = [ "D106", # Checks for undocumented public class definitions, for nested classes. "ERA001", # Checks for commented-out Python code. "FIX002", # Checks for "TODO" comments. + "PLR0911", # Checks for functions or methods with too many return statements. + "PLR0912", # Checks for functions or methods with too many branches, including (nested) if, elif, and else branches, for loops, try-except clauses, and match and case statements. "PLR6301", # Checks for the presence of unused self parameter in methods definitions. "RUF012", # Checks for mutable default values in class attributes. @@ -69,7 +72,7 @@ lint.ignore = [ preview = true unsafe-fixes = true fix = true -line-length = 140 +line-length = 160 [tool.ruff.lint.per-file-ignores] "**/tests/**" = [ diff --git a/twitch/management/commands/import_drops.py b/twitch/management/commands/import_drops.py index cf7cb79..56d296e 100644 --- a/twitch/management/commands/import_drops.py +++ b/twitch/management/commands/import_drops.py @@ -35,8 +35,17 @@ class Command(BaseCommand): Args: parser: The command argument parser. """ - parser.add_argument("path", type=str, help="Path to the JSON file or directory containing JSON files.") - parser.add_argument("--processed-dir", type=str, default="processed", help="Subdirectory to move processed files to") + parser.add_argument( + "path", + type=str, + help="Path to the JSON file or directory containing JSON files.", + ) + parser.add_argument( + "--processed-dir", + type=str, + default="processed", + help="Subdirectory to move processed files to", + ) def handle(self, **options) -> None: """Execute the command. @@ -98,9 +107,7 @@ class Command(BaseCommand): # Still invalid after cleanup, move to broken_json broken_json_dir: Path = processed_path / "broken_json" broken_json_dir.mkdir(parents=True, exist_ok=True) - self.stdout.write( - self.style.WARNING(f"Invalid JSON in '{json_file}', even after cleanup. Moving to '{broken_json_dir}'.") - ) + self.stdout.write(self.style.WARNING(f"Invalid JSON in '{json_file}', even after cleanup. Moving to '{broken_json_dir}'.")) self.move_file(json_file, broken_json_dir / json_file.name) except (ValueError, TypeError, AttributeError, KeyError, IndexError): self.stdout.write(self.style.ERROR(f"Data error processing {json_file}")) @@ -109,7 +116,7 @@ class Command(BaseCommand): msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}." self.stdout.write(self.style.SUCCESS(msg)) - def _process_file(self, file_path: Path, processed_path: Path) -> None: # noqa: C901, PLR0911, PLR0912 + def _process_file(self, file_path: Path, processed_path: Path) -> None: """Process a single JSON file. Args: @@ -191,10 +198,7 @@ class Command(BaseCommand): return # If file has "__typename": "DropCurrentSession" move it to the "drop_current_session" directory so we can process it separately. - if ( - isinstance(data, dict) - and data.get("data", {}).get("currentUser", {}).get("dropCurrentSession", {}).get("__typename") == "DropCurrentSession" - ): + if isinstance(data, dict) and data.get("data", {}).get("currentUser", {}).get("dropCurrentSession", {}).get("__typename") == "DropCurrentSession": drop_current_session_dir: Path = processed_path / "drop_current_session" drop_current_session_dir.mkdir(parents=True, exist_ok=True) self.move_file(file_path, drop_current_session_dir / file_path.name) @@ -226,7 +230,10 @@ class Command(BaseCommand): shutil.move(str(file_path), str(processed_path)) except FileExistsError: # Rename the file if contents is different than the existing one - with file_path.open("rb") as f1, (processed_path / file_path.name).open("rb") as f2: + with ( + file_path.open("rb") as f1, + (processed_path / file_path.name).open("rb") as f2, + ): if f1.read() != f2.read(): new_name: Path = processed_path / f"{file_path.stem}_duplicate{file_path.suffix}" shutil.move(str(file_path), str(new_name)) @@ -348,7 +355,7 @@ class Command(BaseCommand): ) self.stdout.write(self.style.SUCCESS(f"Successfully imported drop campaign {drop_campaign.name} (ID: {drop_campaign.id})")) - def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop: # noqa: C901 + def create_time_based_drop(self, drop_campaign: DropCampaign, drop_data: dict[str, Any]) -> TimeBasedDrop: """Creates or updates a TimeBasedDrop instance based on the provided drop data. Args: @@ -409,13 +416,16 @@ class Command(BaseCommand): time_based_drop, created = TimeBasedDrop.objects.update_or_create(id=drop_data["id"], defaults=defaults) if created: - self.stdout.write( - self.style.SUCCESS(f"Successfully imported time-based drop {time_based_drop.name} (ID: {time_based_drop.id})") - ) + self.stdout.write(self.style.SUCCESS(f"Successfully imported time-based drop {time_based_drop.name} (ID: {time_based_drop.id})")) return time_based_drop - def drop_campaign_update_or_get(self, campaign_data: dict[str, Any], game: Game, organization: Organization | None) -> DropCampaign: # noqa: C901 + def drop_campaign_update_or_get( + self, + campaign_data: dict[str, Any], + game: Game, + organization: Organization | None, + ) -> DropCampaign: """Update or create a drop campaign. Args: diff --git a/twitch/views.py b/twitch/views.py index 20a85bb..e0dc409 100644 --- a/twitch/views.py +++ b/twitch/views.py @@ -15,14 +15,7 @@ from django.shortcuts import get_object_or_404, redirect, render from django.utils import timezone from django.views.generic import DetailView, ListView -from twitch.models import ( - DropBenefit, - DropCampaign, - Game, - NotificationSubscription, - Organization, - TimeBasedDrop, -) +from twitch.models import DropBenefit, DropCampaign, Game, NotificationSubscription, Organization, TimeBasedDrop if TYPE_CHECKING: from django.db.models import QuerySet @@ -155,10 +148,7 @@ class DropCampaignDetailView(DetailView): context["now"] = timezone.now() context["drops"] = ( - TimeBasedDrop.objects.filter(campaign=campaign) - .select_related("campaign") - .prefetch_related("benefits") - .order_by("required_minutes_watched") + TimeBasedDrop.objects.filter(campaign=campaign).select_related("campaign").prefetch_related("benefits").order_by("required_minutes_watched") ) return context @@ -215,13 +205,9 @@ class GameListView(ListView): games_by_org: dict[OrganizationData, list[dict[str, Game | dict[str, int]]]] = {} now: datetime.datetime = timezone.now() - organizations_with_games: QuerySet[Organization, Organization] = ( - Organization.objects.filter(drop_campaigns__isnull=False).distinct().order_by("name") - ) + organizations_with_games: QuerySet[Organization, Organization] = Organization.objects.filter(drop_campaigns__isnull=False).distinct().order_by("name") - game_org_relations: QuerySet[DropCampaign, dict[str, Any]] = DropCampaign.objects.values( - "game_id", "owner_id", "owner__name" - ).annotate( + game_org_relations: QuerySet[DropCampaign, dict[str, Any]] = DropCampaign.objects.values("game_id", "owner_id", "owner__name").annotate( campaign_count=Count("id", distinct=True), active_count=Count("id", filter=Q(start_at__lte=now, end_at__gte=now), distinct=True), ) @@ -289,9 +275,7 @@ class GameDetailView(DetailView): subscription = NotificationSubscription.objects.filter(user=user, game=game).first() now: datetime.datetime = timezone.now() - all_campaigns: QuerySet[DropCampaign, DropCampaign] = ( - DropCampaign.objects.filter(game=game).select_related("owner").order_by("-end_at") - ) + all_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter(game=game).select_related("owner").order_by("-end_at") active_campaigns: list[DropCampaign] = [ campaign @@ -300,15 +284,11 @@ class GameDetailView(DetailView): ] active_campaigns.sort(key=lambda c: c.end_at if c.end_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC)) - upcoming_campaigns: list[DropCampaign] = [ - campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at > now - ] + upcoming_campaigns: list[DropCampaign] = [campaign for campaign in all_campaigns if campaign.start_at is not None and campaign.start_at > now] upcoming_campaigns.sort(key=lambda c: c.start_at if c.start_at is not None else datetime.datetime.max.replace(tzinfo=datetime.UTC)) - expired_campaigns: list[DropCampaign] = [ - campaign for campaign in all_campaigns if campaign.end_at is not None and campaign.end_at < now - ] + expired_campaigns: list[DropCampaign] = [campaign for campaign in all_campaigns if campaign.end_at is not None and campaign.end_at < now] context.update({ "active_campaigns": active_campaigns, @@ -335,7 +315,12 @@ def dashboard(request: HttpRequest) -> HttpResponse: active_campaigns: QuerySet[DropCampaign, DropCampaign] = ( DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now) .select_related("game", "owner") - .prefetch_related(Prefetch("time_based_drops", queryset=TimeBasedDrop.objects.prefetch_related("benefits"))) + .prefetch_related( + Prefetch( + "time_based_drops", + queryset=TimeBasedDrop.objects.prefetch_related("benefits"), + ) + ) ) campaigns_by_org_game: dict[str, Any] = {} @@ -350,19 +335,19 @@ def dashboard(request: HttpRequest) -> HttpResponse: campaigns_by_org_game[org_id] = {"name": org_name, "games": {}} if game_id not in campaigns_by_org_game[org_id]["games"]: - campaigns_by_org_game[org_id]["games"][game_id] = {"name": game_name, "campaigns": []} + campaigns_by_org_game[org_id]["games"][game_id] = { + "name": game_name, + "campaigns": [], + } campaigns_by_org_game[org_id]["games"][game_id]["campaigns"].append(campaign) sorted_campaigns_by_org_game: dict[str, Any] = { - org_id: campaigns_by_org_game[org_id] - for org_id in sorted(campaigns_by_org_game.keys(), key=lambda k: campaigns_by_org_game[k]["name"]) + org_id: campaigns_by_org_game[org_id] for org_id in sorted(campaigns_by_org_game.keys(), key=lambda k: campaigns_by_org_game[k]["name"]) } for org_data in sorted_campaigns_by_org_game.values(): - org_data["games"] = { - game_id: org_data["games"][game_id] for game_id in sorted(org_data["games"].keys(), key=lambda k: org_data["games"][k]["name"]) - } + org_data["games"] = {game_id: org_data["games"][game_id] for game_id in sorted(org_data["games"].keys(), key=lambda k: org_data["games"][k]["name"])} return render( request, @@ -402,9 +387,7 @@ def debug_view(request: HttpRequest) -> HttpResponse: ).select_related("game", "owner_organization") # Time-based drops without any benefits - drops_without_benefits: QuerySet[TimeBasedDrop, TimeBasedDrop] = TimeBasedDrop.objects.filter(benefits__isnull=True).select_related( - "campaign" - ) + drops_without_benefits: QuerySet[TimeBasedDrop, TimeBasedDrop] = TimeBasedDrop.objects.filter(benefits__isnull=True).select_related("campaign") # Campaigns with invalid dates (start after end or missing either) invalid_date_campaigns: QuerySet[DropCampaign, DropCampaign] = DropCampaign.objects.filter( @@ -412,14 +395,10 @@ def debug_view(request: HttpRequest) -> HttpResponse: ).select_related("game", "owner") # Duplicate campaign names per game - duplicate_name_campaigns = ( - DropCampaign.objects.values("game_id", "name").annotate(name_count=Count("id")).filter(name_count__gt=1).order_by("-name_count") - ) + duplicate_name_campaigns = DropCampaign.objects.values("game_id", "name").annotate(name_count=Count("id")).filter(name_count__gt=1).order_by("-name_count") # Campaigns currently active but image missing - active_missing_image = DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now).filter( - Q(image_url__isnull=True) | Q(image_url__exact="") - ) + active_missing_image = DropCampaign.objects.filter(start_at__lte=now, end_at__gte=now).filter(Q(image_url__isnull=True) | Q(image_url__exact="")) context: dict[str, Any] = { "now": now,