Remove CSV backup functionality and related tests, retaining only JSON backup support
This commit is contained in:
parent
76b1cd70a5
commit
942672ac48
2 changed files with 0 additions and 113 deletions
|
|
@ -1,4 +1,3 @@
|
|||
import csv
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
|
|
@ -87,13 +86,6 @@ class Command(BaseCommand):
|
|||
json_path: Path = output_dir / f"{prefix}-{timestamp}.json.zst"
|
||||
_write_json_dump(json_path, allowed_tables)
|
||||
|
||||
csv_path: Path = _write_csv_dump(
|
||||
output_dir,
|
||||
prefix,
|
||||
timestamp,
|
||||
allowed_tables,
|
||||
)
|
||||
|
||||
created_at: datetime = datetime.fromtimestamp(
|
||||
output_path.stat().st_mtime,
|
||||
tz=timezone.get_current_timezone(),
|
||||
|
|
@ -104,7 +96,6 @@ class Command(BaseCommand):
|
|||
),
|
||||
)
|
||||
self.stdout.write(self.style.SUCCESS(f"JSON backup created: {json_path}"))
|
||||
self.stdout.write(self.style.SUCCESS(f"CSV backup created: {csv_path}"))
|
||||
self.stdout.write(self.style.SUCCESS(f"Included tables: {len(allowed_tables)}"))
|
||||
|
||||
|
||||
|
|
@ -349,46 +340,3 @@ def _write_json_dump(output_path: Path, tables: list[str]) -> None:
|
|||
io.TextIOWrapper(compressed, encoding="utf-8") as handle,
|
||||
):
|
||||
json.dump(data, handle, default=_json_default)
|
||||
|
||||
|
||||
def _write_csv_dump(
|
||||
output_dir: Path,
|
||||
prefix: str,
|
||||
timestamp: str,
|
||||
tables: list[str],
|
||||
) -> Path:
|
||||
"""Write a combined CSV file containing rows from all tables.
|
||||
|
||||
Args:
|
||||
output_dir: Directory where CSV files will be written.
|
||||
prefix: Filename prefix.
|
||||
timestamp: Timestamp string for filenames.
|
||||
tables: Table names to include.
|
||||
|
||||
Returns:
|
||||
Created file path.
|
||||
"""
|
||||
output_path: Path = output_dir / f"{prefix}-{timestamp}.csv.zst"
|
||||
|
||||
with (
|
||||
output_path.open("wb") as raw_handle,
|
||||
zstd.open(raw_handle, "w") as compressed,
|
||||
io.TextIOWrapper(compressed, encoding="utf-8") as handle,
|
||||
):
|
||||
writer: csv.Writer = csv.writer(handle)
|
||||
writer.writerow(["table", "row_json"])
|
||||
|
||||
with django_connection.cursor() as cursor:
|
||||
for table in tables:
|
||||
cursor.execute(f'SELECT * FROM "{table}"') # noqa: S608
|
||||
columns: list[str] = [col[0] for col in cursor.description]
|
||||
rows: list[tuple] = cursor.fetchall()
|
||||
|
||||
for row in rows:
|
||||
row_dict = dict(zip(columns, row, strict=False))
|
||||
writer.writerow([
|
||||
table,
|
||||
json.dumps(row_dict, default=_json_default),
|
||||
])
|
||||
|
||||
return output_path
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
import csv
|
||||
import io
|
||||
import json
|
||||
import math
|
||||
|
|
@ -17,7 +16,6 @@ from django.urls import reverse
|
|||
from twitch.management.commands.backup_db import _get_allowed_tables
|
||||
from twitch.management.commands.backup_db import _json_default
|
||||
from twitch.management.commands.backup_db import _sql_literal
|
||||
from twitch.management.commands.backup_db import _write_csv_dump
|
||||
from twitch.management.commands.backup_db import _write_json_dump
|
||||
from twitch.management.commands.backup_db import _write_postgres_dump
|
||||
from twitch.management.commands.backup_db import _write_sqlite_dump
|
||||
|
|
@ -25,7 +23,6 @@ from twitch.models import Game
|
|||
from twitch.models import Organization
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from csv import Reader
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
|
|
@ -198,34 +195,6 @@ class TestBackupCommand:
|
|||
row.get("name") == "Test Org JSON" for row in data["twitch_organization"]
|
||||
)
|
||||
|
||||
def test_backup_creates_single_csv_file(self, tmp_path: Path) -> None:
|
||||
"""Test that backup command creates a single CSV file alongside the SQL dump."""
|
||||
_skip_if_pg_dump_missing()
|
||||
Organization.objects.create(twitch_id="test_csv", name="Test Org CSV")
|
||||
|
||||
output_dir: Path = tmp_path / "backups"
|
||||
output_dir.mkdir()
|
||||
|
||||
call_command("backup_db", output_dir=str(output_dir), prefix="test")
|
||||
|
||||
csv_files: list[Path] = list(output_dir.glob("test-*.csv.zst"))
|
||||
assert len(csv_files) == 1
|
||||
|
||||
with (
|
||||
csv_files[0].open("rb") as raw_handle,
|
||||
zstd.open(raw_handle, "r") as compressed,
|
||||
io.TextIOWrapper(compressed, encoding="utf-8") as handle,
|
||||
):
|
||||
reader: Reader = csv.reader(handle)
|
||||
rows: list[list[str]] = list(reader)
|
||||
|
||||
assert len(rows) >= 2 # header + at least one data row
|
||||
assert rows[0] == ["table", "row_json"]
|
||||
data_rows: list[list[str]] = [
|
||||
row for row in rows[1:] if row and row[0] == "twitch_organization"
|
||||
]
|
||||
assert any("Test Org CSV" in row[1] for row in data_rows)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestBackupHelperFunctions:
|
||||
|
|
@ -337,36 +306,6 @@ class TestBackupHelperFunctions:
|
|||
row.get("name") == "JSON Helper Org" for row in data["twitch_organization"]
|
||||
)
|
||||
|
||||
def test_write_csv_dump_creates_single_file(self, tmp_path: Path) -> None:
|
||||
"""Test _write_csv_dump creates one combined compressed CSV file."""
|
||||
Organization.objects.create(twitch_id="test_csv_helper", name="CSV Helper Org")
|
||||
|
||||
tables: list[str] = _get_allowed_tables("twitch_")
|
||||
path: Path = _write_csv_dump(
|
||||
tmp_path,
|
||||
"test",
|
||||
"20260317-120000",
|
||||
tables,
|
||||
)
|
||||
|
||||
assert path.exists()
|
||||
assert path.name == "test-20260317-120000.csv.zst"
|
||||
|
||||
with (
|
||||
path.open("rb") as raw_handle,
|
||||
zstd.open(raw_handle, "r") as compressed,
|
||||
io.TextIOWrapper(compressed, encoding="utf-8") as handle,
|
||||
):
|
||||
reader: Reader = csv.reader(handle)
|
||||
rows: list[list[str]] = list(reader)
|
||||
|
||||
assert len(rows) >= 2 # header + at least one data row
|
||||
assert rows[0] == ["table", "row_json"]
|
||||
data_rows: list[list[str]] = [
|
||||
row for row in rows[1:] if row and row[0] == "twitch_organization"
|
||||
]
|
||||
assert any("CSV Helper Org" in row[1] for row in data_rows)
|
||||
|
||||
def test_json_default_handles_bytes(self) -> None:
|
||||
"""Test _json_default converts bytes to hex string."""
|
||||
assert _json_default(b"\x00\x01") == "0001"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue