文档章节

Celery-RabbitMQ-Django-Cron

扫把就是扫把
 扫把就是扫把
发布于 2016/03/18 16:37
字数 1244
阅读 229
收藏 2

celery,rabbitmq,django,db,计划任务,后台管理 。基本的命令和代码。

0. 安装

    0.1  install

1. celery 基础命令

    1.1. 启动 celery

    1.2. 后台进程启动celery worker

    1.3. 重启 celery worker 后台进程

    1.4. 停止 celery worker 后台进程

    1.5. 等待 停止 celery worker 后台进程

    1.6. 指定 pidfile & logfile

    1.7. 启动 多个 worker 并且指定不同参数

    1.8. 手动杀死所有worker进程

    1.9. 较完整的celery的启动命令

2. 管理相关命令

    2.1. 启动 flower

    2.2. 需要 目录

    2.3. 使用 librabbitmq


0.1  install

sudo apt-get install build-essential python-dev
sudo pip install celery
sudo pip install librabbitmq
sudo pip install flower
sudo apt-get install rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
sudo service rabbitmq-server restart
sudo rabbitmq-plugins disable rabbitmq_management

访问 http://host:15672 即可进入管理界面。

默认用户名,密码都是 guest


1.1. 启动 celery

celery -A task worker -l info

启动时指定要使用的 queue

celery -A task worker -l info -Q brand_queue,new_queue -E


1.2. 后台进程启动celery worker

celery multi start w1 -A task -l info -Q brand_queue,new_queue -E

可通过命令查看后台启动的进程:

ps -aux | grep celery
[celeryd: w1@x :MainProcess] -active- (worker -E -A task -l info -Q brand_queue,new_queue --logfile=w1.log --pidfile=w1.pid --hostname=w1@x)

可以看到默认添加了几个参数:

--logfile=w1.log    默认在当前文件夹新建 w1.log 文件
--pidfile=w1.pid    默认在当前文件夹新建 w1.pid 文件
--hostname=w1@x     默认实例名 woker_name/机器名

1.3. 重启 celery worker 后台进程

celery multi restart w1 -A task -l info -Q brand_queue,new_queue -E

1.4. 停止 celery worker 后台进程

celery multi stop w1 -A task -l info -Q brand_queue,new_queue -E

stop 命令是异步的,worker 会立即停止,即使当时仍然有任务在执行,

并且不会写停止worker相关的日志

1.5. 等待 停止 celery worker 后台进程

celery multi stopwait w1 -A task -l info -Q brand_queue,new_queue -E

这个停止命令,会等待正在运行的所有任务都完成再停止。

1.6. 指定 pidfile & logfile

celery multi start w1 -A task -l info -Q brand_queue,new_queue -E --pidfile=/var/www/api/space/run/%n.pid --logfile=/var/www/api/space/logs/%n%I.log

1.7. 启动 多个 worker 并且指定不同参数

celery multi start 10 -A task -l info -E -Q:1-3 images,video -Q:4,5 data -Q default -L:4,5 debug

启动了10个worker:

worker 1,2,3 使用了队列 images, video

worker 4,5   使用了队列 data

worker 其他   使用了队列 default

-L 是什么参数?

1.8. 手动杀死所有worker进程

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

1.9. 较完整的celery的启动命令

celery multi start w1 -A task -l info -Q brand_queue,new_queue,time_queue,cron_queue -E -B -s /var/www/api/space/run/celerybeat-schedule --pidfile=/var/www/api/space/run/%n.pid --logfile=/var/www/api/space/logs/%n%I.log


2.1 启动 flower

celery flower --broker=amqp://tiger:tiger@192.168.0.6:5672/vr_tiger --address=192.168.0.4 --port=5555 --broker_api=http://tiger:tiger@192.168.0.6:15672/api/ --basic_auth=tiger:tiger
rabbitmq 主机地址: 192.168.0.6
本机地址         : 192.168.0.4
本地监听端口      : 5555

2.2 需要目录

run/
log/

2.3 使用 librabbitmq

If you’re using RabbitMQ (AMQP) as the broker then you can install the librabbitmq module to use an optimized client written in C:
$ pip install librabbitmq

注意

1. 后台运行 celery 的是否,worker 信息不会保存。所以,每次对 worker 操作时都需要加上相同的参数。特别是 pidfile 和 logfile 需要相同。

2. 我没有使用 celery 提供的任务结果存储。我在业务中自己处理 过程及结果。

代码示例

配置文件:

# -*- coding=utf-8 -*-
# File Name: task_conf.py

from __future__ import absolute_import

from celery   import Celery
from datetime import timedelta
from celery.schedules import crontab

'''
task_random: 是任务的名称
broker: 通过 amqp://用户名:密码@ip/虚拟主机连接 amqp
include: 任务程序
'''

# 消息队列配置
mq_host = '192.168.0.6'
mq_name = 'tiger'
mq_pass = 'tiger'
mq_vr   = 'vr_tiger'

broker  = 'amqp://%s:%s@%s/%s' % (mq_name, mq_pass, mq_host, mq_vr)

# 初始化 app
app = Celery('name_wash', broker=broker, include=['task'])

# 指定任务存储队列
app.conf.update(

            CELERY_ROUTES = {
                'task.exe_task':{'queue':'brand_queue'},
                'task.task_sms_send':{'queue':'new_queue'},
                'task.task_sec':{'queue':'time_queue'},
                'task.task_cron':{'queue':'cron_queue'}
            },

            CELERYBEAT_SCHEDULE = {
                    'exe-every-10-seconds': {
                        'task': 'task.task_sec',
                        'schedule': timedelta(seconds=30),
                        'args': [1],
                    },

                    'add-every-monday-morning': {
                        'task': 'task.task_cron',
                        'schedule': crontab(hour=15, minute=47, day_of_week=5),
                        'args': (15232897835,),
                    },
            },

            #CELERY_TASK_SERIALIZER     = 'json',
            #CELERY_ACCEPT_CONTENT      = ['json'],  # Ignore other content
            #CELERY_RESULT_SERIALIZER   = 'json',
            CELERY_EVENT_QUEUE_TTL      = 5,
            CELERY_TIMEZONE             = 'Asia/Shanghai',
            CELERY_ENABLE_UTC           = True,
            CELERY_DISABLE_RATE_LIMITS  = True,
            CELERY_IGNORE_RESULT        = True

        )

if __name__ == '__main__':
    app.start()


任务文件:

# -*- coding=utf-8 -*-
# File Name: task.py

'''
task
'''

from __future__ import absolute_import

import time
import traceback

from job.task_conf import app


@app.task(ignore_result=True)
def exe_task(task_id, number):
    ''' 根据参数执行任务 '''

    try:
        print 'exe task: ', task_id
        time.sleep(number)
    except:
        traceback.print_exc()
        return (task_id, number, -1)

    return 'true :)'


@app.task
def task_sms_send(mobile, content):
    ''' 任务-发送短信 '''

    try:
        print 'send sms: mobile-> %s , content-> %s' % (mobile, content)
    except:
        traceback.print_exc()
        return 'Fail :('

    return 'Success :)'


@app.task
def task_sec(mobile):
    ''' 测试 任务 时间 定制 '''

    try:
        print 'send sms: mobile-> %s.' % mobile
    except:
        traceback.print_exc()
        return 'F'

    return 'S'

@app.task
def task_cron(mobile):
    ''' 测试 任务 时间 定制 Cron '''

    try:
        print 'send sms: mobile-> %s.' % mobile
    except:
        traceback.print_exc()
        return 'F - cron'

    return 'S - cron'



def main():
    res = exe_task(2, 2)
    print 'res: ', res

if __name__ == '__main__':
    main()


添加任务代码:

# -*- coding=utf-8 -*-
# File Name: add_task.py

import time
import traceback
import random

from task import exe_task, task_sms_send


def action():
    tries = 0

    while 1:
        try:
            tries += 1
            if tries >= 20:
                break

            task_id = tries
            number = random.randint(1, 5)

            exe_task.apply_async(args=[task_id, number], queue='brand_queue')
            print 'added one task'
            time.sleep(1)
        except:
            traceback.print_exc()
            pass

    print 'add task done'


def add_task_by_django(task_id, number):
    ''' 测试 从 django 添加 任务 '''

    exe_task.apply_async(args=[task_id, number], queue='brand_queue')


def add_task_sms(mobile, content):
    ''' 添加 发送 短信 任务 '''

    # 列表参数或者字典参数
    # task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

    task_sms_send.apply_async(args=[mobile, content], queue='new_queue')
    print 'task added: sms'


def main():

    action()

if __name__ == '__main__':

    main()


© 著作权归作者所有

扫把就是扫把
粉丝 2
博文 37
码字总数 23616
作品 0
成都
程序员
私信 提问
加载中

评论(2)

扫把就是扫把
扫把就是扫把

引用来自“Zeopean”的评论

你好,请问这个例子咋调用呢?我运行了只能重复 added one task 。。。
你运行的 add_task.py 是往队列中添加任务。

现在,另一个shell中开启celery, 指定好 task(task.py) 文件 和 消息队列(brand_queue)
celery就会执行你添加到队列中的任务了。

就像这样:celery -A task worker -l info -Q brand_queue
Zeopean
Zeopean
你好,请问这个例子咋调用呢?我运行了只能重复 added one task 。。。
djcelery入门:实现运行定时任务

基于Django与Celery实现异步对列任务 - Python - 伯乐在线 http://python.jobbole.com/81953/ 更新于2015-08-26 注:本文根据官方文档结合具体例子整理,供celery入门学习,更多内容请移步参...

Kinegratii
2014/07/18
0
17
使用 django+celery+RabbitMQ 实现异步执行

RabbitMQ大家应该不陌生,著名的消息队列嘛。可惜我最近才听说它的大名,了解之后不禁惊呼,世界上居然还有这种东西! 立刻觉得手里有了锤子,就看什么都是钉子了,主网站不愿意干的操作统统...

renwofei423
2011/07/29
9.5K
4
Python 并行分布式框架之 Celery

Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。 架构设计 Celery的架构由三部分组成,消息中间件(message broker)...

naughty
2015/03/12
0
6
Python 并行分布式框架:Celery

Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。 一、架构设计 Celery的架构由三部分组成,消息中间件(message bro...

openthings
2015/05/20
0
0
python任务调度模块celery

python任务调度模块celery celery简介 Celery是一个python开发的异步分布式任务调度模块。 Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,一般使用rabbitMQ或者Red...

laoba
2018/05/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

JWT学习总结

官方 https://jwt.io 英文原版 https://www.ietf.org/rfc/rfc7519.txt 或 https://tools.ietf.org/html/rfc7519 中文翻译 https://www.jianshu.com/p/10f5161dd9df 1. 概述 JSON Web Token(......

冷基
28分钟前
2
0
AOP的学习(1)

AOP 理解AOP编程思想(面向方法、面向切面) spring AOP的概念 方面 -- 功能 目标 -- 原有方法 通知 -- 对原有方法增强的方法 连接点 -- 可以用来连接通知的地方(方法) 切入点 -- 将用来插入...

太猪-YJ
今天
4
0
一张图看懂亮度、明度、光度、光亮度、明亮度

亮度、明度、光亮度,Luminance和Brightness、lightness其实都是一个意思,只是起名字太难了。 提出一个颜色模型后,由于明度的取值与别人的不同,为了表示区别所以就另想一个词而已。 因此在...

linsk1998
昨天
8
0
Python应用:python链表示例

前言 python链表应用源码示例,需要用到python os模块方法、函数和类的应用。 首先,先简单的来了解下什么是链表?链表是一种物理存储单元上非连续、非顺序的存储结构,数据元素的逻辑顺序是...

python小白1
昨天
4
0
Source Insight加载源码

Source Insight是一个图形化的源代码查看工具(当然也可以作为编译工具)。如果一个项目的源代码较多,此工具可以很方便地查找到源代码自建的依赖关系。 1.创建工程 下图为Snort源代码的文件...

天王盖地虎626
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部