diff --git a/twitch/management/commands/import_drops.py b/twitch/management/commands/import_drops.py index 9c48815..e937ff6 100644 --- a/twitch/management/commands/import_drops.py +++ b/twitch/management/commands/import_drops.py @@ -1,5 +1,6 @@ from __future__ import annotations +import concurrent.futures import logging import shutil import traceback @@ -160,18 +161,17 @@ class Command(BaseCommand): """Process all JSON files in a directory using parallel processing. Args: - directory: Path to the directory. - processed_path: Name of subdirectory to move processed files to. - continue_on_error: Continue processing if an error occurs. + directory: Path to the directory containing JSON files. + processed_path: Path to the subdirectory where processed files will be moved. + continue_on_error: Whether to continue processing remaining files if an error occurs. Raises: - CommandError: If the file/directory doesn't exist, isn't a JSON file, - or has an invalid JSON structure. - ValueError: If the JSON file has an invalid structure. - TypeError: If the JSON file has an invalid structure. - AttributeError: If the JSON file has an invalid structure. - KeyError: If the JSON file has an invalid structure. - IndexError: If the JSON file has an invalid structure. + CommandError: If the path is invalid or moving files fails. + ValueError: If a JSON file has an invalid structure. + TypeError: If a JSON file has an invalid structure. + AttributeError: If a JSON file has an invalid structure. + KeyError: If a JSON file has an invalid structure. + IndexError: If a JSON file has an invalid structure. """ json_files: list[Path] = list(directory.glob("*.json")) if not json_files: @@ -181,19 +181,24 @@ class Command(BaseCommand): total_files: int = len(json_files) self.stdout.write(f"Found {total_files} JSON files to process") - for json_file in json_files: - self.stdout.write(f"Processing file {json_file.name}...") - try: - self._process_file(json_file, processed_path) - except CommandError as e: - if not continue_on_error: - raise - self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}")) - except (ValueError, TypeError, AttributeError, KeyError, IndexError): - if not continue_on_error: - raise - self.stdout.write(self.style.ERROR(f"Data error processing {json_file}")) - self.stdout.write(self.style.ERROR(traceback.format_exc())) + with concurrent.futures.ThreadPoolExecutor() as executor: + future_to_file: dict[concurrent.futures.Future[None], Path] = { + executor.submit(self._process_file, json_file, processed_path): json_file for json_file in json_files + } + for future in concurrent.futures.as_completed(future_to_file): + json_file: Path = future_to_file[future] + self.stdout.write(f"Processing file {json_file.name}...") + try: + future.result() + except CommandError as e: + if not continue_on_error: + raise + self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}")) + except (ValueError, TypeError, AttributeError, KeyError, IndexError): + if not continue_on_error: + raise + self.stdout.write(self.style.ERROR(f"Data error processing {json_file}")) + self.stdout.write(self.style.ERROR(traceback.format_exc())) msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}." self.stdout.write(self.style.SUCCESS(msg))