文档章节

celery + redis

弘_轩
 弘_轩
发布于 2017/01/23 15:58
字数 972
阅读 99
收藏 3

celery 是一种分布式任务队列

以下是需要理解的几种概念

任务:消息队列里面的一个工作单元

分布式:独立Worker可以布在不同的机器上,一个worker可以指定并发数

Broker:消息通讯的中间人,主要有RabbitMQ, Redis(本例用的是redis,较为轻量级)

beat:定时任务,可以指定任务若干秒后,或定时时间执行

 

安装就不说了,直接用 pip 就可以了,也可以安装在venv上

先创建一下结构

celery_proj/
    __init__.py

    beat_tasks.py #定时任务放在这里

   celery.py #启动脚本
 
    config.py #配置

    tasks.py  #非定时任务,手动调的等其他任务放在这个里

        

celery.py

#!/mfs/lib/Envs/whx/bin/python2.7
# -*-coding:utf-8 -*-

# Created on 2016-05-09

# @author: whx

from __future__ import absolute_import
from celery import Celery

#指定任务py
app = Celery('celery_proj', include=['celery_proj.tasks', 'celery_proj.beat_tasks'])
#指定配置
app.config_from_object('celery_proj.config')



#def getActiveInfo(timeout=1):
#    active_dict = app.control.inspect(timeout=timeout).active()
#    return active_dict
    

#def removeTask(task_id):
#    print app.control.revoke(task_id, terminate=True)


if __name__ == '__main__':
    app.start()

beat_tasks.py

#!/mfs/lib/Envs/whx/bin/python2.7
# -*-coding:utf-8 -*-

# Created on 2016-05-09

# @author: whx

from __future__ import absolute_import
from celery_proj.celery import app
import datetime
import subprocess
import os


def run_cmd(cmd):
    "运行系统命令"
    pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = pipe.communicate()
    errcode = pipe.wait()
    if errcode != 0:
        msg = "command failed:\n%s\n" % ' '.join(cmd)
        if stdout:
            msg += "Standard output:\n%s\n" % stdout
        if stderr:
            msg += "Standard error:\n%s\n" % stderr
        raise OSError, msg
    return stdout, stderr



def get_last_hour():
    return (datetime.datetime.today() - datetime.timedelta(hours=1)).strftime("%Y%m%d%H")


#定义每小时日志ETL任务
@app.task(name='log_clean')
def log_clean_traffic():
    yyyymmddhh = get_last_hour()
    tag_dir= os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
    cmd = "bash  %s/run_sh/logformat.sh  traffic  %s"%(tag_dir, yyyymmddhh)
    #print cmd
    run_cmd(['sh', '-c', cmd])

tasks.py

from __future__ import absolute_import
from celery_proj.celery import app


#定义任务超时时间
#bind 绑定一个任务实例,self
#max_retries 是重新尝试的次数
#default_retry_delay 任务重新尝试的时间间隔6秒
#异常处理的countdown,时间间隔10秒,优先于default_retry_delay
#expires:任务运行的时间限制,超过300秒,任务会直接revoke
#没有指定任务名默认为dologic
@app.task(bind=True, max_retries=3,default_retry_delay=6, expires=300)
def dologic(self, task_name, args_json):
    cmd = "bash /mfs/home/whx/Script/spark/spark_run.sh  %s"%(task_name, args_json)
    try:
        return run_cmd(['sh', '-c', cmd])
    except Exception as exc:
        raise self.retry(exc=exc, countdown=10)

config.py

#!/mfs/lib/Envs/whx/bin/python2.7
# -*-coding:utf-8 -*-

# Created on 2016-05-09

# @author: whx


from __future__ import absolute_import
from celery.schedules import crontab

#执行结果保存的路径
CELERY_RESULT_BACKEND =  'redis://:pwd@host:port/0'
#中间人
BROKER_URL =  'redis://:pwd@host:port/12'
#序列化
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
#结果保存的时间
CELERY_TASK_RESULT_EXPIRES = 86400  # 24 hours.
CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区

#queue 定义执行的队列,启动celery时可以指定队列名
CELERY_ROUTES = {
        'celery_proj.tasks.dologic': {'queue': 'cq_170'},
        'log_clean': {'queue': 'cq_170'},
    }


CELERYBEAT_SCHEDULE = {
        #定义每小时的10分执行
        'log_clean': {
            "task": "log_clean",
            "schedule": crontab(minute=10),
            "args": ()
            },
        }

另外的:

依赖任务用chain,执行结果可以一级级往下传

并列任务用group

from celery import task, group, chain

@task
def add(x, y):
    return x + y



@task(name="main1")
def test1():
    
    #链级 结果向下传递: 2 + 2 + 4 + 8 = 16
    res = chain(add.s(2, 2), add.s(4), add.s(8))()
    res = (add.s(2, 2) | add.s(4) | add.s(8))()

    #链级 结果没有传递:9
    res = chain(add.si(2, 2), add.si(4, 4), add.si(1, 8))()
    res = (add.si(2, 2) | add.si(4, 4) | add.si(1, 8))()

    return res
    


@task(name="main2")
def test2():
    
    #并列执行
    join = group([
             add.s(2,2),
             add.s(4,4),
             add.s(8,8),
             add.s(16,16),

        ])()

    return res



#先依赖后并行
#res = (add.s(4, 4) | group(add.si(i, i) for i in xrange(10)))()

 

调试运行:


python -m celery -A celery_proj worker -E  -B -Q cq_170 -l info --concurrency=10 -n worker1@%h

#启动worker 
#concurrency 并发数
#-E 启动事件
#-B 同时启动beat,既 定时任务
#-Q 指定队列名:cq_170

#%h	worker1@%h	worker1@george.example.com
#%n	worker1@%n	worker1@george
#%d	worker1@%d	worker1@example.com

后台运行:

python -m celery multi start whxpc  -A celery_proj  -E  -B -Q cq_170 -c10 -n worker1@%h -l debug \
  --logfile="./log/celery.log" --pidfile="./celery.pid"

 

官方文档:http://docs.celeryproject.org/en/latest/

© 著作权归作者所有

弘_轩
粉丝 5
博文 16
码字总数 10368
作品 0
福州
高级程序员
私信 提问
celery任务调度模块

Celery是Python开发的分布式任务调度模块,Celery本身不含消息服务,它使用第三方消息服务来传递任务,目前,Celery支持的消息服务有RabbitMQ、Redis甚至是数据库。 安装celery pip install ...

粗粮面包
2017/12/19
0
0
在django中使用Celery 和 Celery-Flower

在django中使用Celery 和 Celery-Flower 1、Celery方式的选择 这里Celery的中间人,我采用Redis。也可以用Django自身和mongodb等。Celery的中间人你可以理解为在Celery执行过程中的数据支持。...

_Change_
2018/09/25
202
0
Celery 和 Redis 入门

Celery 是一个广泛应用于网络应用程序的任务处理系统。 它可以在以下情况下使用: 在请求响应周期中做网络调用。服务器应当立即响应任何网络请求。如果在请求响应周期内需要进行网络调用,则应...

OneAPM蓝海讯通
2015/08/31
200
0
python-celery使用教程

Celery Celery是Python开发的分布式任务调度模块。分为任务分发,任务队列,worker3个部分。celery的出现,解决了python运行后台任务的需求。 这篇文章介绍的celery版本是3.1.18 celery架构 ...

go-skyblue
2015/07/23
1K
0
python—Celery异步分布式

Celery异步分布式 Celery是一个python开发的异步分布式任务调度模块 Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbimq,redis, 数据库等 使用redis连...

huangzp168
2017/12/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Nervos CKB 脚本编程简介[1]:验证模型

CKB 脚本编程简介[1]: 验证模型 本文作者:Xuejie 原文链接:Introduction to CKB Script Programming 1: Validation Model 本文译者:Jason,Orange 译文链接:https://talk.nervos.org/t/c...

NervosCommunity
30分钟前
4
0
消息中间件——RabbitMQ的高级特性

前言 前面我们介绍了RabbitMQ的安装、各大消息中间件的对比、AMQP核心概念、管控台的使用、快速入门RabbitMQ。本章将介绍RabbitMQ的高级特性。分两篇(上/下)进行介绍。 消息如何保障100%的...

Java架构师ya七
今天
9
0
如何编写高质量的 JS 函数(1) -- 敲山震虎篇

本文首发于 vivo互联网技术 微信公众号 链接:https://mp.weixin.qq.com/s/7lCK9cHmunvYlbm7Xi7JxQ 作者:杨昆 一千个读者,有一千个哈姆雷特。 此系列文章将会从函数的执行机制、鲁棒性、函...

vivo互联网技术
今天
7
0
学会这5个Excel技巧,让你拒绝加班

在网上,随处都可以看到Excel技巧,估计已看腻了吧?但下面5个Excel技巧会让你相见恨晚。关键的是它们个个还很实用 图一 技巧1:快速删除边框 有时当我们处理数据需要去掉边框,按Ctrl+Shif...

干货趣分享
今天
11
0
JS基础-该如何理解原型、原型链?

JS的原型、原型链一直是比较难理解的内容,不少初学者甚至有一定经验的老鸟都不一定能完全说清楚,更多的"很可能"是一知半解,而这部分内容又是JS的核心内容,想要技术进阶的话肯定不能对这个...

OBKoro1
今天
12
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部