djcelery入门:实现运行定时任务

原创
2014/07/18 12:39
阅读数 3.2W


基于Django与Celery实现异步对列任务 - Python - 伯乐在线 http://python.jobbole.com/81953/ 更新于2015-08-26


注:本文根据官方文档结合具体例子整理,供celery入门学习,更多内容请移步参考资料一节。最近更新于2014-10-19

1 什么是celery

celery是一个异步任务队列/基于分布式消息传递的作业队列。Celery通过消息(message)进行通信,使用代理(broker)在客户端和工作执行者之间进行交互。当开始一个任务时,客户端发送消息到队列并由代理将其发往响应的工作执行者处。

2 Windows环境配置

在这里使用RabbitMQ作为消息代理(broker),Django数据库作为结果存储(ResultStore)。

2.1 安装ERLang

  1. 首先是到ERLang官网去下载ERlang可执行文件  地址:http://www.erlang.org/download.html

  2. 然后安装ERLang。

  3. 然后设置ERLang的环境变量。

  4. 在环境变量中加入 ERL_HOME = erlang安装目录

  5. 在path中添加 %ERL_HOME%\bin


2.2 安装rabbitmq

http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.5/rabbitmq-server-3.1.5.exe下载rabbitmq-server 并安装。

 点击开始菜单中的 rabbitmq-start ,rabbitmq-server就启动了,在管理工具-服务中可以看到相关信息的。

2.3 安装djcelery

和大多数Python第三方包一样,用 pip安装celery和djcelery两个包。djcelery依赖于djcelery,所以只要执行pip install djcelery命令即可。

3 配置Djcelery

主要步骤

  1. 在settings配置相关参数

  2. 定义任务

  3. 执行任务,可以在程序中调用执行,也可交给后台周期性执行

3.1 基本配置

下面是djcelery的有关配置,定义在Django项目的settings模块内。

#... ... 
 
INSTALLED_APPS = (
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.sites',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # Uncomment the next line to enable the admin:
    'django.contrib.admin',
    # Uncomment the next line to enable admin documentation:
    # 'django.contrib.admindocs',
    'djcelery',  #添加djcelery
     'mrs_app',  #自己的APP
) 
 
# ... .... 
# 配置djcelery相关参数,ResultStore默认存储在数据库可不必重写 ,
import djcelery
djcelery.setup_loader()
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
#任务定义所在的模块
CELERY_IMPORTS = ('mrs_app.my_celery.tasks', )
# 使用和Django一样的时区
CELERY_TIMEZONE = TIME_ZONE

#以上为基本配置,以下为周期性任务定义,以celerybeat_开头的  

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

#CELERYBEAT_SCHEDULE = {
#    'add-every-3-minutes': {
#        'task': 'mrs_app.my_celery.tasks.monthly_reading_task',
#        'schedule': timedelta(minutes=3)
#    },
#}

首先导出djcelery模块,并调用setup_loader方法加载有关配置。

broker_url 类URL格式表明使用哪种消息代理 格式参见这里
celery_imports 任务定义在哪个模块,最好以tasks.py命名
celery_timezone 时间时区,等于TIMEZONE即和Django一样的时区。
celerybeat_scheduler 周期性任务,用Django数据库保存最后运行时间等信息
celerybeat_schedule 定义任务,上面注释表示每隔3分钟执行monthly_reading_task任务

 3.2 定义任务

有两种格式

  • 类定义:一个继承了celery.app.task的类并实现了run方法

  • 函数定义:@task装饰的函数

通过实现task相关方法可以实现更多的逻辑,比如成功回调、错误处理、重试机制等,以下是最基本的定义方式。

#mrs_app.my_celery.tasks.py

from celery import task

#第一种,函数方式  
 @task(name='monthly_reading')
def monthly_reading_task():
    task_obj = MonthlyReading(debug=False)
    task_obj.start()
    
#第二种,类定义
class MonthlyReadingTask(Task):
    name='monthly_reading'
    def run(*args, **kwargs): 
        task_obj = MonthlyReading(debug=False)
        task_obj.start()

3.3 启动

启动 python manage.py celery worker -l info

如果有定时任务的话,还需要启动心跳

 另开一个cmd窗口 python manage.py celery beat  (windows下-B选项不可用)

4 执行任务

4.1 直接调用

自己在代码中的调用,支持延迟/同步/异步调用,可参考task类定义,例子见参考资料的《使用django+celery+RabbitMQ实现异步执行》。

4.2 周期性调用

这种由djcelery调用,所以需要在settings.CELERYBEAT_SCHEDULER设置一个调度器,这里使用数据库。

djcelery提供了一些Model(定义在djcelery/models.py文件),数据库模型如下,

periodictask

描述定时任务。重要字段有:

name: 字符串,标识符

task,字符串,任务函数/类所在的路径,一般是celery_imports + function name。

interval:外键指向intervalschedule,表示每隔多少时间执行

crontab:外键指向crontabschedule,表示在某一时刻执行。

enabled:是否生效


intervalschedule

表示时间间隔,有两个参数:

every:正数;period间隔单位。比如intervalschedule(every=2, period='day')表示每隔2天


crontabschedule 表示某一时刻,有minute、hour、day_of-week、day_of_month、day_of_year它们的组合意义,参见cron时间表示法,比如0 0 * 10 * 表示每个月的10号凌晨。

说明:

  • 任务和定时任务的区别:定时任务 = 任务 + intervalschedule/crontabschedule 两个定时任务可以执行同一个任务。

  • 任务没有相应的Model,用字符串表示,即periodictask模型的task字段

  • 定时任务有相应的Model即periodictask。

djcelery在初始化中主要完成两件:

在settings.CELERY_IMPORTS定义下的模块搜索所有任务。这个对数据库没有任何改变,只是用Admin添加定时任务时periodictask.task字段变成选择框,列出了所有定义的任务。

从settings.CELERYBEAT_SCHEDULE创建定时任务,这个会创建数据记录,相当于celery_models.PeriodicTask.objects.create(..)语句。

4.3 创建定时任务

通过它提供的Model Query API来操作,同平常的数据库查询一样。

from djcelery import models as celery_models

celery_models.PeriodicTask.objects.create(...)
celery_models.PeriodicTask.ojects.get(name='add')
....

djcelery提供了admin管理界面,访问http://localhost:8000/admin/djcelery/ 即可,在这里可以对定时任务进行增删改查,具体和Django admin一样。

(图待补充)

注:当我们修改任务的设置后,比如关闭、更改时间后不用重启celery服务等。

4.4 示例

admin毕竟是给后台管理人员使用的,它所有的参数都暴露给使用者了。下面是一个实际使用的例子。

需求:ajax实现月度定时任务monthly_reading_task的执行和控制,即

每个月的某一天执行该任务;可以选择开启或者关闭该定时任务;能够选择任务在哪一天(1-28日)执行。

界面看起来是这样的:

基本上就是Model的增删改查。就不是通过admin来操作了。

查询任务信息

    def read(self, request, *args, **kwargs):
        try:
            task = celery_models.PeriodicTask.objects.get(name=self.TASK_NAME)
            if task.enabled:
                return {
                    'enabled': True,
                    'day_of_month': int(task.crontab.day_of_month),
                    'last_run_at': task.last_run_at if task.last_run_at else '0'
                }
            else:
                return {'enabled': False}
        except celery_models.PeriodicTask.DoesNotExist:
            return {'enabled': False}

更新日期

    def create(self, request, *args, **kwargs):
        enabled = request.POST.get('enabled', None)
        if enabled not in [self.ENABLED_POST_VALUE, self.DISABLED_POST_VALUE]:
            return self.operate_fail('无效参数')
        if enabled == self.DISABLED_POST_VALUE:
            self.disable_task(self.TASK_NAME)
            return self.operate_success()
        else:
            try:
                day_of_month = int(request.POST.get('day_of_month', ''))
                if day_of_month > 28 or day_of_month < 1:
                    return self.operate_fail('日期必须在1-28日之间')
                task, created = celery_models.PeriodicTask.objects.get_or_create(name="monthly_reading",
                                                                                 task="mrs_app.my_celery.tasks.monthly_reading_task")
                if created:
                    crontab = celery_models.CrontabSchedule.objects.create(day_of_month=day_of_month,
                                                                           hour=0,
                                                                           minute=0)
                    crontab.save()
                    task.crontab = crontab
                    task.enabled = True
                    task.save()
                else:
                    task.crontab.day_of_month = day_of_month
                    task.crontab.save()
                    task.enabled = True
                    task.save()
                return self.operate_success()
            except ValueError:
                return self.operate_fail('抄表日不能为空')

关闭定时

    def disable_task(self, name):
        try:
            task = celery_models.PeriodicTask.objects.get(name=name)
            task.enabled = False
            task.save()
            return True
        except celery_models.PeriodicTask.DoesNotExist:
            return True

后台控制界面



5 参考资料&进阶





展开阅读全文
打赏
2
3 收藏
分享
加载中
可以帮我看看这个问题吗?十分头大,感觉到是真不会了。http://www.oschina.net/question/2007826_178262
2014/10/26 01:29
回复
举报
Kinegratii博主

引用来自“CrazyHarry”的评论

有没有遇到过这样的问题:
Target WSGI script not found or unable to stat: /var/www/html/test/wsgi/django.wsgihome
然而/test/home是我的一个请求路径,现在项目部署到apache上只能访问base.html
暂时没有碰到过,可以到网上搜下,应该会有解决方案的
2014/10/23 09:39
回复
举报
有没有遇到过这样的问题:
Target WSGI script not found or unable to stat: /var/www/html/test/wsgi/django.wsgihome
然而/test/home是我的一个请求路径,现在项目部署到apache上只能访问base.html
2014/10/22 18:30
回复
举报

引用来自“CrazyHarry”的评论

我现在是使用python manage.py celery worker --beat可以按指定的时间运行task 但是admin页面上的tasks记录里在运行完指定的task后并没有添加进去历史记录。另外这个worker表示的是什么意思,是不是需要添加work才可以使用python manage.py celery beat启动后会自动运行task

引用来自“Kinegratii”的评论

1 work相当于处队列,负责任务的调度和执行等等,可以使用默认的配置,也可以自己配置 2 现在celery_taskmeta表可以看到相关信息的,至于djcelery_taskstate可能是没有配置worker的原因吧,暂时还在研究中
好吧,期待你的研究成功
2014/10/21 10:06
回复
举报
Kinegratii博主

引用来自“CrazyHarry”的评论

我现在是使用python manage.py celery worker --beat可以按指定的时间运行task 但是admin页面上的tasks记录里在运行完指定的task后并没有添加进去历史记录。另外这个worker表示的是什么意思,是不是需要添加work才可以使用python manage.py celery beat启动后会自动运行task
1 work相当于处队列,负责任务的调度和执行等等,可以使用默认的配置,也可以自己配置 2 现在celery_taskmeta表可以看到相关信息的,至于djcelery_taskstate可能是没有配置worker的原因吧,暂时还在研究中
2014/10/21 10:01
回复
举报
Kinegratii博主

引用来自“CrazyHarry”的评论

你好,我有以下几个问题:
1.python manag.py syncdb后数据库中是生成了djcelery的相关table,但是里面没数据
2.在创建task之后是需要手动在数据库中插入配置信息吗
3.如果手动在数据库中配置后,是通过启动心跳后,当时间条件满足了task的要求就会自动调用task吗?
4.能否提供一个数据配置的记录,比如args以及kwags

引用来自“Kinegratii”的评论

之前没有说明任务和定时任务的区别和关系,下午作了一些修改。1这个的确没有自己的数据,添加定时任务需要通过admin或则自己在views写代码等。2数据库是不保存task的信息的;3这个会自动调用;4定时任务有args和kwargs有这两个字段,执行的时候会传递到task的run方法

引用来自“CrazyHarry”的评论

args和kwargs这两个字段要求json格式的数据,比如实例中的add(x,y)那么这个task的args和kwargs应该如何填入?

引用来自“Kinegratii”的评论

是要求json数据,比如kwargs =' {"x":4,"y":4}'这个样子,内部的验证源代码https://github.com/celery/django-celery/blob/master/djcelery/admin.py#L266

引用来自“CrazyHarry”的评论

我启动心跳后 会有这样的log出现在终端 " [2014-10-19 23:49:42,849: INFO/MainProcess] Scheduler: Sending due task 自动测试 (autotester.script.tasks.executetest) " 这样是意味着已执行task吗 ?如果是已执行为何没有看到task里方法执行的过程以及结果?也没有报错,如果顺利执行了 是不是在admin页面的djcelery.task里应该可以看到task执行的情况

引用来自“Kinegratii”的评论

这个Log是说明定时器起作用了,任务执行情况应该看另外一个CMD窗口,如果成功后的会显示“Task autotester.script.tasks.executetest”successed in 3.00000s之类的信息。

引用来自“CrazyHarry”的评论

那么任务执行失败的话不是也应该有log吗,这个错误的信息如何查看
在cmd也可以看到,还可以重写Task.on_failure函数,当任务失败的回调函数
2014/10/21 09:38
回复
举报
我现在是使用python manage.py celery worker --beat可以按指定的时间运行task 但是admin页面上的tasks记录里在运行完指定的task后并没有添加进去历史记录。另外这个worker表示的是什么意思,是不是需要添加work才可以使用python manage.py celery beat启动后会自动运行task
2014/10/20 18:40
回复
举报

引用来自“CrazyHarry”的评论

你好,我有以下几个问题:
1.python manag.py syncdb后数据库中是生成了djcelery的相关table,但是里面没数据
2.在创建task之后是需要手动在数据库中插入配置信息吗
3.如果手动在数据库中配置后,是通过启动心跳后,当时间条件满足了task的要求就会自动调用task吗?
4.能否提供一个数据配置的记录,比如args以及kwags

引用来自“Kinegratii”的评论

之前没有说明任务和定时任务的区别和关系,下午作了一些修改。1这个的确没有自己的数据,添加定时任务需要通过admin或则自己在views写代码等。2数据库是不保存task的信息的;3这个会自动调用;4定时任务有args和kwargs有这两个字段,执行的时候会传递到task的run方法

引用来自“CrazyHarry”的评论

args和kwargs这两个字段要求json格式的数据,比如实例中的add(x,y)那么这个task的args和kwargs应该如何填入?

引用来自“Kinegratii”的评论

是要求json数据,比如kwargs =' {"x":4,"y":4}'这个样子,内部的验证源代码https://github.com/celery/django-celery/blob/master/djcelery/admin.py#L266

引用来自“CrazyHarry”的评论

我启动心跳后 会有这样的log出现在终端 " [2014-10-19 23:49:42,849: INFO/MainProcess] Scheduler: Sending due task 自动测试 (autotester.script.tasks.executetest) " 这样是意味着已执行task吗 ?如果是已执行为何没有看到task里方法执行的过程以及结果?也没有报错,如果顺利执行了 是不是在admin页面的djcelery.task里应该可以看到task执行的情况

引用来自“Kinegratii”的评论

这个Log是说明定时器起作用了,任务执行情况应该看另外一个CMD窗口,如果成功后的会显示“Task autotester.script.tasks.executetest”successed in 3.00000s之类的信息。
那么任务执行失败的话不是也应该有log吗,这个错误的信息如何查看
2014/10/20 18:18
回复
举报
Kinegratii博主

引用来自“CrazyHarry”的评论

你好,我有以下几个问题:
1.python manag.py syncdb后数据库中是生成了djcelery的相关table,但是里面没数据
2.在创建task之后是需要手动在数据库中插入配置信息吗
3.如果手动在数据库中配置后,是通过启动心跳后,当时间条件满足了task的要求就会自动调用task吗?
4.能否提供一个数据配置的记录,比如args以及kwags

引用来自“Kinegratii”的评论

之前没有说明任务和定时任务的区别和关系,下午作了一些修改。1这个的确没有自己的数据,添加定时任务需要通过admin或则自己在views写代码等。2数据库是不保存task的信息的;3这个会自动调用;4定时任务有args和kwargs有这两个字段,执行的时候会传递到task的run方法

引用来自“CrazyHarry”的评论

args和kwargs这两个字段要求json格式的数据,比如实例中的add(x,y)那么这个task的args和kwargs应该如何填入?

引用来自“Kinegratii”的评论

是要求json数据,比如kwargs =' {"x":4,"y":4}'这个样子,内部的验证源代码https://github.com/celery/django-celery/blob/master/djcelery/admin.py#L266

引用来自“CrazyHarry”的评论

我启动心跳后 会有这样的log出现在终端 " [2014-10-19 23:49:42,849: INFO/MainProcess] Scheduler: Sending due task 自动测试 (autotester.script.tasks.executetest) " 这样是意味着已执行task吗 ?如果是已执行为何没有看到task里方法执行的过程以及结果?也没有报错,如果顺利执行了 是不是在admin页面的djcelery.task里应该可以看到task执行的情况
这个Log是说明定时器起作用了,任务执行情况应该看另外一个CMD窗口,如果成功后的会显示“Task autotester.script.tasks.executetest”successed in 3.00000s之类的信息。
2014/10/20 16:52
回复
举报

引用来自“CrazyHarry”的评论

你好,我有以下几个问题:
1.python manag.py syncdb后数据库中是生成了djcelery的相关table,但是里面没数据
2.在创建task之后是需要手动在数据库中插入配置信息吗
3.如果手动在数据库中配置后,是通过启动心跳后,当时间条件满足了task的要求就会自动调用task吗?
4.能否提供一个数据配置的记录,比如args以及kwags

引用来自“Kinegratii”的评论

之前没有说明任务和定时任务的区别和关系,下午作了一些修改。1这个的确没有自己的数据,添加定时任务需要通过admin或则自己在views写代码等。2数据库是不保存task的信息的;3这个会自动调用;4定时任务有args和kwargs有这两个字段,执行的时候会传递到task的run方法

引用来自“CrazyHarry”的评论

args和kwargs这两个字段要求json格式的数据,比如实例中的add(x,y)那么这个task的args和kwargs应该如何填入?

引用来自“Kinegratii”的评论

是要求json数据,比如kwargs =' {"x":4,"y":4}'这个样子,内部的验证源代码https://github.com/celery/django-celery/blob/master/djcelery/admin.py#L266
我启动心跳后 会有这样的log出现在终端 " [2014-10-19 23:49:42,849: INFO/MainProcess] Scheduler: Sending due task 自动测试 (autotester.script.tasks.executetest) " 这样是意味着已执行task吗 ?如果是已执行为何没有看到task里方法执行的过程以及结果?也没有报错,如果顺利执行了 是不是在admin页面的djcelery.task里应该可以看到task执行的情况
2014/10/20 13:04
回复
举报
更多评论
打赏
17 评论
3 收藏
2
分享
返回顶部
顶部