基本配置
一、依赖
celery==5.4.0
Django==5.1.4
django-celery-beat==2.7.0
django-celery-results==2.5.1
django-redis==5.4.0
flower==2.0.1
mysqlclient==2.2.6
二、配置 settings.py
1. INSTALLED_APPS
INSTALLED_APPS = [
'django_celery_results',
'django_celery_beat',
]
2. Celery 配置(含时区配置)
from celery.schedules import crontab
from django.conf.locale.zh_Hans import formats as zh_formats
# 时区语言配置
LANGUAGE_CODE = "zh-hans"
TIME_ZONE = "Asia/Shanghai"
USE_I18N = True
USE_TZ = False
DATETIME_FORMAT = "Y-m-d H:i:s"
zh_formats.DATETIME_FORMAT = "Y年m月d日 H:i:s"
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
USE_THOUSAND_SEPARATOR = True
# ---------------------------- celery 配置 ----------------------------
# 最重要的配置,设置消息broker,格式为:db://user:password@host:port/dbname
CELERY_BROKER_URL = "redis://:xxxxxxxx@host.docker.internal:6379/0"
# celery时区设置,建议与Django settings中TIME_ZONE同样时区,防止时差
# Django设置时区需同时设置USE_TZ=True和TIME_ZONE = 'Asia/Shanghai'
CELERY_TIMEZONE = TIME_ZONE
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 为 django_celery_results 存储 Celery 任务执行结果设置后台
# 格式为:db+scheme://user:password@host:port/dbname
# 支持数据库django-db和缓存django-cache存储任务状态及结果
CELERY_RESULT_BACKEND = "django-db"
# celery内容等消息的格式设置,默认json
CELERY_ACCEPT_CONTENT = ['application/x-python-serialize', 'application/json', ]
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'json'
# 为任务设置超时时间,单位秒。超时即中止,执行下个任务。
# CELERY_TASK_TIME_LIMIT = 5
# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
CELERY_RESULT_EXPIRES = 60 * 60 * 24 * 30
# CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 * 30 # 后端存储的任务时长,自动删除数据库中的任务数据,单位秒
CELERYD_MAX_TASKS_PER_CHILD = 1000 # 每个worker执行1000次任务后,自动重启worker,防止任务占用太多内存导致内存泄漏
CELERY_RESULT_EXTENDED = True
# # 任务限流
# CELERY_TASK_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
# # Worker并发数量,一般默认CPU核数,可以不设置
CELERY_WORKER_CONCURRENCY = 15
# # 每个worker执行了多少任务就会死掉,默认是无限的
# CELERY_WORKER_MAX_TASKS_PER_CHILD = 200
# https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_expires
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
# Celery Beat 配置(不配置,后台添加的定时任务不允许)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
三、celery.py 配置
celery.py
和 settings.py
同级目录
import json
import os
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
# 设置 Django 的默认设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'great_unity_platform.settings')
app = Celery('great_unity_platform')
# 使用与 Django 设置相同的配置文件
app.config_from_object('django.conf:settings', namespace='CELERY')
# 从所有已注册的 Django app 中加载任务模块
app.autodiscover_tasks()
app.conf.beat_schedule = {
"test_crontab": {
"task": "test_crontab",
'schedule': crontab(hour="3,9,15,21", minute="1"),
'args': ()
},
'test_interval': {
"task": "test_interval",
'schedule': 120.0, # 每120秒执行1次
'args': ()
},
'test_timedelta': {
"task": "test_timedelta",
'schedule': timedelta(seconds=120), # 每20s执行1次
'args': ()
},
}
四、说明
1. 时区说明
2. 存储结果说明
3. 存储结果中文unicode说明
django-celery-results 保存的任务结果 Task results 中,Task Positional Arguments
、Task Named Arguments
、Result Data
三个字段中的中文,显示为\u6570\u91cf
类似的 unicode 结果。
原因:因为保存结果的时候,json.dumps 默认的 ensure_ascii 参数为 True,导致中文被编码为 unicode 字符。
解决:在celery.py
配置中,os.environ.setdefault
前添加配置:
import json
from kombu.serialization import register
def json_serializer(data):
return json.dumps(data, ensure_ascii=False)
def json_deserializer(data):
return json.loads(data)
register('json', json_serializer, json_deserializer, 'application/json')
4. Celery 异步报错 Object of type xxx is not JSON serializable
Celery 任务的方法接收的参数,不能被json序列化,有3种解决方案。
- 自定义 encoder 和 decoder,类似
存储结果中文unicode说明
中,自定义序列化器 - 在传递参数前转为json,在方法接收参数后,再自行反序列号
- 使用
pickle
进行序列化,Celery内置有json和pickle两种序列化器,默认是json,将CELERY_TASK_SERIALIZER
修改为pickle。但是要注意CELERY_ACCEPT_CONTENT
中要配置application/x-python-serialize
。CELERY_RESULT_SERIALIZER
如果没有特殊要求,最好是json格式,不然结果中保存的是pickle字符串,在后台查看任务详情是看不懂的。
5. 线程说明
6. Beat 定时任务配置说明
作者:六楼的雨 创建时间:2024-12-24 16:53
最后编辑:六楼的雨 更新时间:2024-12-24 17:28
最后编辑:六楼的雨 更新时间:2024-12-24 17:28