Implement dataset functionality with views, URLs, and management command

This commit is contained in:
Joakim Hellsén 2026-02-10 16:47:54 +01:00
commit a12b34a665
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
12 changed files with 812 additions and 51 deletions

View file

@ -35,6 +35,7 @@
"IGDB", "IGDB",
"Inoreader", "Inoreader",
"isort", "isort",
"iterdump",
"Joakim", "Joakim",
"kwargs", "kwargs",
"lovinator", "lovinator",

View file

@ -28,3 +28,17 @@ uv run python manage.py import_chat_badges
``` ```
Requires `TWITCH_CLIENT_ID` and `TWITCH_CLIENT_SECRET` environment variables to be set. Requires `TWITCH_CLIENT_ID` and `TWITCH_CLIENT_SECRET` environment variables to be set.
## Create DB Backup
Create a zstd-compressed SQL dump (only `twitch_` tables) in the datasets directory:
```bash
uv run python manage.py backup_db
```
Optional arguments:
```bash
uv run python manage.py backup_db --output-dir "<path>" --prefix "ttvdrops"
```

View file

@ -34,7 +34,6 @@ dev = [
[tool.pytest.ini_options] [tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "config.settings" DJANGO_SETTINGS_MODULE = "config.settings"
python_files = ["test_*.py", "*_test.py"] python_files = ["test_*.py", "*_test.py"]
addopts = ["--reuse-db", "--no-migrations"]
filterwarnings = [ filterwarnings = [
"ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning", "ignore:Parsing dates involving a day of month without a year specified is ambiguous:DeprecationWarning",
] ]

View file

View file

@ -155,6 +155,7 @@
</style> </style>
</head> </head>
<body> <body>
<h1 style="margin-top: 0.5em; margin-bottom: 0.5em; ">ttvdrops</h1>
<strong>Twitch:</strong> <strong>Twitch:</strong>
<a href="{% url 'twitch:dashboard' %}">Dashboard</a> | <a href="{% url 'twitch:dashboard' %}">Dashboard</a> |
<a href="{% url 'twitch:campaign_list' %}">Campaigns</a> | <a href="{% url 'twitch:campaign_list' %}">Campaigns</a> |
@ -165,11 +166,14 @@
<a href="{% url 'twitch:badge_list' %}">Badges</a> | <a href="{% url 'twitch:badge_list' %}">Badges</a> |
<a href="{% url 'twitch:emote_gallery' %}">Emotes</a> <a href="{% url 'twitch:emote_gallery' %}">Emotes</a>
<br /> <br />
<a href="{% url 'twitch:docs_rss' %}">RSS</a> | <a href="{% url 'twitch:debug' %}">Debug</a> <strong>Other:</strong>
<form action="{% url 'twitch:search' %}" <a href="{% url 'twitch:docs_rss' %}">RSS</a> |
method="get" <a href="{% url 'twitch:debug' %}">Debug</a> |
style="display: inline; <a href="{% url 'twitch:dataset_backups' %}">Dataset</a> |
margin-left: 1rem"> <a href="https://github.com/sponsors/TheLovinator1">Donate</a> |
<a href="https://github.com/TheLovinator1/ttvdrops">GitHub</a>
<br />
<form action="{% url 'twitch:search' %}" method="get">
<input type="search" <input type="search"
name="q" name="q"
placeholder="Search..." placeholder="Search..."

View file

@ -0,0 +1,48 @@
{% extends "base.html" %}
{% block title %}
Dataset Backups
{% endblock title %}
{% block content %}
<main>
<h1 id="page-title">Dataset Backups</h1>
<p>Scanning {{ data_dir }} for database backups.</p>
{% if datasets %}
<table>
<thead>
<tr>
<th>Name</th>
<th>Path</th>
<th>Size</th>
<th>Updated</th>
<th>Download</th>
</tr>
</thead>
<tbody>
{% for dataset in datasets %}
<tr id="dataset-row-{{ forloop.counter }}">
<td>{{ dataset.name }}</td>
<td>{{ dataset.display_path }}</td>
<td>{{ dataset.size }}</td>
<td>
<time datetime="{{ dataset.updated_at|date:'c' }}"
title="{{ dataset.updated_at|date:'DATETIME_FORMAT' }}">
{{ dataset.updated_at|timesince }} ago
</time>
</td>
<td>
{% if dataset.download_path %}
<a href="{% url 'twitch:dataset_backup_download' dataset.download_path %}">Download</a>
{% else %}
-
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
<p>Found {{ dataset_count }} datasets.</p>
{% else %}
<p>No dataset backups found.</p>
{% endif %}
</main>
{% endblock content %}

View file

View file

@ -1,45 +0,0 @@
from __future__ import annotations
import sys
import types
from typing import Never
import pytest
import manage
def test_main_importerror(monkeypatch: pytest.MonkeyPatch) -> None:
    """Test main raises ImportError if django cannot be imported."""
    monkeypatch.setenv("DJANGO_SETTINGS_MODULE", "")

    # Replacement for builtins.__import__: every import attempt fails,
    # simulating an environment where Django is not installed.
    def import_fail(*args, **kwargs) -> Never:
        msg = "No Django"
        raise ImportError(msg)

    # Drop any cached module first so the patched __import__ is actually
    # consulted instead of sys.modules returning a cached Django module.
    monkeypatch.setitem(sys.modules, "django.core.management", None)
    monkeypatch.setattr("builtins.__import__", import_fail)
    with pytest.raises(ImportError) as excinfo:
        manage.main()
    # manage.main() is expected to chain/re-raise with a friendlier message.
    assert "Couldn't import Django" in str(excinfo.value)
def test_main_executes_command(monkeypatch: pytest.MonkeyPatch) -> None:
    """Test main calls execute_from_command_line with sys.argv."""
    # Records the argv that manage.main() forwards to Django.
    called: dict[str, list[str]] = {}

    def fake_execute(argv: list[str]) -> None:
        called["argv"] = argv

    fake_module = types.SimpleNamespace(execute_from_command_line=fake_execute)
    monkeypatch.setenv("DJANGO_SETTINGS_MODULE", "")
    # Pre-seed sys.modules AND intercept __import__ so the fake module is
    # returned for django.core.management while every other import still
    # resolves normally through the original __import__.
    monkeypatch.setitem(sys.modules, "django.core.management", fake_module)
    original_import = __import__
    monkeypatch.setattr(
        "builtins.__import__",
        lambda name, *a, **kw: fake_module if name == "django.core.management" else original_import(name, *a, **kw),
    )
    test_argv: list[str] = ["manage.py", "check"]
    monkeypatch.setattr(sys, "argv", test_argv)
    manage.main()
    # The exact argv list must be passed through untouched.
    assert called["argv"] == test_argv

View file

@ -0,0 +1,183 @@
from __future__ import annotations
import io
from compression import zstd
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import connection as django_connection
from django.utils import timezone
if TYPE_CHECKING:
import sqlite3
from argparse import ArgumentParser
class Command(BaseCommand):
    """Create a compressed SQL dump of the Twitch dataset tables."""

    help = "Create a compressed SQL dump of the Twitch dataset tables."

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Define arguments for the backup command."""
        parser.add_argument(
            "--output-dir",
            default=str(settings.DATA_DIR / "datasets"),
            help="Directory where the backup will be written.",
        )
        parser.add_argument(
            "--prefix",
            default="ttvdrops",
            help="Filename prefix for the backup file.",
        )

    def handle(self, **options: str) -> None:
        """Run the backup command and write a zstd SQL dump."""
        target_dir: Path = Path(options["output_dir"]).expanduser()
        file_prefix: str = str(options["prefix"]).strip() or "ttvdrops"
        target_dir.mkdir(parents=True, exist_ok=True)

        # Go through Django's wrapper so tests dump the test database, not
        # the configured production file.
        raw_connection = django_connection.connection
        if raw_connection is None:
            # No connection yet (e.g. first command run): open one lazily.
            django_connection.ensure_connection()
            raw_connection = django_connection.connection

        stamp: str = timezone.localtime(timezone.now()).strftime("%Y%m%d-%H%M%S")
        dump_path: Path = target_dir / f"{file_prefix}-{stamp}.sql.zst"

        tables = _get_allowed_tables(raw_connection, "twitch_")
        if not tables:
            self.stdout.write(self.style.WARNING("No twitch tables found to back up."))
            return

        # Stream plain-text SQL through a zstd compressor into the file.
        with (
            dump_path.open("wb") as raw_handle,
            zstd.open(raw_handle, "w") as compressed,
            io.TextIOWrapper(compressed, encoding="utf-8") as text_handle,
        ):
            _write_dump(text_handle, raw_connection, tables)

        modified_at: datetime = datetime.fromtimestamp(dump_path.stat().st_mtime, tz=timezone.get_current_timezone())
        self.stdout.write(
            self.style.SUCCESS(
                f"Backup created: {dump_path} (updated {modified_at.isoformat()})",
            ),
        )
        self.stdout.write(self.style.SUCCESS(f"Included tables: {len(tables)}"))
def _get_allowed_tables(connection: sqlite3.Connection, prefix: str) -> list[str]:
"""Fetch table names that match the allowed prefix.
Args:
connection: SQLite connection.
prefix: Table name prefix to include.
Returns:
List of table names.
"""
cursor = connection.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name",
(f"{prefix}%",),
)
return [row[0] for row in cursor.fetchall()]
def _write_dump(handle: io.TextIOBase, connection: sqlite3.Connection, tables: list[str]) -> None:
    """Write a SQL dump containing schema and data for the requested tables.

    Args:
        handle: Text handle for output.
        connection: SQLite connection.
        tables: Table names to include.
    """
    # Disable FK enforcement while restoring, and wrap everything in one
    # transaction so a partial restore rolls back cleanly.
    handle.write("PRAGMA foreign_keys=OFF;\n")
    handle.write("BEGIN TRANSACTION;\n")
    for table_name in tables:
        schema_sql = _get_table_schema(connection, table_name)
        if not schema_sql:
            # Unknown table or no stored schema: nothing we can dump.
            continue
        handle.write(f'DROP TABLE IF EXISTS "{table_name}";\n')
        handle.write(f"{schema_sql};\n")
        _write_table_rows(handle, connection, table_name)
    # Indexes go after all data so inserts are not slowed by index updates.
    _write_indexes(handle, connection, tables)
    handle.write("COMMIT;\n")
    handle.write("PRAGMA foreign_keys=ON;\n")
def _get_table_schema(connection: sqlite3.Connection, table: str) -> str:
"""Fetch the CREATE TABLE statement for a table.
Args:
connection: SQLite connection.
table: Table name.
Returns:
The SQL string or an empty string when unavailable.
"""
cursor = connection.execute(
"SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
(table,),
)
row = cursor.fetchone()
return row[0] if row and row[0] else ""
def _write_table_rows(handle: io.TextIOBase, connection: sqlite3.Connection, table: str) -> None:
    """Write INSERT statements for a table.

    Args:
        handle: Text handle for output.
        connection: SQLite connection.
        table: Table name.
    """
    # Table names come from sqlite_master (not user input); quoting with
    # double quotes keeps odd identifiers safe.
    cursor = connection.execute(f'SELECT * FROM "{table}"')  # noqa: S608
    # Iterate the cursor directly (streams rows instead of materializing the
    # whole table with fetchall), and map each value straight through
    # _sql_literal rather than indexing by column position.
    for row in cursor:
        values = ", ".join(_sql_literal(value) for value in row)
        handle.write(f'INSERT INTO "{table}" VALUES ({values});\n')  # noqa: S608
def _write_indexes(handle: io.TextIOBase, connection: sqlite3.Connection, tables: list[str]) -> None:
"""Write CREATE INDEX statements for included tables.
Args:
handle: Text handle for output.
connection: SQLite connection.
tables: Table names to include.
"""
table_set = set(tables)
cursor = connection.execute(
"SELECT tbl_name, sql FROM sqlite_master WHERE type='index' AND sql IS NOT NULL ORDER BY name",
)
for tbl_name, sql in cursor.fetchall():
if tbl_name in table_set and sql:
handle.write(f"{sql};\n")
def _sql_literal(value: object) -> str:
"""Convert a Python value to a SQL literal.
Args:
value: Value to convert.
Returns:
SQL literal string.
"""
if value is None:
return "NULL"
if isinstance(value, bool):
return "1" if value else "0"
if isinstance(value, (int, float)):
return str(value)
if isinstance(value, bytes):
return "X'" + value.hex() + "'"
return "'" + str(value).replace("'", "''") + "'"

451
twitch/tests/test_backup.py Normal file
View file

@ -0,0 +1,451 @@
from __future__ import annotations
import io
import math
import os
from compression import zstd
from typing import TYPE_CHECKING
import pytest
from django.conf import settings
from django.core.management import call_command
from django.db import connection
from django.urls import reverse
from twitch.management.commands.backup_db import _get_allowed_tables
from twitch.management.commands.backup_db import _sql_literal
from twitch.management.commands.backup_db import _write_dump
from twitch.models import Game
from twitch.models import Organization
if TYPE_CHECKING:
from pathlib import Path
from django.test import Client
from django.test.client import _MonkeyPatchedWSGIResponse
@pytest.mark.django_db
class TestBackupCommand:
    """Tests for the backup_db management command."""

    def test_backup_creates_file(self, tmp_path: Path) -> None:
        """Test that backup command creates a zstd compressed file."""
        # Create test data so tables exist
        Organization.objects.create(twitch_id="test000", name="Test Org")
        output_dir = tmp_path / "backups"
        output_dir.mkdir()
        call_command("backup_db", output_dir=str(output_dir), prefix="test")
        # Exactly one timestamped, non-empty backup should have been written.
        backup_files = list(output_dir.glob("test-*.sql.zst"))
        assert len(backup_files) == 1
        assert backup_files[0].exists()
        assert backup_files[0].stat().st_size > 0

    def test_backup_contains_sql_content(self, tmp_path: Path) -> None:
        """Test that backup file contains valid SQL content."""
        output_dir = tmp_path / "backups"
        output_dir.mkdir()
        # Create some test data
        org = Organization.objects.create(twitch_id="test123", name="Test Org")
        game = Game.objects.create(twitch_id="game456", display_name="Test Game")
        game.owners.add(org)
        call_command("backup_db", output_dir=str(output_dir), prefix="test")
        backup_file = next(iter(output_dir.glob("test-*.sql.zst")))
        # Decompress and read content
        with (
            backup_file.open("rb") as raw_handle,
            zstd.open(raw_handle, "r") as compressed,
            io.TextIOWrapper(compressed, encoding="utf-8") as handle,
        ):
            content = handle.read()
        # Transaction framing plus both tables and the row data must appear.
        assert "PRAGMA foreign_keys=OFF;" in content
        assert "BEGIN TRANSACTION;" in content
        assert "COMMIT;" in content
        assert "twitch_organization" in content
        assert "twitch_game" in content
        assert "Test Org" in content

    def test_backup_excludes_non_twitch_tables(self, tmp_path: Path) -> None:
        """Test that backup only includes twitch_ prefixed tables."""
        # Create test data so tables exist
        Organization.objects.create(twitch_id="test001", name="Test Org")
        output_dir = tmp_path / "backups"
        output_dir.mkdir()
        call_command("backup_db", output_dir=str(output_dir), prefix="test")
        backup_file = next(iter(output_dir.glob("test-*.sql.zst")))
        with (
            backup_file.open("rb") as raw_handle,
            zstd.open(raw_handle, "r") as compressed,
            io.TextIOWrapper(compressed, encoding="utf-8") as handle,
        ):
            content = handle.read()
        # Should NOT contain django admin, silk, or debug toolbar tables
        assert "django_session" not in content
        assert "silk_" not in content
        assert "debug_toolbar_" not in content
        assert "django_admin_log" not in content
        # Should contain twitch tables
        assert "twitch_" in content

    def test_backup_with_custom_prefix(self, tmp_path: Path) -> None:
        """Test that custom prefix is used in filename."""
        # Create test data so tables exist
        Organization.objects.create(twitch_id="test002", name="Test Org")
        output_dir = tmp_path / "backups"
        output_dir.mkdir()
        call_command("backup_db", output_dir=str(output_dir), prefix="custom")
        backup_files = list(output_dir.glob("custom-*.sql.zst"))
        assert len(backup_files) == 1

    def test_backup_creates_output_directory(self, tmp_path: Path) -> None:
        """Test that backup command creates output directory if missing."""
        # Create test data so tables exist
        Organization.objects.create(twitch_id="test003", name="Test Org")
        # Two levels deep to exercise mkdir(parents=True).
        output_dir = tmp_path / "nonexistent" / "backups"
        call_command("backup_db", output_dir=str(output_dir), prefix="test")
        assert output_dir.exists()
        assert len(list(output_dir.glob("test-*.sql.zst"))) == 1

    def test_backup_uses_default_directory(self) -> None:
        """Test that backup uses DATA_DIR/datasets by default."""
        # Create test data so tables exist
        Organization.objects.create(twitch_id="test004", name="Test Org")
        # NOTE: this test writes into the real DATA_DIR/datasets directory,
        # so it cleans up pre-existing and newly created backups itself.
        datasets_dir = settings.DATA_DIR / "datasets"
        datasets_dir.mkdir(exist_ok=True, parents=True)
        # Clean up any existing test backups
        for old_backup in datasets_dir.glob("ttvdrops-*.sql.zst"):
            old_backup.unlink()
        call_command("backup_db")
        backup_files = list(datasets_dir.glob("ttvdrops-*.sql.zst"))
        assert len(backup_files) >= 1
        # Clean up
        for backup in backup_files:
            backup.unlink()
@pytest.mark.django_db
class TestBackupHelperFunctions:
    """Tests for backup command helper functions."""

    def test_get_allowed_tables_filters_by_prefix(self) -> None:
        """Test that _get_allowed_tables returns only matching tables."""
        # The helpers take the raw sqlite3 connection under Django's wrapper.
        raw_conn = connection.connection
        found = _get_allowed_tables(raw_conn, "twitch_")
        assert len(found) > 0
        assert all(name.startswith("twitch_") for name in found)
        assert "twitch_organization" in found
        assert "twitch_game" in found

    def test_get_allowed_tables_excludes_non_matching(self) -> None:
        """Test that _get_allowed_tables excludes non-matching tables."""
        raw_conn = connection.connection
        found = _get_allowed_tables(raw_conn, "twitch_")
        # No framework/tooling tables may leak into the allowed set.
        for forbidden_prefix in ("django_", "silk_", "debug_toolbar_"):
            assert not any(name.startswith(forbidden_prefix) for name in found)

    def test_sql_literal_handles_none(self) -> None:
        """Test _sql_literal converts None to NULL."""
        assert _sql_literal(None) == "NULL"

    def test_sql_literal_handles_booleans(self) -> None:
        """Test _sql_literal converts booleans to 1/0."""
        assert _sql_literal(True) == "1"
        assert _sql_literal(False) == "0"

    def test_sql_literal_handles_numbers(self) -> None:
        """Test _sql_literal handles int and float."""
        assert _sql_literal(42) == "42"
        assert _sql_literal(math.pi) == str(math.pi)

    def test_sql_literal_handles_strings(self) -> None:
        """Test _sql_literal quotes and escapes strings."""
        assert _sql_literal("test") == "'test'"
        assert _sql_literal("o'reilly") == "'o''reilly'"
        assert _sql_literal("test\nline") == "'test\nline'"

    def test_sql_literal_handles_bytes(self) -> None:
        """Test _sql_literal converts bytes to hex notation."""
        assert _sql_literal(b"\x00\x01\x02") == "X'000102'"
        assert _sql_literal(b"hello") == "X'68656c6c6f'"

    def test_write_dump_includes_schema_and_data(self) -> None:
        """Test _write_dump writes complete SQL dump."""
        # Create a row so the dump contains data, not just schema.
        Organization.objects.create(twitch_id="test789", name="Write Test Org")
        raw_conn = connection.connection
        buffer = io.StringIO()
        table_names = _get_allowed_tables(raw_conn, "twitch_")
        _write_dump(buffer, raw_conn, table_names)
        dump = buffer.getvalue()
        # Transaction framing.
        assert "PRAGMA foreign_keys=OFF;" in dump
        assert "BEGIN TRANSACTION;" in dump
        assert "COMMIT;" in dump
        assert "PRAGMA foreign_keys=ON;" in dump
        # Schema statements.
        assert "CREATE TABLE" in dump
        assert "twitch_organization" in dump
        # Row data.
        assert "INSERT INTO" in dump
        assert "Write Test Org" in dump
@pytest.mark.django_db
class TestDatasetBackupViews:
    """Tests for dataset backup list and download views."""

    @pytest.fixture
    def datasets_dir(self, tmp_path: Path) -> Path:
        """Create a temporary datasets directory.

        Returns:
            Path to the created datasets directory.
        """
        datasets_dir = tmp_path / "datasets"
        datasets_dir.mkdir()
        return datasets_dir

    @pytest.fixture
    def sample_backup(self, datasets_dir: Path) -> Path:
        """Create a sample backup file.

        Returns:
            Path to the created backup file.
        """
        backup_file = datasets_dir / "ttvdrops-20260210-120000.sql.zst"
        # Write a real zstd stream so the fixture file is a valid artifact.
        with (
            backup_file.open("wb") as raw_handle,
            zstd.open(raw_handle, "w") as compressed,
            io.TextIOWrapper(compressed, encoding="utf-8") as handle,
        ):
            handle.write("-- Sample backup content\n")
        return backup_file

    def test_dataset_list_view_shows_backups(
        self,
        client: Client,
        datasets_dir: Path,
        sample_backup: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test that dataset list view displays backup files."""
        # The view reads settings.DATA_DIR / "datasets", so DATA_DIR must be
        # the PARENT of the temporary datasets directory.
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups"))
        assert response.status_code == 200
        assert b"ttvdrops-20260210-120000.sql.zst" in response.content
        # Accept either singular or plural wording for the count sentence.
        assert b"1 datasets" in response.content or b"1 dataset" in response.content

    def test_dataset_list_view_empty_directory(
        self,
        client: Client,
        datasets_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test dataset list view with empty directory."""
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups"))
        assert response.status_code == 200
        assert b"No dataset backups found" in response.content

    def test_dataset_list_view_sorts_by_date(
        self,
        client: Client,
        datasets_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test that backups are sorted by modification time."""
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        # Create multiple backup files with different timestamps
        older_backup = datasets_dir / "ttvdrops-20260210-100000.sql.zst"
        newer_backup = datasets_dir / "ttvdrops-20260210-140000.sql.zst"
        for backup in [older_backup, newer_backup]:
            with (
                backup.open("wb") as raw_handle,
                zstd.open(raw_handle, "w") as compressed,
                io.TextIOWrapper(compressed, encoding="utf-8") as handle,
            ):
                handle.write("-- Test\n")
        # Set explicit modification times to ensure proper sorting.
        # Fixed epoch seconds roughly 4 hours apart; only their relative
        # order matters for this test.
        older_time = 1707561600
        newer_time = 1707575400
        os.utime(older_backup, (older_time, older_time))
        os.utime(newer_backup, (newer_time, newer_time))
        response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups"))
        content = response.content.decode()
        newer_pos = content.find("20260210-140000")
        older_pos = content.find("20260210-100000")
        # Newer backup should appear first (sorted descending); both
        # positions must be found (> 0 guards against find() returning -1).
        assert 0 < newer_pos < older_pos

    def test_dataset_download_view_success(
        self,
        client: Client,
        datasets_dir: Path,
        sample_backup: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test successful backup download."""
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        response: _MonkeyPatchedWSGIResponse = client.get(
            reverse("twitch:dataset_backup_download", args=["ttvdrops-20260210-120000.sql.zst"]),
        )
        assert response.status_code == 200
        # FileResponse may use application/x-compressed for .zst files
        assert "attachment" in response["Content-Disposition"]
        assert "ttvdrops-20260210-120000.sql.zst" in response["Content-Disposition"]

    def test_dataset_download_prevents_path_traversal(
        self,
        client: Client,
        datasets_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test that path traversal attempts are blocked."""
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        # Attempt path traversal
        response = client.get(reverse("twitch:dataset_backup_download", args=["../../../etc/passwd"]))
        assert response.status_code == 404

    def test_dataset_download_rejects_invalid_extensions(
        self,
        client: Client,
        datasets_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test that files with invalid extensions cannot be downloaded."""
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        # Create a file with invalid extension
        invalid_file = datasets_dir / "malicious.exe"
        invalid_file.write_text("not a backup")
        # Even though the file exists inside the datasets dir, the view must
        # refuse it because it is not a .zst file.
        response = client.get(reverse("twitch:dataset_backup_download", args=["malicious.exe"]))
        assert response.status_code == 404

    def test_dataset_download_file_not_found(
        self,
        client: Client,
        datasets_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test download returns 404 for non-existent file."""
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        response = client.get(reverse("twitch:dataset_backup_download", args=["nonexistent.sql.zst"]))
        assert response.status_code == 404

    def test_dataset_list_view_shows_file_sizes(
        self,
        client: Client,
        datasets_dir: Path,
        sample_backup: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test that file sizes are displayed in human-readable format."""
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups"))
        assert response.status_code == 200
        # Should contain size information (bytes, KB, MB, or GB)
        content = response.content.decode()
        assert any(unit in content for unit in ["bytes", "KB", "MB", "GB"])

    def test_dataset_list_ignores_non_zst_files(
        self,
        client: Client,
        datasets_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test that non-zst files are ignored in listing."""
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        # Create various file types
        (datasets_dir / "backup.sql.zst").write_bytes(b"valid")
        (datasets_dir / "readme.txt").write_text("should be ignored")
        (datasets_dir / "old_backup.gz").write_bytes(b"should be ignored")
        response: _MonkeyPatchedWSGIResponse = client.get(reverse("twitch:dataset_backups"))
        content = response.content.decode()
        assert "backup.sql.zst" in content
        assert "readme.txt" not in content
        assert "old_backup.gz" not in content

    def test_dataset_download_view_handles_subdirectories(
        self,
        client: Client,
        datasets_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Test download works with files in subdirectories."""
        monkeypatch.setattr(settings, "DATA_DIR", datasets_dir.parent)
        # Create subdirectory with backup
        subdir = datasets_dir / "2026" / "02"
        subdir.mkdir(parents=True)
        backup_file = subdir / "backup.sql.zst"
        with (
            backup_file.open("wb") as raw_handle,
            zstd.open(raw_handle, "w") as compressed,
            io.TextIOWrapper(compressed, encoding="utf-8") as handle,
        ):
            handle.write("-- Test\n")
        # The URL pattern uses <path:...>, so slashes are passed through.
        response: _MonkeyPatchedWSGIResponse = client.get(
            reverse("twitch:dataset_backup_download", args=["2026/02/backup.sql.zst"]),
        )
        assert response.status_code == 200
        assert "attachment" in response["Content-Disposition"]

View file

@ -27,6 +27,12 @@ urlpatterns: list[URLPattern] = [
path("channels/", views.ChannelListView.as_view(), name="channel_list"), path("channels/", views.ChannelListView.as_view(), name="channel_list"),
path("channels/<str:twitch_id>/", views.ChannelDetailView.as_view(), name="channel_detail"), path("channels/<str:twitch_id>/", views.ChannelDetailView.as_view(), name="channel_detail"),
path("debug/", views.debug_view, name="debug"), path("debug/", views.debug_view, name="debug"),
path("datasets/", views.dataset_backups_view, name="dataset_backups"),
path(
"datasets/download/<path:relative_path>/",
views.dataset_backup_download_view,
name="dataset_backup_download",
),
path("docs/rss/", views.docs_rss_view, name="docs_rss"), path("docs/rss/", views.docs_rss_view, name="docs_rss"),
path("emotes/", views.emote_gallery_view, name="emote_gallery"), path("emotes/", views.emote_gallery_view, name="emote_gallery"),
path("games/", views.GamesGridView.as_view(), name="game_list"), path("games/", views.GamesGridView.as_view(), name="game_list"),

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import datetime import datetime
import json import json
import logging import logging
import operator
from collections import OrderedDict from collections import OrderedDict
from collections import defaultdict from collections import defaultdict
from copy import copy from copy import copy
@ -10,6 +11,7 @@ from typing import TYPE_CHECKING
from typing import Any from typing import Any
from typing import Literal from typing import Literal
from django.conf import settings
from django.core.paginator import EmptyPage from django.core.paginator import EmptyPage
from django.core.paginator import Page from django.core.paginator import Page
from django.core.paginator import PageNotAnInteger from django.core.paginator import PageNotAnInteger
@ -22,10 +24,12 @@ from django.db.models import Prefetch
from django.db.models import Q from django.db.models import Q
from django.db.models import Subquery from django.db.models import Subquery
from django.db.models.functions import Trim from django.db.models.functions import Trim
from django.http import FileResponse
from django.http import Http404 from django.http import Http404
from django.http import HttpRequest from django.http import HttpRequest
from django.http import HttpResponse from django.http import HttpResponse
from django.shortcuts import render from django.shortcuts import render
from django.template.defaultfilters import filesizeformat
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
from django.views.generic import DetailView from django.views.generic import DetailView
@ -52,10 +56,13 @@ from twitch.models import TimeBasedDrop
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Callable from collections.abc import Callable
from os import stat_result
from pathlib import Path
from debug_toolbar.utils import QueryDict from debug_toolbar.utils import QueryDict
from django.db.models.query import QuerySet from django.db.models.query import QuerySet
logger: logging.Logger = logging.getLogger("ttvdrops.views") logger: logging.Logger = logging.getLogger("ttvdrops.views")
MIN_QUERY_LENGTH_FOR_FTS = 3 MIN_QUERY_LENGTH_FOR_FTS = 3
@ -319,6 +326,99 @@ def format_and_color_json(data: dict[str, Any] | list[dict] | str) -> str:
return highlight(formatted_code, JsonLexer(), HtmlFormatter()) return highlight(formatted_code, JsonLexer(), HtmlFormatter())
# MARK: /datasets/
def dataset_backups_view(request: HttpRequest) -> HttpResponse:
    """View to list database backup datasets on disk.

    Scans ``DATA_DIR/datasets`` recursively for ``.zst`` files so the listing
    stays consistent with the download endpoint, whose URL uses a ``<path:>``
    converter and already serves files from subdirectories.

    Args:
        request: The HTTP request.

    Returns:
        HttpResponse: The rendered dataset backups page.
    """
    datasets_root: Path = settings.DATA_DIR / "datasets"
    seen_paths: set[str] = set()
    datasets: list[dict[str, Any]] = []
    if datasets_root.is_dir():
        # Only include .zst files; rglob also picks up backups organized
        # into subdirectories (e.g. datasets/2026/02/...).
        for path in datasets_root.rglob("*.zst"):
            if not path.is_file():
                continue
            # Symlinks could alias the same file; dedupe on the resolved path.
            key = str(path.resolve())
            if key in seen_paths:
                continue
            seen_paths.add(key)
            stat: stat_result = path.stat()
            updated_at: datetime.datetime = datetime.datetime.fromtimestamp(
                stat.st_mtime,
                tz=timezone.get_current_timezone(),
            )
            try:
                display_path = str(path.relative_to(datasets_root))
                download_path: str | None = display_path
            except ValueError:
                # Not under the datasets root (defensive): show just the
                # name and offer no download link.
                display_path = path.name
                download_path = None
            datasets.append({
                "name": path.name,
                "display_path": display_path,
                "download_path": download_path,
                "size": filesizeformat(stat.st_size),
                "updated_at": updated_at,
            })
    # Newest backups first.
    datasets.sort(key=operator.itemgetter("updated_at"), reverse=True)
    context: dict[str, Any] = {
        "datasets": datasets,
        "data_dir": str(datasets_root),
        "dataset_count": len(datasets),
    }
    return render(request, "twitch/dataset_backups.html", context)
def dataset_backup_download_view(request: HttpRequest, relative_path: str) -> FileResponse:  # noqa: ARG001
    """Download a dataset backup from the data directory.

    Args:
        request: The HTTP request.
        relative_path: The path relative to the data directory.

    Returns:
        FileResponse: The file response for the requested dataset.

    Raises:
        Http404: When the file is not found or is outside the data directory.
    """
    datasets_root: Path = settings.DATA_DIR / "datasets"
    resolved_root: Path = datasets_root.resolve()
    candidate: Path = (datasets_root / relative_path).resolve()

    # Resolving and re-anchoring blocks ../ traversal and symlink escapes.
    try:
        candidate.relative_to(resolved_root)
    except ValueError as exc:
        msg = "File not found"
        raise Http404(msg) from exc

    # Serve only existing regular files with the expected extension; any
    # other request gets the same opaque 404 as a missing file.
    if not (candidate.exists() and candidate.is_file() and candidate.name.endswith(".zst")):
        msg = "File not found"
        raise Http404(msg)

    return FileResponse(
        candidate.open("rb"),
        as_attachment=True,
        filename=candidate.name,
    )
def _enhance_drops_with_context(drops: QuerySet[TimeBasedDrop], now: datetime.datetime) -> list[dict[str, Any]]: def _enhance_drops_with_context(drops: QuerySet[TimeBasedDrop], now: datetime.datetime) -> list[dict[str, Any]]:
"""Helper to enhance drops with countdown and context. """Helper to enhance drops with countdown and context.