Refactor celery.py to enhance type hinting

This commit is contained in:
Joakim Hellsén 2026-03-23 23:32:24 +01:00
commit dfcb8975b8
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk

View file

@ -1,11 +1,43 @@
import os import os
from typing import TYPE_CHECKING
from celery import Celery from celery import Celery
from celery import Signature
from celery.app.task import Task
from celery.contrib.abortable import AbortableAsyncResult
from celery.contrib.abortable import AbortableTask
from celery.contrib.django.task import DjangoTask
from celery.local import class_property
from celery.result import AsyncResult
from celery.utils.objects import FallbackContext
if TYPE_CHECKING:
from config.settings import Any
classes = [
Celery,
Task,
DjangoTask,
AbortableTask,
AsyncResult,
AbortableAsyncResult,
Signature,
FallbackContext,
class_property,
]
for cls in classes:
setattr( # noqa: B010
cls,
"__class_getitem__",
classmethod(lambda cls, *args, **kwargs: cls), # noqa: ARG005
)
# Set the default Django settings module for the 'celery' program. # Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("config") app: Celery[Task[Any, Any]] = Celery("config")
# Using a string here means the worker doesn't have to serialize # Using a string here means the worker doesn't have to serialize
# the configuration object to child processes. # the configuration object to child processes.