Refactor import command
This commit is contained in:
parent
998c6703d8
commit
fe86aabd71
1 changed files with 21 additions and 38 deletions
|
|
@ -22,17 +22,8 @@ class Command(BaseCommand):
|
||||||
Args:
|
Args:
|
||||||
parser: The command argument parser.
|
parser: The command argument parser.
|
||||||
"""
|
"""
|
||||||
parser.add_argument(
|
parser.add_argument("path", type=str, help="Path to the JSON file or directory containing JSON files.")
|
||||||
"path",
|
parser.add_argument("--processed-dir", type=str, default="processed", help="Subdirectory to move processed files to")
|
||||||
type=str,
|
|
||||||
help="Path to the JSON file or directory containing JSON files with drop campaign data",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--processed-dir",
|
|
||||||
type=str,
|
|
||||||
default="processed",
|
|
||||||
help="Name of subdirectory to move processed files to (default: 'processed')",
|
|
||||||
)
|
|
||||||
|
|
||||||
def handle(self, **options) -> None:
|
def handle(self, **options) -> None:
|
||||||
"""Execute the command.
|
"""Execute the command.
|
||||||
|
|
@ -44,59 +35,58 @@ class Command(BaseCommand):
|
||||||
CommandError: If the file/directory doesn't exist, isn't a JSON file,
|
CommandError: If the file/directory doesn't exist, isn't a JSON file,
|
||||||
or has an invalid JSON structure.
|
or has an invalid JSON structure.
|
||||||
"""
|
"""
|
||||||
path: str = options["path"]
|
path: Path = Path(options["path"])
|
||||||
processed_dir: str = options["processed_dir"]
|
processed_dir: str = options["processed_dir"]
|
||||||
path_obj = Path(path)
|
|
||||||
|
|
||||||
if not path_obj.exists():
|
processed_path: Path = path / processed_dir
|
||||||
|
processed_path.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
if not path.exists():
|
||||||
msg: str = f"Path {path} does not exist"
|
msg: str = f"Path {path} does not exist"
|
||||||
raise CommandError(msg)
|
raise CommandError(msg)
|
||||||
|
|
||||||
if path_obj.is_file():
|
if path.is_file():
|
||||||
self._process_file(path_obj, processed_dir)
|
self._process_file(file_path=path, processed_path=processed_path)
|
||||||
elif path_obj.is_dir():
|
elif path.is_dir():
|
||||||
self._process_directory(path_obj, processed_dir)
|
self._process_directory(directory=path, processed_path=processed_path)
|
||||||
else:
|
else:
|
||||||
msg = f"Path {path} is neither a file nor a directory"
|
msg = f"Path {path} is neither a file nor a directory"
|
||||||
raise CommandError(msg)
|
raise CommandError(msg)
|
||||||
|
|
||||||
def _process_directory(self, directory: Path, processed_dir: str) -> None:
|
def _process_directory(self, directory: Path, processed_path: Path) -> None:
|
||||||
"""Process all JSON files in a directory using parallel processing.
|
"""Process all JSON files in a directory using parallel processing.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
directory: Path to the directory.
|
directory: Path to the directory.
|
||||||
processed_dir: Name of subdirectory to move processed files to.
|
processed_path: Name of subdirectory to move processed files to.
|
||||||
"""
|
"""
|
||||||
processed_path: Path = directory / processed_dir
|
|
||||||
processed_path.mkdir(exist_ok=True)
|
|
||||||
|
|
||||||
json_files: list[Path] = list(directory.glob("*.json"))
|
json_files: list[Path] = list(directory.glob("*.json"))
|
||||||
if not json_files:
|
if not json_files:
|
||||||
self.stdout.write(self.style.WARNING(f"No JSON files found in {directory}"))
|
self.stdout.write(self.style.WARNING(f"No JSON files found in {directory}"))
|
||||||
return
|
return
|
||||||
|
|
||||||
total_files = len(json_files)
|
total_files: int = len(json_files)
|
||||||
self.stdout.write(f"Found {total_files} JSON files to process")
|
self.stdout.write(f"Found {total_files} JSON files to process")
|
||||||
|
|
||||||
for json_file in json_files:
|
for json_file in json_files:
|
||||||
self.stdout.write(f"Processing file {json_file.name}...")
|
self.stdout.write(f"Processing file {json_file.name}...")
|
||||||
try:
|
try:
|
||||||
self._process_file(json_file, processed_dir)
|
self._process_file(json_file, processed_path)
|
||||||
except CommandError as e:
|
except CommandError as e:
|
||||||
self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}"))
|
self.stdout.write(self.style.ERROR(f"Error processing {json_file}: {e}"))
|
||||||
except (ValueError, TypeError, AttributeError, KeyError, IndexError, json.JSONDecodeError) as e:
|
except (ValueError, TypeError, AttributeError, KeyError, IndexError, json.JSONDecodeError) as e:
|
||||||
self.stdout.write(self.style.ERROR(f"Data error processing {json_file}: {e!s}"))
|
self.stdout.write(self.style.ERROR(f"Data error processing {json_file}: {e!s}"))
|
||||||
|
|
||||||
self.stdout.write(
|
self.stdout.write(
|
||||||
self.style.SUCCESS(f"Completed processing {total_files} JSON files in {directory}. Processed files moved to {processed_dir}.")
|
self.style.SUCCESS(f"Completed processing {total_files} JSON files in {directory}. Processed files moved to {processed_path}.")
|
||||||
)
|
)
|
||||||
|
|
||||||
def _process_file(self, file_path: Path, processed_dir: str) -> None:
|
def _process_file(self, file_path: Path, processed_path: Path) -> None:
|
||||||
"""Process a single JSON file.
|
"""Process a single JSON file.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
file_path: Path to the JSON file.
|
file_path: Path to the JSON file.
|
||||||
processed_dir: Name of subdirectory to move processed files to.
|
processed_path: Subdirectory to move processed files to.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
CommandError: If the file isn't a JSON file or has invalid JSON structure.
|
CommandError: If the file isn't a JSON file or has invalid JSON structure.
|
||||||
|
|
@ -106,10 +96,8 @@ class Command(BaseCommand):
|
||||||
|
|
||||||
if isinstance(data, list):
|
if isinstance(data, list):
|
||||||
for item in data:
|
for item in data:
|
||||||
if "data" in item and "user" in item["data"] and "dropCampaign" in item["data"]["user"]:
|
|
||||||
drop_campaign_data = item["data"]["user"]["dropCampaign"]
|
drop_campaign_data = item["data"]["user"]["dropCampaign"]
|
||||||
self._import_drop_campaign_with_retry(drop_campaign_data)
|
self._import_drop_campaign_with_retry(drop_campaign_data)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if "data" not in data or "user" not in data["data"] or "dropCampaign" not in data["data"]["user"]:
|
if "data" not in data or "user" not in data["data"] or "dropCampaign" not in data["data"]["user"]:
|
||||||
msg = "Invalid JSON structure: Missing data.user.dropCampaign"
|
msg = "Invalid JSON structure: Missing data.user.dropCampaign"
|
||||||
|
|
@ -118,12 +106,7 @@ class Command(BaseCommand):
|
||||||
drop_campaign_data = data["data"]["user"]["dropCampaign"]
|
drop_campaign_data = data["data"]["user"]["dropCampaign"]
|
||||||
self._import_drop_campaign_with_retry(drop_campaign_data)
|
self._import_drop_campaign_with_retry(drop_campaign_data)
|
||||||
|
|
||||||
if processed_dir:
|
shutil.move(str(file_path), str(processed_path))
|
||||||
processed_path: Path = file_path.parent / processed_dir
|
|
||||||
processed_path.mkdir(exist_ok=True)
|
|
||||||
|
|
||||||
new_path: Path = processed_path / file_path.name
|
|
||||||
shutil.move(str(file_path), str(new_path))
|
|
||||||
|
|
||||||
def _import_drop_campaign_with_retry(self, campaign_data: dict[str, Any]) -> None:
|
def _import_drop_campaign_with_retry(self, campaign_data: dict[str, Any]) -> None:
|
||||||
"""Import drop campaign data into the database with retry logic for SQLite locks.
|
"""Import drop campaign data into the database with retry logic for SQLite locks.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue