#celery#周期性任务

原创
2015/05/24 15:15
阅读数 1.4W

玩了一个星期的clannad,是时候干点事了。


折腾了下celery周期性任务:

celery提供了一个叫celery beat的服务,用于定时驱使worker执行任务。也就是说,如果本地没有活动的worker,它将不会得到任何执行结果,他只是负责把任务消息传到rabbitmq,一旦启动一个可用worker,则自动从rabbitmq获取任务信息并执行。

与此配置相关的参数是CELERYBEAT_SCHEDULE,我把我的celery应用proj的所有配置内容都放置在一个config.py文件中:

from __future__ import absolute_import
from datetime import timedelta
from celery.schedules import crontab

CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_RESULT_SERIALIZER='json'

CELERYBEAT_SCHEDULE = {
    'add-every-1-min': {
        'task': 'proj.agent.add',
        'schedule': crontab(),
        'args': (16, 16),
    },
}
#CELERYBEAT_SCHEDULE = {
#    'add-every-2-seconds': {
#        'task': 'proj.agent.add',
#        'schedule': timedelta(seconds=3),
#        'args': (16, 16)
#    },
#}

CELERY_TIMEZONE = 'UTC'

目前的定时任务是:

add-every-4-s

task指定了相应的任务:proj目录下agent模块的add函数

schedule指定了定时工具,这里是celery.schedules的crontab

args是任务的参数


此时我们回到proj所在的目录中,启动一个worker:

root@workgroup0:~/celeryapp/configtest# ls
celerybeat-schedule  logging  proj
root@workgroup0:~/celeryapp/configtest# celery -A proj worker --loglevel=INFO
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@workgroup0.hzg.com v3.1.17 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x7f0027635510
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://guest@loaclhost//
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . proj.agent.add
  . proj.agent.mul
  . proj.agent.writefile
  . proj.agent.xsum

[2015-05-24 15:00:37,873: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-05-24 15:00:37,940: INFO/MainProcess] mingle: searching for neighbors
[2015-05-24 15:00:38,980: INFO/MainProcess] mingle: all alone
[2015-05-24 15:00:39,021: WARNING/MainProcess] celery@workgroup0.hzg.com ready.

worker启动成功,此时再开一个终端,启动beat服务:

root@workgroup0:~/celeryapp/configtest# celery -A proj beat -s celerybeat-schedule #这里的celerybeat-schedule指定一个记录文件

celery beat v3.1.17 (Cipater) is starting.
__    -    ... __   -        _
Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2015-05-24 15:02:53,761: INFO/MainProcess] beat: Starting...
[2015-05-24 15:03:00,000: INFO/MainProcess] Scheduler: Sending due task add-every-1-min (proj.agent.add)

#已经相隔1min了

[2015-05-24 15:04:00,066: INFO/MainProcess] Scheduler: Sending due task add-every-1-min (proj.agent.add)

返回看看worker的输出:

[2015-05-24 15:01:50,827: INFO/MainProcess] Task proj.agent.add[9b6f962a-9b66-4fde-916f-fc5a951ad599] succeeded in 0.0342152439989s: {'value': '32'}
[2015-05-24 15:02:24,923: INFO/MainProcess] Received task: proj.agent.add[e4b9840b-09f6-4db6-88c1-2a418b11d393]
[2015-05-24 15:02:24,947: INFO/MainProcess] Task proj.agent.add[e4b9840b-09f6-4db6-88c1-2a418b11d393] succeeded in 0.0200459280004s: {'value': '32'}
[2015-05-24 15:03:00,015: INFO/MainProcess] Received task: proj.agent.add[98f44dd1-e6e2-4457-bfd6-ff59d0ee6d2f]
[2015-05-24 15:03:00,031: INFO/MainProcess] Task proj.agent.add[98f44dd1-e6e2-4457-bfd6-ff59d0ee6d2f] succeeded in 0.0125673500006s: {'value': '32'}

这就是周期性任务的执行。


遇到的坑:

在配置文件中,from __future__ import absolute_import这一行很关键,如果没有这一行,

from celery.schedules import crontab

这个命令执行时会报错,celery beat无法正常启动。


补充:

默认情况下,celery beat使用UTC时区,你也可以配置其他时区:

CELERY_TIMEZONE = 'Europe/London'


关于设置任务执行周期,你可以通过datetime的timedelta设置,可以让任务执行间隔精确到秒,相应的配置如下:

CELERYBEAT_SCHEDULE = {
    'add-every-2-seconds': {
        'task': 'proj.agent.add',
        'schedule': timedelta(seconds=3),
        'args': (16, 16)
    },
}

也可以用crontab风格的:

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-1-min': {
        'task': 'proj.agent.add',
        'schedule': crontab(),
        'args': (16, 16),
    },
}


关于一个CELERYBEAT_SCHEDULE的可以配置的参数,以及crontab的详细示例,请参见celery官方文档。


关于启动celery beat的tips,我这里只贴原文:

Starting the Scheduler

To start the celery beat service:

$ celery -A proj beat

You can also start embed beat inside the worker by enabling workers -B option, this is convenient if you will never run more than one worker node, but it’s not commonly used and for that reason is not recommended for production use:

$ celery -A proj worker -B

Beat needs to store the last run times of the tasks in a local database file (namedcelerybeat-schedule by default), so it needs access to write in the current directory, or alternatively you can specify a custom location for this file:

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule


展开阅读全文
打赏
2
4 收藏
分享
加载中
ClaraMic博主

引用来自“bwh1994”的评论

回去了请教你

你有qq之类的吗,这样解释好不方便
2015/12/08 17:39
回复
举报
回去了请教你
2015/12/08 17:14
回复
举报

引用来自“Hochikong”的评论

引用来自“bwh1994”的评论

你的意思时从数据库将设置的时间查询出来然后设置到delay?

其实一个嵌套函数就可以解决问题→_→
一个嵌套函数在用户只做了一次新增的操作下可以,如果他修改下一次的延迟时间,嵌套函数就不行了,不能保证一致
2015/12/08 17:07
回复
举报
celery方法调用的时候没有delay参数 啊?
2015/12/08 16:42
回复
举报
@celery.task
def checkLz(data, bigTaskId, taskResultId):
taskResult = TaskResult.getOne(TaskResult.taskResultId == taskResultId)
companyName = data0
url = data1
if url != '':
if (validateUrl(url)):
shortUrl = getDomain(url)
isExistsWebsiteResult = Website.getOne(Website.domain == shortUrl)
isExistsWebId = Website.webId
# 子任务
subTask = TaskInfo()
subTask.taskResultId = taskResult
subTask.state = ''
subTask.save(force_insert=True)
subId = subTask.id
# 数据库中无此url的记录
if isExistsWebsiteResult is None:
# 抓取检测
fetchWebsite.delay(companyName, bigTaskId, shortUrl, subId, url)
else:
# 如果当前网站更新时间小于过期时间,说明不用重新进行抓取并检查
expired = Configs.getOne(Configs.type == 'update').expired
# 获取当前时间
currentTime = datetime.datetime.now()
2015/12/08 16:41
回复
举报
该评论暂时无法显示,详情咨询 QQ 群:912889742
我的任务现在是这样的,有一个大任务下有若干个子任务,如果调用的时候设置了任务延迟时间,需要大任务下面的所有子任务完成之后等待延迟的时间进行执行,怎么设置这个时间?
2015/12/08 16:40
回复
举报
ClaraMic博主

引用来自“bwh1994”的评论

你的意思时从数据库将设置的时间查询出来然后设置到delay?

其实一个嵌套函数就可以解决问题→_→
2015/12/08 16:36
回复
举报
ClaraMic博主

引用来自“bwh1994”的评论

你的意思时从数据库将设置的时间查询出来然后设置到delay?

那个中间人的角色就好像SQL和程序对象之间的ORM一样而已
2015/12/08 16:26
回复
举报
ClaraMic博主

引用来自“bwh1994”的评论

你的意思时从数据库将设置的时间查询出来然后设置到delay?

任务不要设置delay,而是通过中间人的调度去执行。比如现在是4:00,用户要求在4:30执行某个任务,此时他提交任务,把delay设置为30min并存入数据库。但实际上提交任务时接受用户任务数据的是中间人而非你开发的celery client,这个中间人会等到30min后才调用你的celery client执行一个无delay设置的task
2015/12/08 16:20
回复
举报
更多评论
打赏
19 评论
4 收藏
2
分享
返回顶部
顶部