Tussilago/control_plane/local_test_deployment.py
2026-04-27 20:43:26 +02:00

212 lines
7.3 KiB
Python

from __future__ import annotations
import hashlib
import secrets
import socket
import time
from dataclasses import dataclass
from celery import chain
from django.conf import settings
from django.db import transaction
from django.utils import timezone
from control_plane.host_commands import HostCommandError
from control_plane.local_test_runtime import build_test_django_local_url
from control_plane.models import Deployment
from control_plane.models import DeploymentStatus
from control_plane.models import HostedSite
from control_plane.models import Tenant
from control_plane.observability import capture_test_deployment_diagnostics
from control_plane.tasks import mark_deployment_booting
from control_plane.tasks import mark_deployment_provisioning
from control_plane.tasks import provision_test_django_runtime
from control_plane.tasks import provision_test_runtime_services
from control_plane.tasks import run_test_django_runtime_provisioning
@dataclass(frozen=True, slots=True)
class CreatedTestDeployment:
"""Bundle control-plane rows created for one local test deployment."""
tenant: Tenant
hosted_site: HostedSite
deployment: Deployment
@property
def sentinel_url(self) -> str:
"""Return published local sentinel URL for this deployment."""
return build_test_django_local_url(self.deployment)
def create_test_deployment() -> CreatedTestDeployment:
    """Insert a freshly randomized tenant, hosted site, and deployment for local testing.

    Random hex tokens are used for the slugs and the idempotency key. A free
    loopback port is reserved for the guest service, and all three rows are
    created inside one database transaction.

    Returns:
        The newly created tenant, hosted site, and deployment rows, bundled together.
    """
    tenant_suffix, site_suffix = secrets.token_hex(4), secrets.token_hex(4)
    tenant_slug = f"tenant-{tenant_suffix}"
    site_slug = f"site-{site_suffix}"
    idempotency_key = f"test-deploy-{secrets.token_hex(8)}"
    port = _find_free_port()

    # The digest is derived from the generated identifiers, not from any source content.
    digest_input = f"{tenant_slug}:{site_slug}:{idempotency_key}".encode()
    source_sha256 = hashlib.sha256(digest_input).hexdigest()

    with transaction.atomic():
        tenant = Tenant.objects.create(
            slug=tenant_slug,
            display_name=f"Test Tenant {tenant_suffix.upper()}",
        )
        hosted_site = HostedSite.objects.create(
            tenant=tenant,
            slug=site_slug,
            display_name=f"Test Site {site_suffix.upper()}",
            wsgi_module="tenant_site.wsgi:application",
            service_port=port,
        )
        deployment = Deployment.objects.create(
            hosted_site=hosted_site,
            idempotency_key=idempotency_key,
            source_sha256=source_sha256,
            guest_port=port,
        )

    return CreatedTestDeployment(
        tenant=tenant,
        hosted_site=hosted_site,
        deployment=deployment,
    )
def queue_test_deployment_provisioning(deployment_id: str) -> str:
    """Enqueue the full local test deployment pipeline as one Celery chain.

    The broker configuration is validated first. Each step is wrapped in an
    immutable signature (``.si``), so every task receives only
    ``deployment_id`` and ignores the previous step's return value.

    Returns:
        Celery task id of the queued orchestration chain, as a string.
    """
    _ensure_async_broker_configuration()
    pipeline_steps = (
        mark_deployment_provisioning,
        provision_test_runtime_services,
        mark_deployment_booting,
        provision_test_django_runtime,
    )
    workflow = chain(*(step.si(deployment_id) for step in pipeline_steps))
    async_result = workflow.apply_async()
    return str(async_result.id)
def wait_for_test_deployment(
    deployment_id: str,
    *,
    timeout_seconds: float,
    poll_interval_seconds: float,
) -> Deployment:
    """Poll until a queued local test deployment is running or has failed.

    The deployment row is re-read on every iteration. Sleeps between polls are
    capped at the time left before the deadline, so the call never waits
    noticeably longer than ``timeout_seconds``, even when
    ``poll_interval_seconds`` is larger.

    Args:
        deployment_id: Primary key of the deployment to watch.
        timeout_seconds: Maximum total time to wait, in seconds.
        poll_interval_seconds: Preferred delay between polls, in seconds.

    Returns:
        Deployment row in running state.

    Raises:
        RuntimeError: If the deployment reaches failed state.
        TimeoutError: If the deployment does not finish before the timeout.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        deployment = Deployment.objects.select_related("hosted_site__tenant").get(pk=deployment_id)
        if deployment.status == DeploymentStatus.RUNNING.value:
            return deployment
        if deployment.status == DeploymentStatus.FAILED.value:
            failure_message = deployment.last_error or "Local test deployment failed."
            raise RuntimeError(failure_message)
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            msg = (
                "Timed out waiting for local test deployment "
                f"{deployment.id} to become ready. Current status: {deployment.status}."
            )
            raise TimeoutError(msg)
        # Cap the sleep at the remaining budget. Sleeping a full interval past the
        # deadline would overshoot the caller's timeout by up to one interval.
        time.sleep(min(poll_interval_seconds, remaining))
def provision_test_deployment(deployment_id: str) -> Deployment:
    """Run every local test deployment provisioning step inline, in this process.

    On a known failure, the deployment row is marked failed and a best-effort
    diagnostics snapshot is taken before the error propagates. Host command
    failures are re-raised as ``RuntimeError`` carrying the command output.

    Returns:
        Deployment row re-read after provisioning completes.

    Raises:
        RuntimeError: If runtime provisioning fails.
        TimeoutError: If the Django sentinel endpoint never becomes ready.
        ValueError: If runtime configuration is invalid.
    """

    def record_failure(reason: str) -> None:
        # Mark the row failed, then attempt the best-effort diagnostics capture.
        _mark_inline_deployment_failed(deployment_id, message=reason)
        _capture_test_deployment_diagnostics_snapshot(deployment_id)

    try:
        for task in (
            mark_deployment_provisioning,
            provision_test_runtime_services,
            mark_deployment_booting,
        ):
            task.run(deployment_id)
        run_test_django_runtime_provisioning(deployment_id)
    except HostCommandError as error:
        reason = _build_host_command_failure_message(error)
        record_failure(reason)
        raise RuntimeError(reason) from error
    except (RuntimeError, TimeoutError, ValueError) as error:
        record_failure(str(error))
        raise
    else:
        return Deployment.objects.select_related("hosted_site__tenant").get(pk=deployment_id)
def _ensure_async_broker_configuration() -> None:
    """Reject broker settings that cannot deliver tasks to a separate worker process.

    Raises:
        RuntimeError: If no broker URL is configured, or if it selects the
            in-memory transport.
    """
    # Use ``getattr`` so that a missing setting produces the explanatory
    # RuntimeError below rather than an AttributeError from Django's settings.
    broker_url = getattr(settings, "CELERY_BROKER_URL", None)
    if not broker_url:
        msg = "Async queueing requires TUSSILAGO_CELERY_BROKER_URL to be set to a real broker URL."
        raise RuntimeError(msg)
    # Match on the ``memory`` scheme rather than the exact string "memory://".
    # Variants such as "memory://localhost//" select the same in-process transport.
    if str(broker_url).strip().lower().startswith("memory://"):
        msg = (
            "Async queueing cannot use memory:// because the worker cannot consume tasks from another process. "
            "Set TUSSILAGO_CELERY_BROKER_URL to a real broker such as Redis or RabbitMQ."
        )
        raise RuntimeError(msg)
def _mark_inline_deployment_failed(deployment_id: str, *, message: str) -> None:
    """Persist a failed status, error text, and finish time for an inline run.

    A row that is already failed is left untouched, so its existing
    ``last_error`` and ``finished_at`` values are preserved.
    """
    deployment = Deployment.objects.get(pk=deployment_id)
    failed_status = DeploymentStatus.FAILED.value
    if deployment.status != failed_status:
        deployment.status = failed_status
        deployment.last_error = message
        deployment.finished_at = timezone.now()
        deployment.save(update_fields=["status", "last_error", "finished_at", "updated_at"])
def _build_host_command_failure_message(error: HostCommandError) -> str:
    """Combine a host command error with its most useful captured output.

    Stripped stderr is appended when it is non-blank. Otherwise stripped
    stdout is appended when it is non-blank. If both are blank, only the
    error text is returned.
    """
    parts = [str(error)]
    detail = error.stderr.strip() or error.stdout.strip()
    if detail:
        parts.append(detail)
    return "\n".join(parts)
def _capture_test_deployment_diagnostics_snapshot(deployment_id: str) -> None:
    """Best-effort capture of diagnostics for a failed inline deployment.

    Diagnostics are only a debugging aid. ``provision_test_deployment`` calls
    this just before re-raising the provisioning error, so the expected
    failure modes here (OSError, ValueError, missing deployment row) are
    swallowed rather than allowed to mask that error. They are logged with
    their traceback so the lost snapshot is not silent.
    """
    import logging

    try:
        capture_test_deployment_diagnostics(deployment_id)
    except (OSError, ValueError, Deployment.DoesNotExist):
        logging.getLogger(__name__).warning(
            "Could not capture diagnostics for local test deployment %s.",
            deployment_id,
            exc_info=True,
        )
def _find_free_port() -> int:
    """Return a TCP port on 127.0.0.1 that the OS reported as free.

    The socket is bound to port 0 so the OS picks an unused port, and it is
    closed before returning. Another process may claim the port afterwards.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("127.0.0.1", 0))
        listener.listen(1)
        _host, port = listener.getsockname()
    return int(port)