- Sending verification emails to users
- Sending SMS verification codes
- Decompressing large files
- Scheduled jobs, e.g. counting the site's registrations once a day, can also be handed to Celery for periodic processing
- Rendering pages to static files
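All of these follow the same pattern: wrap the slow operation in a task and dispatch it asynchronously. A minimal sketch for the email case (the task name and its body are hypothetical, purely for illustration):

from celery import Celery

app = Celery('demo', broker='redis://127.0.0.1:6379/6')

@app.task
def send_verification_email(address, token):
    # stand-in for the real SMTP/mail-API call; the web request returns
    # immediately while a worker does the slow sending in the background
    print("sending verification mail to %s, token %s" % (address, token))

# in the request handler: send_verification_email.delay('user@example.com', 'abc123')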
- The Celery client sends a message to the broker
- A worker consumes the message from the broker and stores the result in the result backend
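In code, that round trip looks roughly like this (a sketch assuming the add task and project layout shown below, with a worker already running):

from celery_app.task2 import add     # the add task defined in the project below

result = add.delay(2, 3)             # client publishes a task message to the broker
print(result.id, result.status)      # PENDING until a worker picks the task up
print(result.get(timeout=10))        # 5, read from the result backend once the worker is done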
# celery_app/main.py: the Celery application instance
from celery import Celery, Task

app = Celery('tasks', backend='redis://127.0.0.1:6379/6')
app.config_from_object('celeryconfig')

class CallbackTask(Task):
    # base class for tasks that want hooks on completion
    def on_success(self, retval, task_id, args, kwargs):
        print("----%s is done" % task_id)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass
# celery_app/task2.py (file assignment inferred from the routing config below)
from celery_app import app  # assumes celery_app/__init__.py exposes the app instance
from celery_app.main import CallbackTask

@app.task(base=CallbackTask)
def add(x, y):
    return x + y
# celery_app/task1.py
from celery_app import app
from celery_app.main import CallbackTask

@app.task(base=CallbackTask)
def multiply(x, y):
    return x * y
# celeryconfig.py: loaded via app.config_from_object('celeryconfig')
from celery.schedules import crontab
from datetime import timedelta
from kombu import Queue
from kombu import Exchange

result_serializer = 'json'
broker_url = "redis://192.168.1.2"
result_backend = "mongodb://192.168.1.2/celery"

imports = (
    'celery_app.task1',
    'celery_app.task2'
)

beat_schedule = {
    'add-every-20-seconds': {
        'task': 'celery_app.task1.multiply',
        'schedule': timedelta(seconds=20),
        'args': (5, 7)
    },
    'add-every-10-seconds': {
        'task': 'celery_app.task2.add',
        # 'schedule': crontab(hour=9, minute=10),
        'schedule': timedelta(seconds=10),
        'args': (23, 54)
    }
}

task_queues = (
    Queue('default', exchange=Exchange('default'), routing_key='default'),
    Queue('priority_high', exchange=Exchange('priority_high'), routing_key='priority_high'),
    Queue('priority_low', exchange=Exchange('priority_low'), routing_key='priority_low'),
)

task_routes = {
    'celery_app.task1.multiply': {'queue': 'priority_high', 'routing_key': 'priority_high'},
    'celery_app.task2.add': {'queue': 'priority_low', 'routing_key': 'priority_low'},
}

# Maximum rate per minute
# task_annotations = {
#     'celery_app.task1.multiply': {'rate_limit': '10/m'}
# }

# Guard against memory leaks: recycle each worker process after it has executed this many tasks
worker_max_tasks_per_child = 40
- On the same machine, start separate workers for tasks of different priorities; this helps ensure that high-priority tasks get more system resources
- For example: separate real-time tasks from scheduled (periodic) tasks
- Separate frequently-run tasks from rarely-run tasks
Consume the priority_high queue:
celery -A celery_app.main worker -Q priority_high --concurrency=4 -l info -E -n worker1@%h
Consume both the priority_high and priority_low queues:
celery -A celery_app.main worker -Q priority_high,priority_low --concurrency=4 -l info -E -n worker2@%h
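Note that the beat_schedule entries above only fire if a beat process is started in addition to the workers; something like the following (matching the project layout used here) should do it:
celery -A celery_app.main beat -l info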
# client: dispatch tasks asynchronously (imports follow the task modules as routed above)
from celery_app.task2 import add
from celery_app.task1 import multiply

for i in range(50):
    add.delay(2, 2)
    multiply.delay(10, 10)
Monitor tasks and workers with Flower:
pip install flower
celery flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//
Then open http://server2_ip:5555 in a browser.