212 lines
7.3 KiB
Python
212 lines
7.3 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import secrets
|
|
import socket
|
|
import time
|
|
from dataclasses import dataclass
|
|
|
|
from celery import chain
|
|
from django.conf import settings
|
|
from django.db import transaction
|
|
from django.utils import timezone
|
|
|
|
from control_plane.host_commands import HostCommandError
|
|
from control_plane.local_test_runtime import build_test_django_local_url
|
|
from control_plane.models import Deployment
|
|
from control_plane.models import DeploymentStatus
|
|
from control_plane.models import HostedSite
|
|
from control_plane.models import Tenant
|
|
from control_plane.observability import capture_test_deployment_diagnostics
|
|
from control_plane.tasks import mark_deployment_booting
|
|
from control_plane.tasks import mark_deployment_provisioning
|
|
from control_plane.tasks import provision_test_django_runtime
|
|
from control_plane.tasks import provision_test_runtime_services
|
|
from control_plane.tasks import run_test_django_runtime_provisioning
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class CreatedTestDeployment:
|
|
"""Bundle control-plane rows created for one local test deployment."""
|
|
|
|
tenant: Tenant
|
|
hosted_site: HostedSite
|
|
deployment: Deployment
|
|
|
|
@property
|
|
def sentinel_url(self) -> str:
|
|
"""Return published local sentinel URL for this deployment."""
|
|
return build_test_django_local_url(self.deployment)
|
|
|
|
|
|
def create_test_deployment() -> CreatedTestDeployment:
    """Insert a throwaway tenant, hosted site, and deployment for local testing.

    Slugs and the idempotency key carry random hex suffixes, a currently free
    loopback port is picked for the guest, and a synthetic source digest is
    derived from the generated identifiers. All three rows are written in a
    single database transaction.

    Returns:
        Newly created tenant, hosted site, and deployment rows.
    """
    tenant_suffix, site_suffix = secrets.token_hex(4), secrets.token_hex(4)
    tenant_slug = "tenant-" + tenant_suffix
    site_slug = "site-" + site_suffix
    idempotency_key = "test-deploy-" + secrets.token_hex(8)
    guest_port = _find_free_port()

    # There is no real source archive for a test deployment, so hash the
    # generated identifiers to get a unique, well-formed digest.
    fingerprint = ":".join((tenant_slug, site_slug, idempotency_key))
    source_sha256 = hashlib.sha256(fingerprint.encode()).hexdigest()

    with transaction.atomic():
        tenant = Tenant.objects.create(
            slug=tenant_slug,
            display_name=f"Test Tenant {tenant_suffix.upper()}",
        )
        hosted_site = HostedSite.objects.create(
            tenant=tenant,
            slug=site_slug,
            display_name=f"Test Site {site_suffix.upper()}",
            wsgi_module="tenant_site.wsgi:application",
            service_port=guest_port,
        )
        deployment = Deployment.objects.create(
            hosted_site=hosted_site,
            idempotency_key=idempotency_key,
            source_sha256=source_sha256,
            guest_port=guest_port,
        )

    created = CreatedTestDeployment(
        tenant=tenant,
        hosted_site=hosted_site,
        deployment=deployment,
    )
    return created
|
|
|
|
|
|
def queue_test_deployment_provisioning(deployment_id: str) -> str:
    """Enqueue the full local test provisioning chain on the Celery broker.

    The chain marks the deployment as provisioning, provisions runtime
    services, marks it as booting, and finally provisions the Django runtime.

    Returns:
        Celery task id for the queued orchestration chain.

    Raises:
        RuntimeError: If the broker is not usable by an out-of-process worker.
    """
    _ensure_async_broker_configuration()

    ordered_steps = (
        mark_deployment_provisioning,
        provision_test_runtime_services,
        mark_deployment_booting,
        provision_test_django_runtime,
    )
    # Immutable signatures (.si): every step receives only deployment_id and
    # ignores the return value of the step before it.
    workflow = chain(*(step.si(deployment_id) for step in ordered_steps))
    async_result = workflow.apply_async()
    return str(async_result.id)
|
|
|
|
|
|
def wait_for_test_deployment(
    deployment_id: str,
    *,
    timeout_seconds: float,
    poll_interval_seconds: float,
) -> Deployment:
    """Wait until a queued local test deployment becomes running or fails.

    The deployment row is re-read, with its hosted site and tenant, on every
    poll. Each sleep is capped at the time left before the deadline, so the
    wait never overshoots ``timeout_seconds`` by a whole poll interval. One
    final status check always happens before a timeout is reported.

    Args:
        deployment_id: Primary key of the deployment to watch.
        timeout_seconds: Maximum time to wait, measured on the monotonic clock.
        poll_interval_seconds: Preferred delay between status checks.

    Returns:
        Deployment row in running state.

    Raises:
        RuntimeError: If deployment reaches failed state.
        TimeoutError: If deployment does not finish before timeout.
        Deployment.DoesNotExist: If no deployment has ``deployment_id``.
    """
    deadline = time.monotonic() + timeout_seconds

    while True:
        deployment = Deployment.objects.select_related("hosted_site__tenant").get(pk=deployment_id)
        if deployment.status == DeploymentStatus.RUNNING.value:
            return deployment
        if deployment.status == DeploymentStatus.FAILED.value:
            failure_message = deployment.last_error or "Local test deployment failed."
            raise RuntimeError(failure_message)

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            msg = (
                "Timed out waiting for local test deployment "
                f"{deployment.id} to become ready. Current status: {deployment.status}."
            )
            raise TimeoutError(msg)

        # Do not sleep past the deadline. Sleeping the full interval could
        # stretch the wait far beyond timeout_seconds when the interval is
        # large relative to the timeout.
        time.sleep(min(poll_interval_seconds, remaining))
|
|
|
|
|
|
def provision_test_deployment(deployment_id: str) -> Deployment:
    """Provision a local test deployment synchronously in this process.

    The same steps as the Celery chain run as plain function calls. If any
    step fails, the deployment is marked failed, a diagnostics snapshot is
    attempted, and the error is propagated.

    Returns:
        Deployment row after provisioning completes.

    Raises:
        RuntimeError: If runtime provisioning fails.
        TimeoutError: If the Django sentinel endpoint never becomes ready.
        ValueError: If runtime configuration is invalid.
    """

    def record_failure(reason: str) -> None:
        # Persist the failure first, then take a best-effort diagnostics snapshot.
        _mark_inline_deployment_failed(deployment_id, message=reason)
        _capture_test_deployment_diagnostics_snapshot(deployment_id)

    try:
        mark_deployment_provisioning.run(deployment_id)
        provision_test_runtime_services.run(deployment_id)
        mark_deployment_booting.run(deployment_id)
        run_test_django_runtime_provisioning(deployment_id)
    except HostCommandError as host_error:
        # Surface host command output to callers as a plain RuntimeError.
        reason = _build_host_command_failure_message(host_error)
        record_failure(reason)
        raise RuntimeError(reason) from host_error
    except (RuntimeError, TimeoutError, ValueError) as step_error:
        record_failure(str(step_error))
        raise

    refreshed = Deployment.objects.select_related("hosted_site__tenant")
    return refreshed.get(pk=deployment_id)
|
|
|
|
|
|
def _ensure_async_broker_configuration() -> None:
|
|
broker_url = settings.CELERY_BROKER_URL
|
|
if not broker_url:
|
|
msg = "Async queueing requires TUSSILAGO_CELERY_BROKER_URL to be set to a real broker URL."
|
|
raise RuntimeError(msg)
|
|
|
|
if broker_url == "memory://":
|
|
msg = (
|
|
"Async queueing cannot use memory:// because the worker cannot consume tasks from another process. "
|
|
"Set TUSSILAGO_CELERY_BROKER_URL to a real broker such as Redis or RabbitMQ."
|
|
)
|
|
raise RuntimeError(msg)
|
|
|
|
|
|
def _mark_inline_deployment_failed(deployment_id: str, *, message: str) -> None:
    """Record a FAILED status, error message, and finish time on a deployment.

    A deployment that is already FAILED is left unchanged, so an error message
    recorded earlier is not overwritten.

    Args:
        deployment_id: Primary key of the deployment to update.
        message: Error text to store in ``last_error``.
    """
    deployment = Deployment.objects.get(pk=deployment_id)
    already_failed = deployment.status == DeploymentStatus.FAILED.value
    if not already_failed:
        deployment.status = DeploymentStatus.FAILED.value
        deployment.last_error = message
        deployment.finished_at = timezone.now()
        # "updated_at" is listed explicitly so it is also written by this partial save.
        deployment.save(update_fields=["status", "last_error", "finished_at", "updated_at"])
|
|
|
|
|
|
def _build_host_command_failure_message(error: HostCommandError) -> str:
|
|
lines = [str(error)]
|
|
if error.stderr.strip():
|
|
lines.append(error.stderr.strip())
|
|
elif error.stdout.strip():
|
|
lines.append(error.stdout.strip())
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _capture_test_deployment_diagnostics_snapshot(deployment_id: str) -> None:
    """Best-effort capture of diagnostics for a local test deployment.

    ``OSError``, ``ValueError`` and ``Deployment.DoesNotExist`` raised during
    capture are swallowed so they cannot replace the error the caller is
    already handling. They are logged with their traceback, so a missing
    snapshot can still be investigated. Other exceptions propagate.

    Args:
        deployment_id: Primary key of the deployment to snapshot.
    """
    import logging

    try:
        capture_test_deployment_diagnostics(deployment_id)
    except (OSError, ValueError, Deployment.DoesNotExist):
        logging.getLogger(__name__).warning(
            "Could not capture diagnostics for local test deployment %s.",
            deployment_id,
            exc_info=True,
        )
|
|
|
|
|
|
def _find_free_port() -> int:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
|
|
probe.bind(("127.0.0.1", 0))
|
|
probe.listen(1)
|
|
return int(probe.getsockname()[1])
|