diff --git a/pyproject.toml b/pyproject.toml index ed7b5f4..3655370 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,8 @@ dependencies = [ "pygments>=2.19.2", "httpx>=0.28.1", "pydantic>=2.12.5", + "tqdm>=4.67.1", + "colorama>=0.4.6", ] [dependency-groups] diff --git a/twitch/management/commands/better_import_drops.py b/twitch/management/commands/better_import_drops.py index 18a391e..a3c9e13 100644 --- a/twitch/management/commands/better_import_drops.py +++ b/twitch/management/commands/better_import_drops.py @@ -3,13 +3,18 @@ from __future__ import annotations import os import sys from concurrent.futures import ProcessPoolExecutor -from concurrent.futures import as_completed +from itertools import repeat from pathlib import Path +from typing import Any +from colorama import Fore +from colorama import Style +from colorama import init as colorama_init from django.core.management.base import BaseCommand from django.core.management.base import CommandError from django.core.management.base import CommandParser from pydantic import ValidationError +from tqdm import tqdm from twitch.models import Channel from twitch.models import DropBenefit @@ -54,24 +59,26 @@ class Command(BaseCommand): parser.add_argument("path", type=str, help="Path to JSON file or directory") parser.add_argument("--recursive", action="store_true", help="Recursively search directories for JSON files") parser.add_argument("--crash-on-error", action="store_true", help="Crash the command on first error instead of continuing") + parser.add_argument("--verbose", action="store_true", help="Print per-file success messages (very chatty)") def pre_fill_cache(self) -> None: """Load all existing IDs from DB into memory to avoid N+1 queries.""" - self.stdout.write("Pre-filling caches...") - self.game_cache = {str(g.twitch_id): g for g in Game.objects.all()} - self.stdout.write(f"\tGames: {len(self.game_cache)}") + cache_operations: list[tuple[str, type, str]] = [ + ("Games", Game, "game_cache"), + ("Organizations", Organization, "organization_cache"), + ("Drop Campaigns", DropCampaign, "drop_campaign_cache"), + ("Channels", Channel, "channel_cache"), + ("Benefits", DropBenefit, "benefit_cache"), + ] - self.organization_cache = {str(o.twitch_id): o for o in Organization.objects.all()} - self.stdout.write(f"\tOrganizations: {len(self.organization_cache)}") + with tqdm(cache_operations, desc="Loading caches", unit="cache", colour="cyan") as progress_bar: + for name, model, cache_attr in progress_bar: + progress_bar.set_description(f"Loading {name}") + cache: dict[str, Any] = {str(obj.twitch_id): obj for obj in model.objects.all()} + setattr(self, cache_attr, cache) + progress_bar.write(f" {Fore.GREEN}✓{Style.RESET_ALL} {name}: {len(cache):,}") - self.drop_campaign_cache = {str(c.twitch_id): c for c in DropCampaign.objects.all()} - self.stdout.write(f"\tDrop Campaigns: {len(self.drop_campaign_cache)}") - - self.channel_cache = {str(ch.twitch_id): ch for ch in Channel.objects.all()} - self.stdout.write(f"\tChannels: {len(self.channel_cache)}") - - self.benefit_cache = {str(b.twitch_id): b for b in DropBenefit.objects.all()} - self.stdout.write(f"\tBenefits: {len(self.benefit_cache)}") + tqdm.write("") def handle(self, *args, **options) -> None: # noqa: ARG002 """Main entry point for the command. @@ -79,6 +86,8 @@ class Command(BaseCommand): Raises: CommandError: If the provided path does not exist. """ + colorama_init(autoreset=True) + input_path: Path = Path(options["path"]).resolve() self.pre_fill_cache() @@ -92,8 +101,8 @@ class Command(BaseCommand): msg: str = f"Path does not exist: {input_path}" raise CommandError(msg) except KeyboardInterrupt: - self.stdout.write(self.style.WARNING("\n\nInterrupted by user!")) - self.stdout.write(self.style.WARNING("Shutting down gracefully...")) + tqdm.write(self.style.WARNING("\n\nInterrupted by user!")) + tqdm.write(self.style.WARNING("Shutting down gracefully...")) sys.exit(130) def process_json_files(self, input_path: Path, options: dict) -> None: @@ -104,28 +113,66 @@ class Command(BaseCommand): options: Command options """ json_files: list[Path] = self.collect_json_files(options, input_path) - self.stdout.write(f"Found {len(json_files)} JSON files to process") + tqdm.write(f"Found {len(json_files)} JSON files to process\n") - completed_count = 0 - with ProcessPoolExecutor() as executor: - futures = {executor.submit(self.process_file_worker, file_path, options): file_path for file_path in json_files} + success_count = 0 + failed_count = 0 + error_count = 0 - for future in as_completed(futures): - file_path: Path = futures[future] + with ( + ProcessPoolExecutor() as executor, + tqdm( + total=len(json_files), + desc="Processing", + unit="file", + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]", + colour="green", + dynamic_ncols=True, + ) as progress_bar, + ): + # Choose a reasonable chunk_size to reduce overhead for huge file counts + cpu_count = os.cpu_count() or 1 + chunk_size = max(1, min(1000, len(json_files) // (cpu_count * 8 or 1))) + + results_iter = executor.map(self.process_file_worker, json_files, repeat(options), chunksize=chunk_size) + + for file_path in json_files: try: - result: dict[str, bool | str] = future.result() + result: dict[str, bool | str] = next(results_iter) if result["success"]: - self.stdout.write(f"✓ {file_path}") + success_count += 1 + if options.get("verbose"): + progress_bar.write(f"{Fore.GREEN}✓{Style.RESET_ALL} {file_path.name}") else: - self.stdout.write(f"✗ {file_path} -> {result['broken_dir']}/{file_path.name}") - - completed_count += 1 + failed_count += 1 + progress_bar.write(f"{Fore.RED}✗{Style.RESET_ALL} {file_path.name} → {result['broken_dir']}/{file_path.name}") except (OSError, ValueError, KeyError) as e: - self.stdout.write(f"✗ {file_path} (error: {e})") - completed_count += 1 + error_count += 1 + progress_bar.write(f"{Fore.RED}✗{Style.RESET_ALL} {file_path.name} (error: {e})") - self.stdout.write(f"Progress: {completed_count}/{len(json_files)} files processed") - self.stdout.write("") + # Update postfix with statistics + progress_bar.set_postfix_str(f"✓ {success_count} | ✗ {failed_count + error_count}", refresh=True) + progress_bar.update(1) + + self.print_processing_summary(json_files, success_count, failed_count, error_count) + + def print_processing_summary(self, json_files: list[Path], success_count: int, failed_count: int, error_count: int) -> None: + """Print a summary of the batch processing results. + + Args: + json_files: List of JSON file paths that were processed. + success_count: Number of files processed successfully. + failed_count: Number of files that failed validation and were moved. + error_count: Number of files that encountered unexpected errors. + """ + tqdm.write("\n" + "=" * 50) + tqdm.write(self.style.SUCCESS(f"✓ Successfully processed: {success_count}")) + if failed_count > 0: + tqdm.write(self.style.WARNING(f"✗ Validation failed: {failed_count}")) + if error_count > 0: + tqdm.write(self.style.ERROR(f"✗ Errors: {error_count}")) + tqdm.write(f"Total: {len(json_files)}") + tqdm.write("=" * 50) def collect_json_files(self, options: dict, input_path: Path) -> list[Path]: """Collect JSON files from the specified directory. @@ -181,14 +228,20 @@ class Command(BaseCommand): Raises: ValidationError: If the JSON file fails validation """ - self.stdout.write(f"Processing file: {file_path}") + with tqdm( + total=1, + desc=f"Processing {file_path.name}", + unit="file", + colour="green", + dynamic_ncols=True, + ) as progress_bar: + try: + _: ViewerDropsDashboardPayload = ViewerDropsDashboardPayload.model_validate_json(file_path.read_text(encoding="utf-8")) + progress_bar.update(1) + progress_bar.write(f"{Fore.GREEN}✓{Style.RESET_ALL} {file_path.name}") + except ValidationError: + if options["crash_on_error"]: + raise - try: - _: ViewerDropsDashboardPayload = ViewerDropsDashboardPayload.model_validate_json(file_path.read_text(encoding="utf-8")) - self.stdout.write("\tProcessed drop campaigns") - except ValidationError: - if options["crash_on_error"]: - raise - - broken_dir: Path = move_failed_validation_file(file_path) - self.stdout.write(f"\tMoved to {broken_dir} (validation failed)") + broken_dir: Path = move_failed_validation_file(file_path) + progress_bar.write(f"{Fore.RED}✗{Style.RESET_ALL} {file_path.name} → {broken_dir}/{file_path.name}")