#sora# Celery notes: calling tasks

Original
2015/04/26 14:45
Views: 2.5K

The two basic ways to call a task:

apply_async() and delay(): the former takes positional arguments as a tuple and keyword arguments as a dict, while the latter takes the arguments directly.


Quick reference:


T.delay(arg, kwarg=value)

always a shortcut to .apply_async().


T.apply_async((arg, ), {'kwarg': value})


T.apply_async(countdown=10)

executes 10 seconds from now.


T.apply_async(eta=now + timedelta(seconds=10))

executes 10 seconds from now, specified using eta.


T.apply_async(countdown=60, expires=120)

executes one minute from now, but expires after 2 minutes.


T.apply_async(expires=now + timedelta(days=2))

expires in 2 days, set using datetime.


Examples:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
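
The two forms above build the same message. A minimal pure-Python sketch (hypothetical stand-ins, no Celery required) of how delay() forwards to apply_async():

```python
# Hypothetical stand-ins that mirror the Celery call signatures;
# a real task would put this payload on the broker instead.

def apply_async(args=(), kwargs=None, **options):
    """Echo the message payload a real apply_async() would send."""
    return {'args': tuple(args), 'kwargs': dict(kwargs or {}), 'options': options}

def delay(*args, **kwargs):
    """delay() is always just a shortcut: it packs its arguments
    into the (args, kwargs) form and calls apply_async()."""
    return apply_async(args, kwargs)

# Both calls produce an identical payload:
assert delay(2, 2, kwarg1='x') == apply_async((2, 2), {'kwarg1': 'x'})
```

The trade-off: delay() reads naturally, while apply_async() also exposes execution options such as countdown and expires.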


Other features:


##Task delay (the countdown argument)##

>>> result = add.apply_async((2, 2), countdown=3)

>>> result.get()    # this takes at least 3 seconds to return

4


##Task expiry (the expires argument)##

>>> # Task expires after one minute from now.

>>> add.apply_async((10, 10), expires=60)

When a worker receives an expired task it will mark the task as REVOKED.
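
A worker-side sketch of that expiry check (simplified stand-in, not Celery's internal code):

```python
import time

def receive(task_name, expires_at, now=None):
    """Simplified worker logic: a task whose expiry has passed
    is marked REVOKED instead of being executed."""
    now = time.time() if now is None else now
    if expires_at is not None and now > expires_at:
        return 'REVOKED'
    return 'STARTED'

sent_at = 1000.0
# expires=60 means: discard if not picked up within a minute of sending.
assert receive('add', sent_at + 60, now=sent_at + 61) == 'REVOKED'
assert receive('add', sent_at + 60, now=sent_at + 30) == 'STARTED'
```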



##Task retry (the retry argument)##

add.apply_async((2, 2), retry=True, retry_policy={

    'max_retries': 3,

    'interval_start': 0,

    'interval_step': 0.2,

    'interval_max': 0.2,

})


· Retry Policy


A retry policy is a mapping that controls how retries behave, and can contain the following keys:


· max_retries


Maximum number of retries before giving up; in this case the exception that caused the retry to fail will be raised.


A value of 0 or None means it will retry forever.


The default is to retry 3 times.


· interval_start


Defines the number of seconds (float or integer) to wait between retries. Default is 0, which means the first retry will be instantaneous.


· interval_step


On each consecutive retry this number will be added to the retry delay (float or integer). Default is 0.2.


· interval_max


Maximum number of seconds (float or integer) to wait between retries. Default is 0.2.
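
Putting the four keys together, the wait before the i-th retry can be sketched as interval_start + i * interval_step, capped at interval_max:

```python
def retry_intervals(max_retries=3, interval_start=0,
                    interval_step=0.2, interval_max=0.2):
    """Sketch of the delays produced by a retry policy
    (not Celery's internal code, just the documented behaviour)."""
    return [min(interval_start + i * interval_step, interval_max)
            for i in range(max_retries)]

# With the policy from the example above: the first retry is instantaneous,
# each later one waits 0.2 s.
print(retry_intervals())  # [0, 0.2, 0.2]
```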



##Serializers (the serializer argument)##

Several serializers are available: JSON, pickle, YAML, and msgpack.

JSON is decent, but it is weak at carrying binary data: bytes have to be Base64-encoded, which inflates the payload, and it supports only a limited set of data types.

pickle is suitable when you don't need to support components written in other languages; it transfers binary data more compactly and faster.

YAML has good cross-language compatibility and supports more data types, but the Python modules that process it are relatively slow.

msgpack is a newer interchange format, similar in capability to JSON, but still too young.
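
The JSON-vs-binary point can be seen with the standard library alone (no Celery involved): bytes must be Base64-encoded before JSON can carry them, inflating the payload by roughly a third, while pickle embeds them directly:

```python
import base64
import json
import pickle

payload = bytes(range(256)) * 4          # 1 KiB of arbitrary binary data

# JSON: bytes -> Base64 text -> JSON string (about 4/3 the original size).
as_json = json.dumps({'data': base64.b64encode(payload).decode('ascii')})

# pickle: the bytes are stored as-is, plus a small amount of framing.
as_pickle = pickle.dumps({'data': payload})

print(len(payload), len(as_json), len(as_pickle))
assert len(as_json) > len(as_pickle)
```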


The serializer is chosen with the following precedence, from highest to lowest:

1. The serializer execution option.

2. The Task.serializer attribute.

3. The CELERY_TASK_SERIALIZER setting.


e.g. >>> add.apply_async((10, 10), serializer='json')



##Compression (the compression argument)##

If the messages you send are large, consider compressing them:

e.g. >>> add.apply_async((2, 2), compression='zlib')
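
What compression='zlib' buys depends on how compressible the message is; a quick check with the standard library (independent of Celery):

```python
import zlib

# A repetitive JSON-ish payload compresses very well.
message = b'{"task": "add", "args": [2, 2]}' * 100

compressed = zlib.compress(message)
print(len(message), len(compressed))

# The round trip is lossless, so the worker sees the original message.
assert zlib.decompress(compressed) == message
assert len(compressed) < len(message)
```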


##Routing to different queues##

e.g. add.apply_async(queue='priority.high')

e.g. $ celery -A proj worker -l info -Q celery,priority.high
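
Routing can also be configured declaratively instead of per call. A minimal sketch (the task name proj.tasks.add is an assumption):

```python
# Hypothetical routing table in the CELERY_ROUTES style: messages for
# proj.tasks.add go to 'priority.high'; everything else falls back to
# the default 'celery' queue.
CELERY_ROUTES = {
    'proj.tasks.add': {'queue': 'priority.high'},
}

def queue_for(task_name, routes=CELERY_ROUTES, default='celery'):
    """Sketch of the lookup: use the route if one exists, else the default."""
    return routes.get(task_name, {}).get('queue', default)

assert queue_for('proj.tasks.add') == 'priority.high'
assert queue_for('proj.tasks.mul') == 'celery'
```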


Advanced options:

· exchange


Name of exchange (or a kombu.entity.Exchange) to send the message to.


· routing_key


Routing key used to determine the queue the message is routed to.


· priority


A number between 0 and 9, where 0 is the highest priority.


Supported by: redis, beanstalk


