diff --git a/.vscode/settings.json b/.vscode/settings.json index b03b054..e834d89 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -35,6 +35,7 @@ "IGDB", "Inoreader", "isort", + "iterdump", "Joakim", "kwargs", "lovinator", diff --git a/README.md b/README.md index 10b915a..ace0d5b 100644 --- a/README.md +++ b/README.md @@ -28,3 +28,17 @@ uv run python manage.py import_chat_badges ``` Requires `TWITCH_CLIENT_ID` and `TWITCH_CLIENT_SECRET` environment variables to be set. + +## Create DB Backup + +Create a zstd-compressed SQL dump (only `twitch_` tables) in the datasets directory: + +```bash +uv run python manage.py backup_db +``` + +Optional arguments: + +```bash +uv run python manage.py backup_db --output-dir "/path/to/backups" --prefix "ttvdrops" +``` diff --git a/pyproject.toml b/pyproject.toml index d558a8c..f1c5b48 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,6 @@ dev = [ [tool.pytest.ini_options] DJANGO_SETTINGS_MODULE = "config.settings" python_files = ["test_*.py", "*_test.py"] -addopts = ["--reuse-db", "--no-migrations"] filterwarnings = [ "ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning", ] diff --git a/scripts/__init__.py b/scripts/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/templates/base.html b/templates/base.html index 492ff0b..6b86ac6 100644 --- a/templates/base.html +++ b/templates/base.html @@ -155,6 +155,7 @@ +

ttvdrops

Twitch: Dashboard | Campaigns | @@ -165,11 +166,14 @@ Badges | Emotes
- RSS | Debug -
+ Other: + RSS | + Debug | + Dataset | + Donate | + GitHub +
+ +

Dataset Backups

+

Scanning {{ data_dir }} for database backups.

+ {% if datasets %} + + + + + + + + + + + + {% for dataset in datasets %} + + + + + + + + {% endfor %} + +
NamePathSizeUpdatedDownload
{{ dataset.name }}{{ dataset.display_path }}{{ dataset.size }} + + + {% if dataset.download_path %} + Download + {% else %} + - + {% endif %} +
+

Found {{ dataset_count }} datasets.

+ {% else %} +

No dataset backups found.

+ {% endif %} + +{% endblock content %} diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/test_manage.py b/tests/test_manage.py deleted file mode 100644 index 829b309..0000000 --- a/tests/test_manage.py +++ /dev/null @@ -1,45 +0,0 @@ -from __future__ import annotations - -import sys -import types -from typing import Never - -import pytest - -import manage - - -def test_main_importerror(monkeypatch: pytest.MonkeyPatch) -> None: - """Test main raises ImportError if django cannot be imported.""" - monkeypatch.setenv("DJANGO_SETTINGS_MODULE", "") - - def import_fail(*args, **kwargs) -> Never: - msg = "No Django" - raise ImportError(msg) - - monkeypatch.setitem(sys.modules, "django.core.management", None) - monkeypatch.setattr("builtins.__import__", import_fail) - with pytest.raises(ImportError) as excinfo: - manage.main() - assert "Couldn't import Django" in str(excinfo.value) - - -def test_main_executes_command(monkeypatch: pytest.MonkeyPatch) -> None: - """Test main calls execute_from_command_line with sys.argv.""" - called: dict[str, list[str]] = {} - - def fake_execute(argv: list[str]) -> None: - called["argv"] = argv - - fake_module = types.SimpleNamespace(execute_from_command_line=fake_execute) - monkeypatch.setenv("DJANGO_SETTINGS_MODULE", "") - monkeypatch.setitem(sys.modules, "django.core.management", fake_module) - original_import = __import__ - monkeypatch.setattr( - "builtins.__import__", - lambda name, *a, **kw: fake_module if name == "django.core.management" else original_import(name, *a, **kw), - ) - test_argv: list[str] = ["manage.py", "check"] - monkeypatch.setattr(sys, "argv", test_argv) - manage.main() - assert called["argv"] == test_argv diff --git a/twitch/management/commands/backup_db.py b/twitch/management/commands/backup_db.py new file mode 100644 index 0000000..101fe49 --- /dev/null +++ b/twitch/management/commands/backup_db.py @@ -0,0 +1,183 @@ +from __future__ import 
annotations + +import io +from compression import zstd +from datetime import datetime +from pathlib import Path +from typing import TYPE_CHECKING + +from django.conf import settings +from django.core.management.base import BaseCommand +from django.db import connection as django_connection +from django.utils import timezone + +if TYPE_CHECKING: + import sqlite3 + from argparse import ArgumentParser + + +class Command(BaseCommand): + """Create a compressed SQL dump of the Twitch dataset tables.""" + + help = "Create a compressed SQL dump of the Twitch dataset tables." + + def add_arguments(self, parser: ArgumentParser) -> None: + """Define arguments for the backup command.""" + parser.add_argument( + "--output-dir", + default=str(settings.DATA_DIR / "datasets"), + help="Directory where the backup will be written.", + ) + parser.add_argument( + "--prefix", + default="ttvdrops", + help="Filename prefix for the backup file.", + ) + + def handle(self, **options: str) -> None: + """Run the backup command and write a zstd SQL dump.""" + output_dir: Path = Path(options["output_dir"]).expanduser() + prefix: str = str(options["prefix"]).strip() or "ttvdrops" + output_dir.mkdir(parents=True, exist_ok=True) + + # Use Django's database connection to ensure we connect to the test DB during tests + connection = django_connection.connection + if connection is None: + # Force connection if not already established + django_connection.ensure_connection() + connection = django_connection.connection + + timestamp: str = timezone.localtime(timezone.now()).strftime("%Y%m%d-%H%M%S") + output_path: Path = output_dir / f"{prefix}-{timestamp}.sql.zst" + + allowed_tables = _get_allowed_tables(connection, "twitch_") + if not allowed_tables: + self.stdout.write(self.style.WARNING("No twitch tables found to back up.")) + return + + with ( + output_path.open("wb") as raw_handle, + zstd.open(raw_handle, "w") as compressed, + io.TextIOWrapper(compressed, encoding="utf-8") as handle, + ): + 
_write_dump(handle, connection, allowed_tables) + + created_at: datetime = datetime.fromtimestamp(output_path.stat().st_mtime, tz=timezone.get_current_timezone()) + self.stdout.write( + self.style.SUCCESS( + f"Backup created: {output_path} (updated {created_at.isoformat()})", + ), + ) + self.stdout.write(self.style.SUCCESS(f"Included tables: {len(allowed_tables)}")) + + +def _get_allowed_tables(connection: sqlite3.Connection, prefix: str) -> list[str]: + """Fetch table names that match the allowed prefix. + + Args: + connection: SQLite connection. + prefix: Table name prefix to include. + + Returns: + List of table names. + """ + cursor = connection.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name", + (f"{prefix}%",), + ) + return [row[0] for row in cursor.fetchall()] + + +def _write_dump(handle: io.TextIOBase, connection: sqlite3.Connection, tables: list[str]) -> None: + """Write a SQL dump containing schema and data for the requested tables. + + Args: + handle: Text handle for output. + connection: SQLite connection. + tables: Table names to include. + """ + handle.write("PRAGMA foreign_keys=OFF;\n") + handle.write("BEGIN TRANSACTION;\n") + + for table in tables: + create_sql = _get_table_schema(connection, table) + if not create_sql: + continue + handle.write(f'DROP TABLE IF EXISTS "{table}";\n') + handle.write(f"{create_sql};\n") + _write_table_rows(handle, connection, table) + + _write_indexes(handle, connection, tables) + + handle.write("COMMIT;\n") + handle.write("PRAGMA foreign_keys=ON;\n") + + +def _get_table_schema(connection: sqlite3.Connection, table: str) -> str: + """Fetch the CREATE TABLE statement for a table. + + Args: + connection: SQLite connection. + table: Table name. + + Returns: + The SQL string or an empty string when unavailable. 
+ """ + cursor = connection.execute( + "SELECT sql FROM sqlite_master WHERE type='table' AND name=?", + (table,), + ) + row = cursor.fetchone() + return row[0] if row and row[0] else "" + + +def _write_table_rows(handle: io.TextIOBase, connection: sqlite3.Connection, table: str) -> None: + """Write INSERT statements for a table. + + Args: + handle: Text handle for output. + connection: SQLite connection. + table: Table name. + """ + cursor = connection.execute(f'SELECT * FROM "{table}"') # noqa: S608 + columns = [description[0] for description in cursor.description] + for row in cursor.fetchall(): + values = ", ".join(_sql_literal(row[idx]) for idx in range(len(columns))) + handle.write(f'INSERT INTO "{table}" VALUES ({values});\n') # noqa: S608 + + +def _write_indexes(handle: io.TextIOBase, connection: sqlite3.Connection, tables: list[str]) -> None: + """Write CREATE INDEX statements for included tables. + + Args: + handle: Text handle for output. + connection: SQLite connection. + tables: Table names to include. + """ + table_set = set(tables) + cursor = connection.execute( + "SELECT tbl_name, sql FROM sqlite_master WHERE type='index' AND sql IS NOT NULL ORDER BY name", + ) + for tbl_name, sql in cursor.fetchall(): + if tbl_name in table_set and sql: + handle.write(f"{sql};\n") + + +def _sql_literal(value: object) -> str: + """Convert a Python value to a SQL literal. + + Args: + value: Value to convert. + + Returns: + SQL literal string. 
+ """ + if value is None: + return "NULL" + if isinstance(value, bool): + return "1" if value else "0" + if isinstance(value, (int, float)): + return str(value) + if isinstance(value, bytes): + return "X'" + value.hex() + "'" + return "'" + str(value).replace("'", "''") + "'" diff --git a/twitch/tests/test_backup.py b/twitch/tests/test_backup.py new file mode 100644 index 0000000..66a27a8 --- /dev/null +++ b/twitch/tests/test_backup.py @@ -0,0 +1,451 @@ +from __future__ import annotations + +import io +import math +import os +from compression import zstd +from typing import TYPE_CHECKING + +import pytest +from django.conf import settings +from django.core.management import call_command +from django.db import connection +from django.urls import reverse + +from twitch.management.commands.backup_db import _get_allowed_tables +from twitch.management.commands.backup_db import _sql_literal +from twitch.management.commands.backup_db import _write_dump +from twitch.models import Game +from twitch.models import Organization + +if TYPE_CHECKING: + from pathlib import Path + + from django.test import Client + from django.test.client import _MonkeyPatchedWSGIResponse + + +@pytest.mark.django_db +class TestBackupCommand: + """Tests for the backup_db management command.""" + + def test_backup_creates_file(self, tmp_path: Path) -> None: + """Test that backup command creates a zstd compressed file.""" + # Create test data so tables exist + Organization.objects.create(twitch_id="test000", name="Test Org") + + output_dir = tmp_path / "backups" + output_dir.mkdir() + + call_command("backup_db", output_dir=str(output_dir), prefix="test") + + backup_files = list(output_dir.glob("test-*.sql.zst")) + assert len(backup_files) == 1 + assert backup_files[0].exists() + assert backup_files[0].stat().st_size > 0 + + def test_backup_contains_sql_content(self, tmp_path: Path) -> None: + """Test that backup file contains valid SQL content.""" + output_dir = tmp_path / "backups" + 
output_dir.mkdir() + + # Create some test data + org = Organization.objects.create(twitch_id="test123", name="Test Org") + game = Game.objects.create(twitch_id="game456", display_name="Test Game") + game.owners.add(org) + + call_command("backup_db", output_dir=str(output_dir), prefix="test") + + backup_file = next(iter(output_dir.glob("test-*.sql.zst"))) + + # Decompress and read content + with ( + backup_file.open("rb") as raw_handle, + zstd.open(raw_handle, "r") as compressed, + io.TextIOWrapper(compressed, encoding="utf-8") as handle, + ): + content = handle.read() + + assert "PRAGMA foreign_keys=OFF;" in content + assert "BEGIN TRANSACTION;" in content + assert "COMMIT;" in content + assert "twitch_organization" in content + assert "twitch_game" in content + assert "Test Org" in content + + def test_backup_excludes_non_twitch_tables(self, tmp_path: Path) -> None: + """Test that backup only includes twitch_ prefixed tables.""" + # Create test data so tables exist + Organization.objects.create(twitch_id="test001", name="Test Org") + + output_dir = tmp_path / "backups" + output_dir.mkdir() + + call_command("backup_db", output_dir=str(output_dir), prefix="test") + + backup_file = next(iter(output_dir.glob("test-*.sql.zst"))) + + with ( + backup_file.open("rb") as raw_handle, + zstd.open(raw_handle, "r") as compressed, + io.TextIOWrapper(compressed, encoding="utf-8") as handle, + ): + content = handle.read() + + # Should NOT contain django admin, silk, or debug toolbar tables + assert "django_session" not in content + assert "silk_" not in content + assert "debug_toolbar_" not in content + assert "django_admin_log" not in content + + # Should contain twitch tables + assert "twitch_" in content + + def test_backup_with_custom_prefix(self, tmp_path: Path) -> None: + """Test that custom prefix is used in filename.""" + # Create test data so tables exist + Organization.objects.create(twitch_id="test002", name="Test Org") + + output_dir = tmp_path / "backups" + 
output_dir.mkdir() + + call_command("backup_db", output_dir=str(output_dir), prefix="custom") + + backup_files = list(output_dir.glob("custom-*.sql.zst")) + assert len(backup_files) == 1 + + def test_backup_creates_output_directory(self, tmp_path: Path) -> None: + """Test that backup command creates output directory if missing.""" + # Create test data so tables exist + Organization.objects.create(twitch_id="test003", name="Test Org") + + output_dir = tmp_path / "nonexistent" / "backups" + + call_command("backup_db", output_dir=str(output_dir), prefix="test") + + assert output_dir.exists() + assert len(list(output_dir.glob("test-*.sql.zst"))) == 1 + + def test_backup_uses_default_directory(self) -> None: + """Test that backup uses DATA_DIR/datasets by default.""" + # Create test data so tables exist + Organization.objects.create(twitch_id="test004", name="Test Org") + + datasets_dir = settings.DATA_DIR / "datasets" + datasets_dir.mkdir(exist_ok=True, parents=True) + + # Clean up any existing test backups + for old_backup in datasets_dir.glob("ttvdrops-*.sql.zst"): + old_backup.unlink() + + call_command("backup_db") + + backup_files = list(datasets_dir.glob("ttvdrops-*.sql.zst")) + assert len(backup_files) >= 1 + + # Clean up + for backup in backup_files: + backup.unlink() + + +@pytest.mark.django_db +class TestBackupHelperFunctions: + """Tests for backup command helper functions.""" + + def test_get_allowed_tables_filters_by_prefix(self) -> None: + """Test that _get_allowed_tables returns only matching tables.""" + # Use Django's connection to access the test database + db_connection = connection.connection + tables = _get_allowed_tables(db_connection, "twitch_") + + assert len(tables) > 0 + assert all(table.startswith("twitch_") for table in tables) + assert "twitch_organization" in tables + assert "twitch_game" in tables + + def test_get_allowed_tables_excludes_non_matching(self) -> None: + """Test that _get_allowed_tables excludes non-matching tables.""" + # Use 
Django's connection to access the test database + db_connection = connection.connection + tables = _get_allowed_tables(db_connection, "twitch_") + + # Should not include django, silk, or debug toolbar tables + assert not any(table.startswith("django_") for table in tables) + assert not any(table.startswith("silk_") for table in tables) + assert not any(table.startswith("debug_toolbar_") for table in tables) + + def test_sql_literal_handles_none(self) -> None: + """Test _sql_literal converts None to NULL.""" + assert _sql_literal(None) == "NULL" + + def test_sql_literal_handles_booleans(self) -> None: + """Test _sql_literal converts booleans to 1/0.""" + assert _sql_literal(True) == "1" + assert _sql_literal(False) == "0" + + def test_sql_literal_handles_numbers(self) -> None: + """Test _sql_literal handles int and float.""" + assert _sql_literal(42) == "42" + assert _sql_literal(math.pi) == str(math.pi) + + def test_sql_literal_handles_strings(self) -> None: + """Test _sql_literal quotes and escapes strings.""" + assert _sql_literal("test") == "'test'" + assert _sql_literal("o'reilly") == "'o''reilly'" + assert _sql_literal("test\nline") == "'test\nline'" + + def test_sql_literal_handles_bytes(self) -> None: + """Test _sql_literal converts bytes to hex notation.""" + assert _sql_literal(b"\x00\x01\x02") == "X'000102'" + assert _sql_literal(b"hello") == "X'68656c6c6f'" + + def test_write_dump_includes_schema_and_data(self) -> None: + """Test _write_dump writes complete SQL dump.""" + # Create test data + Organization.objects.create(twitch_id="test789", name="Write Test Org") + + # Use Django's connection to access the test database + db_connection = connection.connection + output = io.StringIO() + + tables = _get_allowed_tables(db_connection, "twitch_") + _write_dump(output, db_connection, tables) + + content = output.getvalue() + + # Check for SQL structure + assert "PRAGMA foreign_keys=OFF;" in content + assert "BEGIN TRANSACTION;" in content + assert "COMMIT;" in 
content + assert "PRAGMA foreign_keys=ON;" in content + + # Check for schema + assert "CREATE TABLE" in content + assert "twitch_organization" in content + + # Check for data + assert "INSERT INTO" in content + assert "Write Test Org" in content + + +@pytest.mark.django_db +class TestDatasetBackupViews: + """Tests for dataset backup list and download views.""" + + @pytest.fixture + def datasets_dir(self, tmp_path: Path) -> Path: + """Create a temporary datasets directory. + + Returns: + Path to the created datasets directory. + """ + datasets_dir = tmp_path / "datasets" + datasets_dir.mkdir() + return datasets_dir + + @pytest.fixture + def sample_backup(self, datasets_dir: Path) -> Path: + """Create a sample backup file. + + Returns: + Path to the created backup file. + """ + backup_file = datasets_dir / "ttvdrops-20260210-120000.sql.zst" + with ( + backup_file.open("wb") as raw_handle, + zstd.open(raw_handle, "w") as compressed, + io.TextIOWrapper(compressed, encoding="utf-8") as handle, + ): + handle.write("-- Sample backup content\n") + return backup_file + + def test_dataset_list_view_shows_backups( + self, + client: Client, + datasets_dir: Path, + sample_backup: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test that dataset list view displays backup files.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups")) + + assert response.status_code == 200 + assert b"ttvdrops-20260210-120000.sql.zst" in response.content + assert b"1 datasets" in response.content or b"1 dataset" in response.content + + def test_dataset_list_view_empty_directory( + self, + client: Client, + datasets_dir: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test dataset list view with empty directory.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups")) + + assert 
response.status_code == 200 + assert b"No dataset backups found" in response.content + + def test_dataset_list_view_sorts_by_date( + self, + client: Client, + datasets_dir: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test that backups are sorted by modification time.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + # Create multiple backup files with different timestamps + older_backup = datasets_dir / "ttvdrops-20260210-100000.sql.zst" + newer_backup = datasets_dir / "ttvdrops-20260210-140000.sql.zst" + + for backup in [older_backup, newer_backup]: + with ( + backup.open("wb") as raw_handle, + zstd.open(raw_handle, "w") as compressed, + io.TextIOWrapper(compressed, encoding="utf-8") as handle, + ): + handle.write("-- Test\n") + + # Set explicit modification times to ensure proper sorting + older_time = 1707561600 # 2024-02-10 10:00:00 UTC + newer_time = 1707575400 # 2024-02-10 14:00:00 UTC + os.utime(older_backup, (older_time, older_time)) + os.utime(newer_backup, (newer_time, newer_time)) + + response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups")) + + content = response.content.decode() + newer_pos = content.find("20260210-140000") + older_pos = content.find("20260210-100000") + + # Newer backup should appear first (sorted descending) + assert 0 < newer_pos < older_pos + + def test_dataset_download_view_success( + self, + client: Client, + datasets_dir: Path, + sample_backup: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test successful backup download.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + response: _MonkeyPatchedWSGIResponse = client.get( + reverse("twitch:dataset_backup_download", args=["ttvdrops-20260210-120000.sql.zst"]), + ) + + assert response.status_code == 200 + # FileResponse may use application/x-compressed for .zst files + assert "attachment" in response["Content-Disposition"] + assert "ttvdrops-20260210-120000.sql.zst" in 
response["Content-Disposition"] + + def test_dataset_download_prevents_path_traversal( + self, + client: Client, + datasets_dir: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test that path traversal attempts are blocked.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + # Attempt path traversal + response = client.get(reverse("twitch:dataset_backup_download", args=["../../../etc/passwd"])) + assert response.status_code == 404 + + def test_dataset_download_rejects_invalid_extensions( + self, + client: Client, + datasets_dir: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test that files with invalid extensions cannot be downloaded.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + # Create a file with invalid extension + invalid_file = datasets_dir / "malicious.exe" + invalid_file.write_text("not a backup") + + response = client.get(reverse("twitch:dataset_backup_download", args=["malicious.exe"])) + assert response.status_code == 404 + + def test_dataset_download_file_not_found( + self, + client: Client, + datasets_dir: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test download returns 404 for non-existent file.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + response = client.get(reverse("twitch:dataset_backup_download", args=["nonexistent.sql.zst"])) + assert response.status_code == 404 + + def test_dataset_list_view_shows_file_sizes( + self, + client: Client, + datasets_dir: Path, + sample_backup: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test that file sizes are displayed in human-readable format.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups")) + + assert response.status_code == 200 + # Should contain size information (bytes, KB, MB, or GB) + content = response.content.decode() + assert any(unit in content for unit in ["bytes", 
"KB", "MB", "GB"]) + + def test_dataset_list_ignores_non_zst_files( + self, + client: Client, + datasets_dir: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test that non-zst files are ignored in listing.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + # Create various file types + (datasets_dir / "backup.sql.zst").write_bytes(b"valid") + (datasets_dir / "readme.txt").write_text("should be ignored") + (datasets_dir / "old_backup.gz").write_bytes(b"should be ignored") + + response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups")) + + content = response.content.decode() + assert "backup.sql.zst" in content + assert "readme.txt" not in content + assert "old_backup.gz" not in content + + def test_dataset_download_view_handles_subdirectories( + self, + client: Client, + datasets_dir: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Test download works with files in subdirectories.""" + monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent) + + # Create subdirectory with backup + subdir = datasets_dir / "2026" / "02" + subdir.mkdir(parents=True) + backup_file = subdir / "backup.sql.zst" + with ( + backup_file.open("wb") as raw_handle, + zstd.open(raw_handle, "w") as compressed, + io.TextIOWrapper(compressed, encoding="utf-8") as handle, + ): + handle.write("-- Test\n") + + response: _MonkeyPatchedWSGIResponse = client.get( + reverse("twitch:dataset_backup_download", args=["2026/02/backup.sql.zst"]), + ) + + assert response.status_code == 200 + assert "attachment" in response["Content-Disposition"] diff --git a/twitch/urls.py b/twitch/urls.py index efa18ef..f580cc9 100644 --- a/twitch/urls.py +++ b/twitch/urls.py @@ -27,6 +27,12 @@ urlpatterns: list[URLPattern] = [ path("channels/", views.ChannelListView.as_view(), name="channel_list"), path("channels//", views.ChannelDetailView.as_view(), name="channel_detail"), path("debug/", views.debug_view, name="debug"), + path("datasets/", 
views.dataset_backups_view, name="dataset_backups"), + path( + "datasets/download//", + views.dataset_backup_download_view, + name="dataset_backup_download", + ), path("docs/rss/", views.docs_rss_view, name="docs_rss"), path("emotes/", views.emote_gallery_view, name="emote_gallery"), path("games/", views.GamesGridView.as_view(), name="game_list"), diff --git a/twitch/views.py b/twitch/views.py index 68959df..6516213 100644 --- a/twitch/views.py +++ b/twitch/views.py @@ -3,6 +3,7 @@ from __future__ import annotations import datetime import json import logging +import operator from collections import OrderedDict from collections import defaultdict from copy import copy @@ -10,6 +11,7 @@ from typing import TYPE_CHECKING from typing import Any from typing import Literal +from django.conf import settings from django.core.paginator import EmptyPage from django.core.paginator import Page from django.core.paginator import PageNotAnInteger @@ -22,10 +24,12 @@ from django.db.models import Prefetch from django.db.models import Q from django.db.models import Subquery from django.db.models.functions import Trim +from django.http import FileResponse from django.http import Http404 from django.http import HttpRequest from django.http import HttpResponse from django.shortcuts import render +from django.template.defaultfilters import filesizeformat from django.urls import reverse from django.utils import timezone from django.views.generic import DetailView @@ -52,10 +56,13 @@ from twitch.models import TimeBasedDrop if TYPE_CHECKING: from collections.abc import Callable + from os import stat_result + from pathlib import Path from debug_toolbar.utils import QueryDict from django.db.models.query import QuerySet + logger: logging.Logger = logging.getLogger("ttvdrops.views") MIN_QUERY_LENGTH_FOR_FTS = 3 @@ -319,6 +326,99 @@ def format_and_color_json(data: dict[str, Any] | list[dict] | str) -> str: return highlight(formatted_code, JsonLexer(), HtmlFormatter()) +# MARK: /datasets/ +def 
dataset_backups_view(request: HttpRequest) -> HttpResponse: + """View to list database backup datasets on disk. + + Args: + request: The HTTP request. + + Returns: + HttpResponse: The rendered dataset backups page. + """ + datasets_root: Path = settings.DATA_DIR / "datasets" + search_dirs: list[Path] = [datasets_root] + seen_paths: set[str] = set() + datasets: list[dict[str, Any]] = [] + + for folder in search_dirs: + if not folder.exists() or not folder.is_dir(): + continue + + # Only include .zst files + for path in folder.glob("*.zst"): + if not path.is_file(): + continue + key = str(path.resolve()) + if key in seen_paths: + continue + seen_paths.add(key) + stat: stat_result = path.stat() + updated_at: datetime.datetime = datetime.datetime.fromtimestamp( + stat.st_mtime, + tz=timezone.get_current_timezone(), + ) + try: + display_path = str(path.relative_to(datasets_root)) + download_path: str | None = display_path + except ValueError: + display_path: str = path.name + download_path: str | None = None + datasets.append({ + "name": path.name, + "display_path": display_path, + "download_path": download_path, + "size": filesizeformat(stat.st_size), + "updated_at": updated_at, + }) + + datasets.sort(key=operator.itemgetter("updated_at"), reverse=True) + + context: dict[str, Any] = { + "datasets": datasets, + "data_dir": str(datasets_root), + "dataset_count": len(datasets), + } + return render(request, "twitch/dataset_backups.html", context) + + +def dataset_backup_download_view(request: HttpRequest, relative_path: str) -> FileResponse: # noqa: ARG001 + """Download a dataset backup from the data directory. + + Args: + request: The HTTP request. + relative_path: The path relative to the data directory. + + Returns: + FileResponse: The file response for the requested dataset. + + Raises: + Http404: When the file is not found or is outside the data directory. 
+ """ + allowed_endings = (".zst",) + datasets_root: Path = settings.DATA_DIR / "datasets" + requested_path: Path = (datasets_root / relative_path).resolve() + data_root: Path = datasets_root.resolve() + + try: + requested_path.relative_to(data_root) + except ValueError as exc: + msg = "File not found" + raise Http404(msg) from exc + if not requested_path.exists() or not requested_path.is_file(): + msg = "File not found" + raise Http404(msg) + if not requested_path.name.endswith(allowed_endings): + msg = "File not found" + raise Http404(msg) + + return FileResponse( + requested_path.open("rb"), + as_attachment=True, + filename=requested_path.name, + ) + + def _enhance_drops_with_context(drops: QuerySet[TimeBasedDrop], now: datetime.datetime) -> list[dict[str, Any]]: """Helper to enhance drops with countdown and context.