from __future__ import annotations

import hashlib
import secrets
import socket
import time
from dataclasses import dataclass

from celery import chain
from django.conf import settings
from django.db import transaction
from django.utils import timezone

from control_plane.host_commands import HostCommandError
from control_plane.local_test_runtime import build_test_django_local_url
from control_plane.models import Deployment
from control_plane.models import DeploymentStatus
from control_plane.models import HostedSite
from control_plane.models import Tenant
from control_plane.observability import capture_test_deployment_diagnostics
from control_plane.tasks import mark_deployment_booting
from control_plane.tasks import mark_deployment_provisioning
from control_plane.tasks import provision_test_django_runtime
from control_plane.tasks import provision_test_runtime_services
from control_plane.tasks import run_test_django_runtime_provisioning


@dataclass(frozen=True, slots=True)
class CreatedTestDeployment:
    """Bundle control-plane rows created for one local test deployment."""

    tenant: Tenant
    hosted_site: HostedSite
    deployment: Deployment

    @property
    def sentinel_url(self) -> str:
        """Return published local sentinel URL for this deployment."""
        return build_test_django_local_url(self.deployment)


def create_test_deployment() -> CreatedTestDeployment:
    """Create a randomized tenant, hosted site, and deployment for local testing.

    Slugs and the idempotency key are built from random hex tokens. One port
    obtained from ``_find_free_port`` is stored both as the hosted site's
    ``service_port`` and as the deployment's ``guest_port``. All three rows
    are created inside a single ``transaction.atomic()`` block.

    Returns:
        Newly created tenant, hosted site, and deployment rows.
    """
    tenant_token = secrets.token_hex(4)
    site_token = secrets.token_hex(4)
    tenant_slug = f"tenant-{tenant_token}"
    site_slug = f"site-{site_token}"
    idempotency_key = f"test-deploy-{secrets.token_hex(8)}"
    guest_port = _find_free_port()
    # No real source bundle is hashed here: the digest is derived from the
    # random identifiers generated above.
    source_sha256 = hashlib.sha256(
        f"{tenant_slug}:{site_slug}:{idempotency_key}".encode(),
    ).hexdigest()

    with transaction.atomic():
        tenant = Tenant.objects.create(
            slug=tenant_slug,
            display_name=f"Test Tenant {tenant_token.upper()}",
        )
        hosted_site = HostedSite.objects.create(
            tenant=tenant,
            slug=site_slug,
            display_name=f"Test Site {site_token.upper()}",
            wsgi_module="tenant_site.wsgi:application",
            service_port=guest_port,
        )
        deployment = Deployment.objects.create(
            hosted_site=hosted_site,
            idempotency_key=idempotency_key,
            source_sha256=source_sha256,
            guest_port=guest_port,
        )

    return CreatedTestDeployment(
        tenant=tenant,
        hosted_site=hosted_site,
        deployment=deployment,
    )


def queue_test_deployment_provisioning(deployment_id: str) -> str:
    """Queue full local test deployment Celery chain and return task id.

    The broker configuration is validated first so a misconfigured broker
    raises ``RuntimeError`` before anything is queued. Every step is built
    with ``.si(deployment_id)``, so each task is called with the deployment
    id only.

    Returns:
        Celery task id for the queued orchestration chain.

    Raises:
        RuntimeError: If the configured broker URL is empty or ``memory://``.
    """
    _ensure_async_broker_configuration()
    result = chain(
        mark_deployment_provisioning.si(deployment_id),
        provision_test_runtime_services.si(deployment_id),
        mark_deployment_booting.si(deployment_id),
        provision_test_django_runtime.si(deployment_id),
    ).apply_async()
    return str(result.id)


def wait_for_test_deployment(
    deployment_id: str,
    *,
    timeout_seconds: float,
    poll_interval_seconds: float,
) -> Deployment:
    """Wait until a queued local test deployment becomes running or fails.

    The deployment row is re-read on every iteration. The status is always
    checked before the deadline, so a deployment that finishes right at the
    deadline is still reported correctly.

    Returns:
        Deployment row in running state.

    Raises:
        RuntimeError: If deployment reaches failed state.
        TimeoutError: If deployment does not finish before timeout.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        deployment = Deployment.objects.select_related("hosted_site__tenant").get(pk=deployment_id)
        if deployment.status == DeploymentStatus.RUNNING.value:
            return deployment
        if deployment.status == DeploymentStatus.FAILED.value:
            failure_message = deployment.last_error or "Local test deployment failed."
            raise RuntimeError(failure_message)

        remaining_seconds = deadline - time.monotonic()
        if remaining_seconds <= 0:
            msg = (
                "Timed out waiting for local test deployment "
                f"{deployment.id} to become ready. Current status: {deployment.status}."
            )
            raise TimeoutError(msg)

        # Never sleep past the deadline: a poll interval longer than the time
        # left would otherwise overrun the requested timeout by up to one
        # whole interval.
        time.sleep(min(poll_interval_seconds, remaining_seconds))


def provision_test_deployment(deployment_id: str) -> Deployment:
    """Run full local test deployment provisioning inline in the current process.

    Runs the same steps as the queued chain, but synchronously. On failure
    the deployment is marked failed and a diagnostics snapshot is attempted
    before the error propagates.

    Returns:
        Deployment row after provisioning completes.

    Raises:
        RuntimeError: If runtime provisioning fails.
        TimeoutError: If the Django sentinel endpoint never becomes ready.
        ValueError: If runtime configuration is invalid.
    """
    try:
        mark_deployment_provisioning.run(deployment_id)
        provision_test_runtime_services.run(deployment_id)
        mark_deployment_booting.run(deployment_id)
        run_test_django_runtime_provisioning(deployment_id)
    except HostCommandError as error:
        # Surface the command's captured output in the stored error, and
        # re-raise as RuntimeError with the original error chained.
        message = _build_host_command_failure_message(error)
        _mark_inline_deployment_failed(deployment_id, message=message)
        _capture_test_deployment_diagnostics_snapshot(deployment_id)
        raise RuntimeError(message) from error
    except (RuntimeError, TimeoutError, ValueError) as error:
        _mark_inline_deployment_failed(deployment_id, message=str(error))
        _capture_test_deployment_diagnostics_snapshot(deployment_id)
        raise

    return Deployment.objects.select_related("hosted_site__tenant").get(pk=deployment_id)


def _ensure_async_broker_configuration() -> None:
    """Reject broker settings that cannot support cross-process queueing.

    Raises:
        RuntimeError: If ``settings.CELERY_BROKER_URL`` is empty or is exactly
            ``memory://``.
    """
    broker_url = settings.CELERY_BROKER_URL
    if not broker_url:
        msg = "Async queueing requires TUSSILAGO_CELERY_BROKER_URL to be set to a real broker URL."
        raise RuntimeError(msg)
    if broker_url == "memory://":
        msg = (
            "Async queueing cannot use memory:// because the worker cannot consume tasks from another process. "
            "Set TUSSILAGO_CELERY_BROKER_URL to a real broker such as Redis or RabbitMQ."
        )
        raise RuntimeError(msg)


def _mark_inline_deployment_failed(deployment_id: str, *, message: str) -> None:
    """Record a failure on the deployment row unless it is already failed.

    A deployment already in the failed state is left untouched, so its
    existing ``last_error`` is not overwritten.
    """
    deployment = Deployment.objects.get(pk=deployment_id)
    if deployment.status == DeploymentStatus.FAILED.value:
        return

    deployment.status = DeploymentStatus.FAILED.value
    deployment.last_error = message
    deployment.finished_at = timezone.now()
    deployment.save(update_fields=["status", "last_error", "finished_at", "updated_at"])


def _build_host_command_failure_message(error: HostCommandError) -> str:
    """Combine the error text with the command's stderr, or stdout as fallback.

    Stdout is used only when stderr is blank after stripping. If both are
    blank, the message is just ``str(error)``.
    """
    lines = [str(error)]
    stderr_text = error.stderr.strip()
    stdout_text = error.stdout.strip()
    if stderr_text:
        lines.append(stderr_text)
    elif stdout_text:
        lines.append(stdout_text)
    return "\n".join(lines)


def _capture_test_deployment_diagnostics_snapshot(deployment_id: str) -> None:
    """Capture diagnostics for a failed deployment on a best-effort basis.

    ``OSError``, ``ValueError`` and ``Deployment.DoesNotExist`` are swallowed
    on purpose, so a diagnostics problem cannot replace the provisioning
    error the caller is about to raise.
    """
    try:
        capture_test_deployment_diagnostics(deployment_id)
    except (OSError, ValueError, Deployment.DoesNotExist):
        return


def _find_free_port() -> int:
    """Return a loopback TCP port number chosen by binding to port 0.

    The probe socket is closed when the ``with`` block exits, so the port is
    only observed, not held.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind(("127.0.0.1", 0))
        probe.listen(1)
        return int(probe.getsockname()[1])