文档章节

Celery 入门-简单任务开发

胡佐治
 胡佐治
发布于 2016/09/16 23:32
字数 652
阅读 196
收藏 4

安装

pip install celery  #

pip install flower #celery的及时监控组件

配置

默认celery的配置文件名是celeryconfig.py,本例的内容如下:

BROKER_URL ='amqp://guest:guest[@localhost](https://my.oschina.net/u/570656):5672' 
CELERY_ENABLE_UTC = True 
CELERY_TIMEZONE='Asia/Shanghai' 
CELERY_IMPORTS = ('celery_demo.tasks','celery_demo.tasks2')
# other config option 

默认flower的配置文件名是flowerconfig.py,本例的内容如下:

broker_api = 'http://guest:guest[@localhost](https://my.oschina.net/u/570656):15672/api/'
logging = 'DEBUG'
basic_auth = ['admin:admin'] #加上授权保护

address = '127.0.0.1'
port = 5556

#开发任务 项目结构如下:

├── celery_demo
│   ├── celeryconfig.py 
│   ├── flowerconfig.py
│   ├── __init__.py 
│   ├── README.md
│   ├── tasks2.py 
│   ├── tasks.py 
├── __init__.py     
└── test
    ├── __init__.py 
    ├── tasks_test.py 

为演示目的,tasks.py和tasks2.py的代码是相同的,tasks.py的代码如下:

from celery import Celery
from celery.exceptions import Reject,MaxRetriesExceededError
from celery.utils.log import task_logger

import celeryconfig
app = Celery()
import requests
from requests.exceptions import ConnectTimeout 

class ErrorURLStartException(Exception):
    def __init__(self):
        self.message = '必须是HTTP或HTTPS开头' 

@app.task(bind=True, acks_late=True)
def task1(self, url):
    print url
    try:
        if url.startswith('http')==False :
            raise ErrorURLStartException()
        res = requests.get(url, timeout=5)
        task_logger.info('status_code:%s' % res.status_code)
    except ConnectTimeout as e:
        raise self.retry(exc=e, countdown=5, max_retries=5)
    except ErrorURLStartException as e:
        raise Reject(reason=e, requeue=False) #requeue=True可能会造成任务一致被循环的处理,永远不会结束
    else:
        task_logger.info('任务执行完毕') 

if __name__ == '__main__':
    app.config_from_object(celeryconfig)
    app.worker_main(['tasks', '-lDEBUG'])

tasks.task1任务的目的是当一个合法的HTTP URL过来的时候去GET内容,如果出现连接超时的异常就重试5次,如果出现非法的URL就直接将任务抛弃。

单元测试的代码如下:

import unittest
from unittest import TestCase 
from celery_demo.tasks import task1
from celery_demo.tasks2 import task1 as task2




class TasksTestCase(TestCase):
    def test_task1(self):
        print 'test_task1'
        url = 'http://www.baidu.com'
        task1.apply_async(args=[url])
     

    def test_task2(self):
        print 'test_task2'
        url = 'http://169.24.1.100'
        task1.apply_async(args=[url])
    

    def test_task3(self):
        print 'test_task3'
        url = 'adfhttp://169.24.1.100'
        task1.apply_async(args=[url])
    

    def test_task4(self):
        print 'test_task4'
        url = 'http://169.24.1.100'
        task2.apply_async(args=[url])

启动任务

进入到 celery_demo的上一级目录

george@george-pc:~/workspace/work_demo$ ll
总用量 24
drwxr-xr-x 2 george george 4096 9月  16 22:45 celery_demo
-rw-r--r-- 1 george george   56 9月  15 22:36 __init__.py
drwxr-xr-x 2 george george 4096 6月   4 11:29 rabbitmq_demo
-rw-r--r-- 1 george george  254 9月  15 21:40 tasks.py
-rw-r--r-- 1 george george  547 9月  16 00:12 tasks.pyc
drwxr-xr-x 2 george george 4096 9月  16 12:58 test

分别实行下面的三条指令:

  • 启动flower的实例

    celery flower --conf=celery_demo/flowerconfig.py

  • 启动celery worker的实例

    celery worker --config=celery_demo.celeryconfig -n worker1.%h -l DEBUG

  • 运行单元测试

    python -m unittest test.tasks_test

运行结果

可以通过在浏览器中输入http://localhost:5556/ 进行flower的访问,如图:输入图片说明

总结

使用celery来进行异步任务的处理让程序扩展性得到提升然而并没有增加应用的复杂性.开发人员几乎不用关注broker的存在,只把精力关注在如何设计task上。粗浅的理解。

© 著作权归作者所有

胡佐治
粉丝 10
博文 32
码字总数 22828
作品 0
上海
后端工程师
私信 提问
分布式任务队列Celery入门与进阶

一、简介   Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也...

W-D
2018/08/22
0
0
基于Celery的并行处理工程-OpenWorker快速安装

Celery 是一个简单灵活的Python并行处理框架,但是相关的几个工程需要独自安装和配置,给小白的使用带来困难。 OpenWorker是基于Python的并行处理框架,将集成Celery、Flower、Jobtastic和R...

openthings
2015/08/10
644
4
Celery 和 Redis 入门

Celery 是一个广泛应用于网络应用程序的任务处理系统。 它可以在以下情况下使用: 在请求响应周期中做网络调用。服务器应当立即响应任何网络请求。如果在请求响应周期内需要进行网络调用,则应...

OneAPM蓝海讯通
2015/08/31
215
0
基于Celery的并行处理工程-OpenWorker

OpenWorker-初始建立(2015-05-29),欢迎参与:https://github.com/supergis/OpenWorker。 OpenWorker-基于Python的并行处理框架,将集成Celery、Flower、Jobtastic和Rodeo工程,可以通过...

openthings
2015/05/29
461
1
异步任务神器 Celery 简明笔记

Celery 在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发...

funhacks
2017/11/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

利用CSS禁止手机长按出现气泡: 复制、选择等功能

可以用 * ,也可作用于一个div div{  -webkit-touch-callout:none;  /*系统默认菜单被禁用*/  -webkit-user-select:none; /*webkit浏览器*/  -khtml-user-select:none; /*早期浏览...

蓝小驴
51分钟前
7
0
前端的一些雕虫小技,从100%和滚动条说起

1、100%和滚动条 当我们在css中把html和body同时设为100%时,会出现滚动条 html, body { width: 100%; height: 100%; } 原因是html和b...

wphmoon
今天
8
0
电力区块链应用案例【2019】

随着区块链技术的日益普及,出现了大量创业企业尝试使用区块链技术来解决能源与电力行业中存在的问题。在本文中,我们将介绍其中的三个能源区块链项目。 能源行业以价格不透明著称:消费者很...

汇智网教程
今天
12
0
聊聊rocketmq的adjustThreadPoolNumsThreshold

序 本文主要研究一下rocketmq的adjustThreadPoolNumsThreshold DefaultMQPushConsumer rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.ja......

go4it
今天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部