Use json_repair instead of json

This commit is contained in:
Joakim Hellsén 2026-01-06 22:04:41 +01:00
commit 4562991ad2
No known key found for this signature in database

View file

@ -9,6 +9,7 @@ from pathlib import Path
from typing import Any from typing import Any
from typing import Literal from typing import Literal
import json_repair
from colorama import Fore from colorama import Fore
from colorama import Style from colorama import Style
from colorama import init as colorama_init from colorama import init as colorama_init
@ -16,6 +17,7 @@ from django.core.management.base import BaseCommand
from django.core.management.base import CommandError from django.core.management.base import CommandError
from django.core.management.base import CommandParser from django.core.management.base import CommandParser
from django.db import DatabaseError from django.db import DatabaseError
from json_repair import JSONReturnType
from pydantic import ValidationError from pydantic import ValidationError
from tqdm import tqdm from tqdm import tqdm
@ -220,7 +222,7 @@ def detect_non_campaign_keyword(raw_text: str) -> str | None:
def extract_operation_name_from_parsed( def extract_operation_name_from_parsed(
payload: dict[str, Any] | list[Any], payload: JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]] | str,
) -> str | None: ) -> str | None:
"""Extract GraphQL operationName from an already parsed JSON payload. """Extract GraphQL operationName from an already parsed JSON payload.
@ -997,6 +999,30 @@ class Command(BaseCommand):
json_files = [f for f in input_path.iterdir() if f.is_file() and f.suffix == ".json"] json_files = [f for f in input_path.iterdir() if f.is_file() and f.suffix == ".json"]
return json_files return json_files
def _normalize_responses(
self,
parsed_json: JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]] | str,
) -> list[dict[str, Any]]:
"""Normalize various parsed JSON shapes into a list of dict responses.
Args:
parsed_json: The parsed JSON data from the file.
Raises:
TypeError: If the parsed JSON is a tuple, which is unsupported.
Returns:
A list of response dictionaries.
"""
if isinstance(parsed_json, dict):
return [parsed_json]
if isinstance(parsed_json, list):
return [item for item in parsed_json if isinstance(item, dict)]
if isinstance(parsed_json, tuple):
msg = "Tuple responses are not supported in this context."
raise TypeError(msg)
return []
def process_file_worker( def process_file_worker(
self, self,
file_path: Path, file_path: Path,
@ -1019,7 +1045,9 @@ class Command(BaseCommand):
raw_text: str = file_path.read_text(encoding="utf-8", errors="ignore") raw_text: str = file_path.read_text(encoding="utf-8", errors="ignore")
# Parse JSON early to extract operation name for better directory organization # Parse JSON early to extract operation name for better directory organization
parsed_json: dict[str, Any] = json.loads(raw_text) parsed_json: JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]] | str = json_repair.loads(
raw_text,
)
operation_name: str | None = extract_operation_name_from_parsed(parsed_json) operation_name: str | None = extract_operation_name_from_parsed(parsed_json)
matched: str | None = detect_non_campaign_keyword(raw_text) matched: str | None = detect_non_campaign_keyword(raw_text)
@ -1042,8 +1070,8 @@ class Command(BaseCommand):
return {"success": False, "broken_dir": str(broken_dir), "reason": "no dropCampaign present"} return {"success": False, "broken_dir": str(broken_dir), "reason": "no dropCampaign present"}
return {"success": False, "broken_dir": "(skipped)", "reason": "no dropCampaign present"} return {"success": False, "broken_dir": "(skipped)", "reason": "no dropCampaign present"}
# Wrap single response in list for consistent processing # Normalize and filter to dict responses only
responses: list[dict[str, Any]] = parsed_json if isinstance(parsed_json, list) else [parsed_json] responses: list[dict[str, Any]] = self._normalize_responses(parsed_json)
processed, broken_dir = self.process_responses( processed, broken_dir = self.process_responses(
responses=responses, responses=responses,
file_path=file_path, file_path=file_path,
@ -1059,7 +1087,7 @@ class Command(BaseCommand):
} }
campaign_structure: str | None = self._detect_campaign_structure( campaign_structure: str | None = self._detect_campaign_structure(
parsed_json if isinstance(parsed_json, dict) else (parsed_json[0] if parsed_json else {}), responses[0] if responses else {},
) )
move_completed_file( move_completed_file(
file_path=file_path, file_path=file_path,
@ -1106,7 +1134,9 @@ class Command(BaseCommand):
raw_text: str = file_path.read_text(encoding="utf-8", errors="ignore") raw_text: str = file_path.read_text(encoding="utf-8", errors="ignore")
# Parse JSON early to extract operation name for better directory organization # Parse JSON early to extract operation name for better directory organization
parsed_json: dict[str, Any] = json.loads(raw_text) parsed_json: JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]] | str = json_repair.loads(
raw_text,
)
operation_name: str | None = extract_operation_name_from_parsed(parsed_json) operation_name: str | None = extract_operation_name_from_parsed(parsed_json)
matched: str | None = detect_non_campaign_keyword(raw_text) matched: str | None = detect_non_campaign_keyword(raw_text)
@ -1146,8 +1176,8 @@ class Command(BaseCommand):
) )
return return
# Wrap single response in list for consistent processing # Normalize and filter to dict responses only
responses: list[dict[str, Any]] = parsed_json if isinstance(parsed_json, list) else [parsed_json] responses: list[dict[str, Any]] = self._normalize_responses(parsed_json)
processed, broken_dir = self.process_responses( processed, broken_dir = self.process_responses(
responses=responses, responses=responses,
@ -1164,7 +1194,7 @@ class Command(BaseCommand):
return return
campaign_structure: str | None = self._detect_campaign_structure( campaign_structure: str | None = self._detect_campaign_structure(
parsed_json if isinstance(parsed_json, dict) else (parsed_json[0] if parsed_json else {}), responses[0] if responses else {},
) )
move_completed_file( move_completed_file(
file_path=file_path, file_path=file_path,