Skip to content

shaowei1/geek

Repository files navigation

Celery

1

8

9

  • 给用户发送验证邮件

  • 发送短信验证码

  • 解压大文件

  • 定时任务,比如每天定时统计网站的注册人数,也可以交给Celery周期性的处理。

  • 页面静态化

10

Celery Architecture

  1. celery client发送message给broker

  2. worker 从broker中消费消息,并将结果存储在result_end中

11

12

13

14

15

16

1整体过程

代码结构

jiegou

main.py

from celery import Celery
from kombu import Queue
import time


app = Celery('tasks', backend='redis://127.0.0.1:6379/6')
app.config_from_object('celeryconfig')

class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print "----%s is done" % task_id

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass

task1.py

from celery_app import app

@app.task(base=CallbackTask) 
def add(x, y):
    return x + y

task2.py

from celery_app import app

@app.task(base=CallbackTask) 
def multiply(x,y):
    return x * y

celeryconfig.py

from celery.schedules import crontab
from datetime import timedelta
from kombu import Queue
from kombu import Exchange

result_serializer = 'json'

broker_url = "redis://192.168.1.2"
result_backend = "mongodb://192.168.1.2/celery"

imports = (
    'celery_app.task1',
    'celery_app.task2'
)

beat_schedule = {
    'add-every-20-seconds': {
        'task': 'celery_app.task1.multiply',
        'schedule': timedelta(seconds=20),
        'args': (5, 7)
    },
    'add-every-10-seconds': {
        'task': 'celery_app.task2.add',
         #'schedule': crontab(hour=9, minute=10)
        'schedule': timedelta(seconds=10),
        'args': (23, 54)
    }
}

task_queues = (
    Queue('default', exchange=Exchange('default'), routing_key='default'),
    Queue('priority_high', exchange=Exchange('priority_high'), routing_key='priority_high'),
    Queue('priority_low', exchange=Exchange('priority_low'), routing_key='priority_low'),
)

task_routes = {
    'celery_app.task1.multiply': {'queue': 'priority_high', 'routing_key': 'priority_high'},
    'celery_app.task2.add': {'queue': 'priority_low', 'routing_key': 'priority_low'},
}

# 每分钟最大速率
# task_annotations = {
#     'task2.multiply': {'rate_limit': '10/m'}
# }

# 内存泄露
CELERYD_MAX_TASKS_PER_CHILD = 40 # 每个worker执行多少个任务就会死掉

Deploy

all

Celery Server and Client

启动多个不同的worker 执行不同的任务

  • 在同一台机器上,不同优先级的任务启动不同的worker去执行,有利于保证高优先级的任务得到更多的系统资源
    • 比如: 分开实时任务和定时任务
    • 分开执行频率高的任务和执行频率低的任务

Worker on Server1

消费priority_high事件

celery -A  celery_app.main worker -Q priority_high --concurrency=4 -l info -E -n worker1@%h

Worker on Server2

消费priority_high和priority_low事件

celery -A celery_app.main worker -Q priority_high,priority_low --concurrency=4 -l info -E -n worker2@%h

Client test.py

from celery_app.task1 import add
from celery_app.task1 import multiply

for i in range(50):
    add.delay(2, 2)
    multiply.delay(10,10)

eleyrun

监控

pip install flower
celery flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//
http://server2_ip:5555

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published