文档章节

celery+rabbitmq+redis 分布任务队列探索(一)

丰_申
 丰_申
发布于 2017/03/22 00:33
字数 1380
阅读 600
收藏 0

首先我们说一下celery是什么以及它的组成:

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。

celery由5个主要组件组成: 
producer: 任务发布者, 通过调用API向celery发布任务的程序 
celery beat: 任务调度, 根据配置文件发布定时任务 
worker: 实际执行任务的程序 
broker: 接受任务消息,存入队列再按顺序分发给worker执行 
backend: 存储结果的服务器

 

预期目标:

在Django中生成任务(生产者),通过rabbitmq记录消息,celery worker执行mq中任务(消费者),回写结果进redis。机器A:windows部署Django(1.10.2)、Celery(3.1.7),机器B:centos部署rabbitmq(3.5.3)、redis(3.2.8)。机器C:windows部署Celery(3.1.7)、redis(3.2.8)

实际结果:机器A:生成任务发送到机器B的MQ,然后机器C执行任务。

1、安装

机器A:

按照既定的软件列表安装Django、Celery:

pip install django==1.10.2

因celery4.0以上版本不支持windows 所以,我们先安装3.1.7 做一个调试用

pip install celery==3.1.7

因会对redis进行一些操作,所以我们这里也安装上redis

pip install redis

 

机器B:

通过源码安装redis 跟 rabbitmq。此处安装省略,可以参考我之前的MAC下redis 及rabbitmq安装手册。

因为centos6.5对于如此干净的环境,源码安装redis可能还好。但是rabbitmq的安装因为要依赖的比较多,比如erlang,所以这块的安装需要大家特别注意一下。在这里就省略不说了。。。

 

机器C:

对应的安装 celery  、redis

 

2.代码调试(python)

首先我们在机器A的Django中新建工程。

然后在app下创建一个目录queue,然后分别创建三个文件。celeryconfig.py、tasks.py、testcelery.py

celeryconfig.py 根据名字我们就可以知道,主要是用来做一个celery通用配置的操作。原本可以放在django的settings.py文件中,但是此处为了我们能够更好的理解celery,就单独拎出来了。

# -*- coding:utf-8 -*-

"""
@version: python2.7
@license: Apache Licence 
@software: PyCharm
@file: celeryconfig.py
@time: 2017/3/21 21:38
"""

from kombu import Exchange, Queue

BROKER_URL = 'amqp://autotest:autotest@10.83.14.245//'  # 消息存储数据存储rabbitmq
CELERY_RESULT_BACKEND = 'redis://10.83.14.245/0'  # 消息执行后的结果包括函数返回值的数据存储在仓库0

CELERY_QUEUES = (  # 定义任务队列
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("for_task_realtime", Exchange("for_task_realtime"), routing_key="task_realtime"),
    Queue("for_task_timer", Exchange("for_task_timer"), routing_key="task_timer"),
    Queue("for_task_monitor", Exchange("for_task_monitor"), routing_key="task_monitor")
)

CELERY_ROUTES = {  # 定义routes用来决定不同的任务去哪一个queue
    # tasks.taskrealtime的消息会进入for_task_realtime队列
    'tasks.taskrealtime': {"queue": "for_task_realtime", "routing_key": "task_realtime"},
    # tasks.tasktimer的消息会进入for_task_timer队列
    'tasks.tasktimer': {"queue": "for_task_timer", "routing_key": "task_timer"},
    # tasks.taskmonitor的消息会进入for_task_monitor队列
    'tasks.taskmonitor': {"queue": "for_task_monitor", "routing_key": "task_monitor"},
}

首先设置了BROKER_URL 以及CELERY_RESULT_BACKEND ,接下来定义了三个Message Queue,并且指明了Queue对应的Exchange以及routing_key的值。

tasks.py 则是我们具体任务执行的方法,在这里会定义对应之前的消息生产函数:

# -*- coding:utf-8 -*-

"""
@version: python2.7
@license: Apache Licence 
@software: PyCharm
@file: tasks.py
@time: 2017/3/21 21:16
"""

from celery import Celery
import subprocess

app = Celery()
app.config_from_object("celeryconfig")


@app.task
def taskrealtime(x, y):
    return (x + y), 'taskrealtime'

@app.task
def tasktimer(x, y):
    return (x + y), 'tasktimer'


@app.task
def taskmonitor(x, y):
    return (x + y), 'taskmonitor'

为区分是调用的不同的queue,我在每个方法的后面分别加上了方法名如:taskmonitor

testcelery.py 是我们具体要用来调用的文件。

# -*- coding:utf-8 -*-

"""
@version: python2.7
@license: Apache Licence 
@software: PyCharm
@file: testcelery.py
@time: 2017/3/21 19:45
"""

from tasks import *

r = taskrealtime.delay(1, 1)
print r,r.get()

r = tasktimer.delay(2, 2)
print r,r.get()

r = taskmonitor.delay(3, 3)
print r,r.get()

在这里,我分别对三个方法进行了输出操作,便于在前端查看。

3.启动服务

首先启动redis、rabbitmq,具体的启动方法,在之前的文章中已经做了对应的操作。

但是接下来我们要启动celery-work去做消费。

拷贝tasks.py、celeryconfig.py文件到机器C中桌面建立的queue中,

然后cd进入到该层级celery worker -c 4 --loglevel=info -A tasks

C:\Users\Administrator\Desktop\queue>celery worker -c 4 --loglevel=info -A tasks

[2017-03-22 00:24:53,770: WARNING/MainProcess] c:\python27\lib\site-packages\cel
ery\apps\worker.py:159: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@WIN-QHB59C6FF53 v3.1.7 (Cipater)
---- **** -----
--- * ***  * -- Windows-2008ServerR2-6.1.7601-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x32cc7b8
- ** ---------- .> transport:   amqp://autotest:**@10.83.14.245:5672//
- ** ---------- .> results:     redis://10.83.14.245/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> default          exchange=default(direct) key=default
                .> for_task_monitor exchange=for_task_monitor(direct) key=task_m
onitor
                .> for_task_realtime exchange=for_task_realtime(direct) key=task
_realtime
                .> for_task_timer   exchange=for_task_timer(direct) key=task_tim
er

[tasks]
  . tasks.taskmonitor
  . tasks.taskrealtime
  . tasks.tasktimer

[2017-03-22 00:24:53,811: INFO/MainProcess] Connected to amqp://autotest:**@10.8
3.14.245:5672//
[2017-03-22 00:24:53,844: INFO/MainProcess] mingle: searching for neighbors
[2017-03-22 00:24:54,868: INFO/MainProcess] mingle: all alone
[2017-03-22 00:24:54,901: WARNING/MainProcess] celery@WIN-QHB59C6FF53 ready.

我们可以看到,celery woker已经启动。接下来,运行机器A中的testcelery.py,可以看到机器A中输出的打印信息如下:

33361d05-89c6-408e-b491-88c72a89519b (2, 'taskrealtime')
3540dc5f-69a1-45d5-a421-7dfe95af40aa (4, 'tasktimer')
0173d003-8807-4e2b-b9d0-591efa36c05d (6, 'taskmonitor')

这就是对于celery这个分布式任务调度系统的一个基本操作。希望对大家有所帮助,接下来,我们会进一步的探索celery更多的玩法。

© 著作权归作者所有

丰_申
粉丝 10
博文 179
码字总数 114855
作品 0
深圳
QA/测试工程师
私信 提问
Android线程,线程池使用及原理博文参考

通过以下文章的阅读,相信你对android的线程,线程池以及原理会有更加深刻的理解 这块的知识可以说是一大块,要撸清楚还是要花点时间,线程池中关联到的队列不仅在线程池中使用,在各种第三方网络...

xingjm8511
2016/06/23
25
0
ForkJoinPool 探索

介绍 “分而治之“是理清思路和解决问题的一个重要的方法。大到系统架构对功能模块的拆分,小到归并排序的实现,无一不在散发着分而治之的思想。在实现分而治之的算法的时候,我们通常使用递...

robinhan
01/10
0
0
浅谈分布式计算的开发与实现

介绍 分布式计算简单来说,是把一个大计算任务拆分成多个小计算任务分布到若干台机器上去计算,然后再进行结果汇总。 目的在于分析计算海量的数据,从雷达监测的海量历史信号中分析异常信号(...

看看这天
2016/10/20
30
0
根据调试工具看Vue源码之watch

官方定义 类型: 详细: 一个对象,键是需要观察的表达式,值是对应回调函数。值也可以是方法名,或者包含选项的对象。Vue 实例将会在实例化时调用 $watch(),遍历 watch 对象的每一个属性。...

tonychen
03/25
0
0
对线程池使用的一些探索

最近在项目过程中,有了一些对线程池使用的经验。 1.如何等待线程池中的全部任务执行完 可以通过ExecutorService的awaitTermination方法。在调用线程池的shutdown()方法后,再调用线程池的a...

tanjunjie
2016/11/18
608
0

没有更多内容

加载失败,请刷新页面

加载更多

用 Sphinx 搭建博客时,如何自定义插件?

之前有不少同学看过我的个人博客(http://python-online.cn),也根据我写的教程完成了自己个人站点的搭建。 点此:使用 Python 30分钟 教你快速搭建一个博客 为防有的同学不清楚 Sphinx ,这...

王炳明
昨天
3
0
黑客之道-40本书籍助你快速入门黑客技术免费下载

场景 黑客是一个中文词语,皆源自英文hacker,随着灰鸽子的出现,灰鸽子成为了很多假借黑客名义控制他人电脑的黑客技术,于是出现了“骇客”与"黑客"分家。2012年电影频道节目中心出品的电影...

badaoliumang
昨天
12
0
很遗憾,没有一篇文章能讲清楚线程的生命周期!

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本。 简介 大家都知道线程是有生命周期,但是彤哥可以认真负责地告诉你网上几乎没有一篇文章讲得是完全正确的。 ...

彤哥读源码
昨天
13
0
jquery--DOM操作基础

本文转载于:专业的前端网站➭jquery--DOM操作基础 元素的访问 元素属性操作 获取:attr(name);$("#my").attr("src"); 设置:attr(name,value);$("#myImg").attr("src","images/1.jpg"); ......

前端老手
昨天
6
0
Django的ChoiceField和MultipleChoiceField错误提示,选择一个有效的选项

在表单验证时提示错误:选择一个有效的选项 例如有这样一个表单: class ProductForm(Form): category = fields.MultipleChoiceField( widget=widgets.SelectMultiple(), ...

编程老陆
昨天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部