diff --git a/twitch/management/commands/import_drops.py b/twitch/management/commands/import_drops.py index 9ef22a3..e6b8c76 100644 --- a/twitch/management/commands/import_drops.py +++ b/twitch/management/commands/import_drops.py @@ -86,10 +86,10 @@ class Command(BaseCommand): CommandError: If the file/directory doesn't exist, isn't a JSON file, or has an invalid JSON structure. ValueError: If the JSON file has an invalid structure. - TypeError: If the JSON file has an invalid structure. - AttributeError: If the JSON file has an invalid structure. - KeyError: If the JSON file has an invalid structure. - IndexError: If the JSON file has an invalid structure. + TypeError: If the JSON file has an invalid JSON structure. + AttributeError: If the JSON file has an invalid JSON structure. + KeyError: If the JSON file has an invalid JSON structure. + IndexError: If the JSON file has an invalid JSON structure. """ paths: list[str] = options["paths"] processed_dir: str = options["processed_dir"] @@ -118,6 +118,10 @@ class Command(BaseCommand): raise self.stdout.write(self.style.ERROR(f"Data error processing path {p}")) self.stdout.write(self.style.ERROR(traceback.format_exc())) + except KeyboardInterrupt: + # Gracefully handle Ctrl+C + self.stdout.write(self.style.WARNING("Interrupted by user, exiting import.")) + return def process_drops(self, *, continue_on_error: bool, path: Path, processed_path: Path) -> None: """Process drops from a file or directory. @@ -171,6 +175,7 @@ class Command(BaseCommand): AttributeError: If a JSON file has an invalid structure. KeyError: If a JSON file has an invalid structure. IndexError: If a JSON file has an invalid structure. + KeyboardInterrupt: If processing is interrupted by the user. """ json_files: list[Path] = list(directory.glob("*.json")) if not json_files: @@ -182,29 +187,33 @@ class Command(BaseCommand): start_time: float = time.time() processed = 0 - with concurrent.futures.ThreadPoolExecutor() as executor: - future_to_file: dict[concurrent.futures.Future[None], Path] = { - executor.submit(self._process_file, json_file, processed_path): json_file for json_file in json_files - } - for future in concurrent.futures.as_completed(future_to_file): - json_file: Path = future_to_file[future] - self.stdout.write(f"Processing file {json_file.name}...") - try: - future.result() - except CommandError as e: - if not continue_on_error: - raise - self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}")) - except (ValueError, TypeError, AttributeError, KeyError, IndexError): - if not continue_on_error: - raise - self.stdout.write(self.style.ERROR(f"Data error processing {json_file}")) - self.stdout.write(self.style.ERROR(traceback.format_exc())) + try: + with concurrent.futures.ThreadPoolExecutor() as executor: + future_to_file: dict[concurrent.futures.Future[None], Path] = { + executor.submit(self._process_file, json_file, processed_path): json_file for json_file in json_files + } + for future in concurrent.futures.as_completed(future_to_file): + json_file: Path = future_to_file[future] + self.stdout.write(f"Processing file {json_file.name}...") + try: + future.result() + except CommandError as e: + if not continue_on_error: + raise + self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}")) + except (ValueError, TypeError, AttributeError, KeyError, IndexError): + if not continue_on_error: + raise + self.stdout.write(self.style.ERROR(f"Data error processing {json_file}")) + self.stdout.write(self.style.ERROR(traceback.format_exc())) - self.update_processing_progress(total_files=total_files, start_time=start_time, processed=processed) - - msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}." - self.stdout.write(self.style.SUCCESS(msg)) + self.update_processing_progress(total_files=total_files, start_time=start_time, processed=processed) + except KeyboardInterrupt: + self.stdout.write(self.style.WARNING("Interrupted by user, exiting import.")) + raise + else: + msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}." + self.stdout.write(self.style.SUCCESS(msg)) def update_processing_progress(self, total_files: int, start_time: float, processed: int) -> None: """Update and display processing progress.