Add progress tracking for JSON file processing

This commit is contained in:
Joakim Hellsén 2025-09-24 03:06:46 +02:00
commit bececd6ac4

View file

@ -1,9 +1,10 @@
from __future__ import annotations from __future__ import annotations
import concurrent.futures import concurrent.futures
import logging
import shutil import shutil
import time
import traceback import traceback
from datetime import timedelta
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
@ -18,9 +19,7 @@ from twitch.utils.images import cache_remote_image
if TYPE_CHECKING: if TYPE_CHECKING:
from datetime import datetime from datetime import datetime
from typing import Literal
logger: logging.Logger = logging.getLogger(__name__)
def parse_date(value: str | None) -> datetime | None: def parse_date(value: str | None) -> datetime | None:
@ -180,6 +179,8 @@ class Command(BaseCommand):
total_files: int = len(json_files) total_files: int = len(json_files)
self.stdout.write(f"Found {total_files} JSON files to process") self.stdout.write(f"Found {total_files} JSON files to process")
start_time: float = time.time()
processed = 0
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_file: dict[concurrent.futures.Future[None], Path] = { future_to_file: dict[concurrent.futures.Future[None], Path] = {
@ -200,9 +201,26 @@ class Command(BaseCommand):
self.stdout.write(self.style.ERROR(f"Data error processing {json_file}")) self.stdout.write(self.style.ERROR(f"Data error processing {json_file}"))
self.stdout.write(self.style.ERROR(traceback.format_exc())) self.stdout.write(self.style.ERROR(traceback.format_exc()))
self.update_processing_progress(total_files=total_files, start_time=start_time, processed=processed)
msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}." msg: str = f"Processed {total_files} JSON files in {directory}. Moved processed files to {processed_path}."
self.stdout.write(self.style.SUCCESS(msg)) self.stdout.write(self.style.SUCCESS(msg))
def update_processing_progress(self, total_files: int, start_time: float, processed: int) -> int:
    """Count one more finished file and write a progress line to stdout.

    The line has the form
    ``Progress: <done>/<total> files - <rate> files/sec - ETA <h:mm:ss>``.

    Integers are passed by value, so incrementing ``processed`` inside this
    method is invisible to the caller. The updated count is therefore
    returned; the caller should store it and pass it back in on the next
    call, otherwise every call reports the same count.

    Args:
        total_files: Total number of files to process.
        start_time: ``time.time()`` timestamp taken when processing started.
        processed: Number of files already completed before this call.

    Returns:
        The number of completed files including the current one
        (``processed + 1``).
    """
    processed += 1
    elapsed: float = time.time() - start_time
    # A zero or negative elapsed time (e.g. after a wall-clock adjustment)
    # would divide by zero or give a meaningless rate, so report 0 instead.
    rate: float = processed / elapsed if elapsed > 0 else 0.0
    # Clamp so an over-count can never be rendered as a negative ETA.
    remaining: int = max(total_files - processed, 0)
    eta: timedelta = timedelta(seconds=int(remaining / rate)) if rate > 0 else timedelta(seconds=0)
    self.stdout.write(f"Progress: {processed}/{total_files} files - {rate:.2f} files/sec - ETA {eta}")
    return processed
def _process_file(self, file_path: Path, processed_path: Path) -> None: def _process_file(self, file_path: Path, processed_path: Path) -> None:
"""Process a single JSON file. """Process a single JSON file.