diff --git a/twitch/management/commands/import_drop_campaign.py b/twitch/management/commands/import_drop_campaign.py index 83124a5..fbd5832 100644 --- a/twitch/management/commands/import_drop_campaign.py +++ b/twitch/management/commands/import_drop_campaign.py @@ -22,17 +22,8 @@ class Command(BaseCommand): Args: parser: The command argument parser. """ - parser.add_argument( - "path", - type=str, - help="Path to the JSON file or directory containing JSON files with drop campaign data", - ) - parser.add_argument( - "--processed-dir", - type=str, - default="processed", - help="Name of subdirectory to move processed files to (default: 'processed')", - ) + parser.add_argument("path", type=str, help="Path to the JSON file or directory containing JSON files.") + parser.add_argument("--processed-dir", type=str, default="processed", help="Subdirectory to move processed files to") def handle(self, **options) -> None: """Execute the command. @@ -44,59 +35,58 @@ class Command(BaseCommand): CommandError: If the file/directory doesn't exist, isn't a JSON file, or has an invalid JSON structure. """ - path: str = options["path"] + path: Path = Path(options["path"]) processed_dir: str = options["processed_dir"] - path_obj = Path(path) - if not path_obj.exists(): + processed_path: Path = path / processed_dir + processed_path.mkdir(exist_ok=True) + + if not path.exists(): msg: str = f"Path {path} does not exist" raise CommandError(msg) - if path_obj.is_file(): - self._process_file(path_obj, processed_dir) - elif path_obj.is_dir(): - self._process_directory(path_obj, processed_dir) + if path.is_file(): + self._process_file(file_path=path, processed_path=processed_path) + elif path.is_dir(): + self._process_directory(directory=path, processed_path=processed_path) else: msg = f"Path {path} is neither a file nor a directory" raise CommandError(msg) - def _process_directory(self, directory: Path, processed_dir: str) -> None: + def _process_directory(self, directory: Path, processed_path: Path) -> None: """Process all JSON files in a directory using parallel processing. Args: directory: Path to the directory. - processed_dir: Name of subdirectory to move processed files to. + processed_path: Name of subdirectory to move processed files to. """ - processed_path: Path = directory / processed_dir - processed_path.mkdir(exist_ok=True) - json_files: list[Path] = list(directory.glob("*.json")) if not json_files: self.stdout.write(self.style.WARNING(f"No JSON files found in {directory}")) return - total_files = len(json_files) + total_files: int = len(json_files) self.stdout.write(f"Found {total_files} JSON files to process") for json_file in json_files: self.stdout.write(f"Processing file {json_file.name}...") try: - self._process_file(json_file, processed_dir) + self._process_file(json_file, processed_path) except CommandError as e: self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}")) except (ValueError, TypeError, AttributeError, KeyError, IndexError, json.JSONDecodeError) as e: self.stdout.write(self.style.ERROR(f"Data error processing {json_file}: {e!s}")) self.stdout.write( - self.style.SUCCESS(f"Completed processing {total_files} JSON files in {directory}. Processed files moved to {processed_dir}.") + self.style.SUCCESS(f"Completed processing {total_files} JSON files in {directory}. Processed files moved to {processed_path}.") ) - def _process_file(self, file_path: Path, processed_dir: str) -> None: + def _process_file(self, file_path: Path, processed_path: Path) -> None: """Process a single JSON file. Args: file_path: Path to the JSON file. - processed_dir: Name of subdirectory to move processed files to. + processed_path: Subdirectory to move processed files to. Raises: CommandError: If the file isn't a JSON file or has invalid JSON structure. @@ -106,10 +96,8 @@ class Command(BaseCommand): if isinstance(data, list): for item in data: - if "data" in item and "user" in item["data"] and "dropCampaign" in item["data"]["user"]: - drop_campaign_data = item["data"]["user"]["dropCampaign"] - self._import_drop_campaign_with_retry(drop_campaign_data) - + drop_campaign_data = item["data"]["user"]["dropCampaign"] + self._import_drop_campaign_with_retry(drop_campaign_data) else: if "data" not in data or "user" not in data["data"] or "dropCampaign" not in data["data"]["user"]: msg = "Invalid JSON structure: Missing data.user.dropCampaign" @@ -118,12 +106,7 @@ class Command(BaseCommand): drop_campaign_data = data["data"]["user"]["dropCampaign"] self._import_drop_campaign_with_retry(drop_campaign_data) - if processed_dir: - processed_path: Path = file_path.parent / processed_dir - processed_path.mkdir(exist_ok=True) - - new_path: Path = processed_path / file_path.name - shutil.move(str(file_path), str(new_path)) + shutil.move(str(file_path), str(processed_path)) def _import_drop_campaign_with_retry(self, campaign_data: dict[str, Any]) -> None: """Import drop campaign data into the database with retry logic for SQLite locks.