Celery 实现分布式任务队列

原创
2015/09/01 13:28
阅读数 7.4K

Celery 简介

Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。

在 Python 中定义 Celery 的时候,我们要引入 Broker,中文中有中间人的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。

这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。

Show me the code

# 以下为 dispatcher.py
from worker import divide

# 1
divide.delay(1,2)
# 2
divide.apply_async((1, 2))


# 以下为 worker.py
from celery import Celery
app = Celery('tasks', backend='amqp://guest@localhost//', broker='redis://')

@app.task
def divide(x, y):
    print x / y

worker.py 中新建了一个 Celery 实例,以 amqp 作为 broker,以 redis 作为 backend 储存所有 task 执行的历史记录。我们在此例中使用 RabbitMQ 作为我们的消息队列服务器。

我们一方面通过命令行中执行以下语句来启动 celery 服务。

celery -A worker worker --loglevel=info

另外一方面,我们运行 dispatch.py,代码中将 worker 中的 divide 函数导入,再接着以两种方式将 task 启动。第一种方法中的 delay 方法接收了两个参数,实际为第二种方法的便捷调用,第二种方法在使用时,要将我们要传给 divide 的参数作为 tuple 放在第一个参数位置。

apply_async 的其他参数

apply_async 还支持其他参数,比如设置回调。

设置 task 实例的回调可以采用 link:

divide.apply_async((16, 2), link=divide.s(8))

首先计算 16 / 2,然后把结果 8 / 8,最后执行的结果等于 1. 所以这里的 link 是指向一个后继的调用函数,即完成当前 divide 以后再进行下一个 divide 操作。 除了 link 之外,还有 link_error,只会在该任务执行失败时调用。在本例中,我们可以在 divide 执行失败时,执行 link_error 所指的函数,这个函数就是错误消息的处理句柄,它会接收到一个 task 的 UUID,我们可以通过 UUID 来访问出错的任务的异常状态。

# dispatcher
divide.apply_async((1, 0), link_error=error_handler.s())

# 这里我们把 1 和 0 放到了 divide 函数中执行,引发了除零异常,继而执行 link_error 对应的 error_handler,error_handler 接收到 uuid 参数,通过 AsyncResult 生产一个结果实例,我们可以用 result.state 打印出该任务的执行情况,用 result.info 来获取异常的具体信息。

# worker
@app.task
def divide(x, y):
    print x/y

@app.task
def error_handler(uuid):
    result = AsyncResult(uuid)
    print 'task error {0}'.format(uuid)
# [2015-09-01 13:43:26,569: WARNING/Worker-2] task error 8e516377-a6c0-4a40-934f-dd1b0692c5fa
    print result.state
# [2015-09-01 13:43:26,572: WARNING/Worker-2] FAILURE
    print result.info
# [2015-09-01 13:43:26,572: WARNING/Worker-2] integer division or modulo by zero

跟踪异常的成因

异常的成因我们可以如上述代码所示将 result.info 打印出来而得知。然而我们并不能满足于此,仅仅知晓出错的 task 的 UUID 和其状态是不够的,我们想要知道发生错误时,task 的传入参数是什么。我一开始没有尝试出通过 UUID 来获取到原来的 1 和 0 这两个参数,后来我追踪了 apply_async 这个函数,位于 task.py 中,再跟踪到 trace.py 中的 build_tracer 函数,果然在 link_error 的调用时只传递了 UUID 一个参数,代码如下:

    def on_error(request, exc, uuid, state=FAILURE, call_errbacks=True):
        if propagate:
            raise
        I = Info(state, exc)
        R = I.handle_error_state(task, eager=eager)
        if call_errbacks:
            group(
                [signature(errback, app=app)
                 for errback in request.errbacks or []], app=app,
            ).apply_async((uuid, ))
            # ).apply_async((uuid, request.args))
            # 可以改成上一行注释中的代码,这样就可以在 error_handler 中得到原来调用的任务的输入参数了。
        return I, R, I.state, I.retval

此处通过修改 celery 源代码来获取出错时 task 的传入参数,但是方法并不好。于是我想能不能通过 UUID 直接获取到原来的 task,然后查看 task 的 args,但是这篇文档有些晦涩难懂,我就先放弃了,便发现了以下方法。

class DebugTask(Task):
    abstract = True
    def on_failure(self, *args, **kwargs):
        print self.request.args

@app.task(base=DebugTask)
def divide(x, y):
    print x / y

这段代码将原来应该继承的 Task 类中的 on_failure 函数重写,当 divide 函数发生异常时,该 task 的 state 自动变成 failure,Task 会自动调用 on_failure 函数,从而打印出传入的 args。

任务的远程调用

关于 task 的调用,celery 还提供了另外一种 send_task 方法。

Celery 作为分布式系统,自然就支持远程 worker,这个时候我们可以利用 send_task 这个函数,以函数名的方式调用 task。代码如下:

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')
app.send_task('worker.divide', args=[1, 0])

# send_task 也支持 link_error,这个官方文档上没写详细,这里需要调用 signature 函数来生产函数的 signature,这时 divide 的 UUID 和我们通过修改源代码得到的 args。

app.send_task('worker.divide', args=[1, 0], link_error=app.signature('worker.error_handler'))

这里我们没有通过 module 的方式把 divide 函数给 import 到程序中来,也就意味着我们可以不将 worker 放在与 dispatcher 同一目录下。我们的想法是,将 worker 放在另外一台服务器上,通过 celery 调用它,本地 django 项目调用这个 dispatcher 后,将 task 发送到远程服务器的队列中,然后由远程服务器中的 worker 处理。

配置文件

此时需要注意的是,这里的 dispatcher 是通过文件的方式配置的,其配置文件应与 worker 端配置文件吻合,如下:

# celeryconfig.py
# coding=utf-8

# Broker 设置 RabbitMQ
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://'

# Tasks 位于 worker.py 中
CELERY_IMPORTS = ('worker', )

# 默认为1次/秒的任务
CELERY_ANNOTATIONS = {'worker.divide': {'rate_limit': '1/s'}}

CELERY_ROUTES = {'worker.divide': {'queue': 'divide'},
                 'worker.error_handler': {'queue': 'error'}}

# 默认所有格式为 json
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']

使用了配置文件以后,我们在 worker 中也可以采用相同的方式定义 app,如下:

# coding=utf-8
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')

@app.task
def divide(x, y):
    print x / y

我们在配置文件中为 worker.divide 这个 task 指定了 divide 这个队列,为 error_handler 定义了 error 这个队列用于错误处理。 在启动 celery 的时候可以通过 -Q 参数指定队列。在终端中执行了以下命令后,celery 服务器就启动了,当前 celery 会监视 divide 队列,取出参数执行任务。而如果我们不启动另外一个 celery 来监视 error 队列,error_handler 就不会前往队列去拿参数执行。

celery -A worker worker --loglevel=info -Q divide

关于 Celery,网上英文教程都不多,更别说中文的了。

网上有些关于 Celery 性能的讨论,我暂且没有做分析,如果有更好的解决方案能够替代它,请留言告知。

如果发现本文有错误,请指正。

展开阅读全文
打赏
4
15 收藏
分享
加载中
celery里面延迟任务时间配置在数据库中,怎么实现?
2015/12/08 10:28
回复
举报
xh4n3博主

引用来自“bwh1994”的评论

我现在有一个进行抓取网页的任务,当我需要在多台机器上进行抓取时,是不是得把我的这个项目发布到多台机器上,还是只需要在一台机器上分发?
不好意思,之前看走眼了。我不知道你是想让多台机器抓取网页还是多台机器将网页抓取结果存储到数据库中。不过不管是哪种,都是可以实现的。前者的话,将 app.task 包装的那些函数分发到每台机器上,然后分别启动 worker 就可以啊。我觉得你说的应该是第二种吧,它是一个 dispatcher 端抓取所有网页,然后发送存储任务到 RabbitMQ 队列中,然后所有的 worker 都要从这个队列中取任务执行。我现在也在做类似的爬虫,直接用 pika 调用 RabbitMQ,而且 RabbitMQ 提供 exchange 等高级的路由特性,可以方便你设置不同的 worker 以处理不同的任务。
2015/10/17 19:29
回复
举报
我一开始是打算用celery做一个分布式管理容器的工具,但是增删节点太麻烦,现在在用不同的方案实施,我的博客也有一些关于celery的自己的实验笔记之类的
2015/10/17 18:19
回复
举报
我现在有一个进行抓取网页的任务,当我需要在多台机器上进行抓取时,是不是得把我的这个项目发布到多台机器上,还是只需要在一台机器上分发?
2015/10/17 13:07
回复
举报
我现在有一个进行抓取网页的任务,当我需要在多台机器上进行抓取时,是不是得把我的这个项目发布到多台机器上,还是只需要在一台机器上分发?
2015/10/17 13:07
回复
举报

引用来自“xh4n3”的评论

引用来自“bwh1994”的评论

我现在郁闷的就是Celery如何开启多个worker,多台机器开启worker,每个上面默认开启一个worker,官方文档里面只有在一台电脑上启动多个worker的?

官方文档中 workers guide里面有写喔 或者直接谷歌celery multiple worker就可以搜到
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2.%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3.%h 这个是在一台机器上开启多个worker,并没有说明在多台机器上如何搭建?
2015/10/17 13:01
回复
举报
xh4n3博主

引用来自“bwh1994”的评论

我现在郁闷的就是Celery如何开启多个worker,多台机器开启worker,每个上面默认开启一个worker,官方文档里面只有在一台电脑上启动多个worker的?

官方文档中 workers guide里面有写喔 或者直接谷歌celery multiple worker就可以搜到
2015/10/15 22:39
回复
举报

引用来自“xufinal”的评论

79写得很好
麻烦问一下celery的集群环境如何配置和调用,多个worker在多台机器上如何配置?
2015/10/15 20:28
回复
举报

引用来自“xufinal”的评论

79写得很好
麻烦问一下celery的集群环境如何配置和调用,多个worker在多台机器上如何配置?
2015/10/15 20:27
回复
举报
我现在郁闷的就是Celery如何开启多个worker,多台机器开启worker,每个上面默认开启一个worker,官方文档里面只有在一台电脑上启动多个worker的?
2015/10/15 20:27
回复
举报
更多评论
打赏
11 评论
15 收藏
4
分享
返回顶部
顶部