文档章节

Celery入门--定时任务的开发及运行

胡佐治
 胡佐治
发布于 2016/09/23 13:29
字数 776
阅读 255
收藏 5

定时任务就是按照执行计划去执行特定的任务。执行定时任务可以选择在配置文件(celeryconfig.py)中配置定时任务的相关信息(官方手册中提供了其他方式可以参考

CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
    'task': 'worker.mytask1.ticket_task', #指定定时任务
    'schedule': timedelta(seconds=30),#指定定时策略
    'args': ('a','b','c'), #任务的参数
    'options':dict(exchange='default',routing_key='default') # 选项参数与apply_async一致 选项参数与apply_async一致,指定exchange和routing_key表示任务会定时发送到default交换机上然后再通过routing_key路由到对应的队列上,本case中会被路由到default.
    },
}

完整的配置内容如下:

# -*- coding:utf-8 -*-
from datetime import timedelta
from kombu import Queue,Exchange

BROKER_URL="amqp://dev:dev12345@192.168.2.16:5672/test"
CELERY_IMPORTS = ('worker.mytask1', 'worker.mytask2','celery.task.http')
CELERY_ENABLE_UTC  = True
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'

#CELERY_RESULT_BACKEND = 'amqp:'
CELERY_RESULT_BACKEND = 'redis://192.168.2.14:6379/8'

CELERY_QUEUES = (
    Queue('default',Exchange('default'),routing_key='default'),
    Queue('for_mytask1_task1',Exchange('for_mytask1_task1'),routing_key='for_mytask1_task1'),
    Queue('for_mytask2_task1',Exchange('for_mytask2_task1'),routing_key='for_mytask2_task1'),
)

CELERy_ROUTES = {
    'worker.mytask1.task1':{'queue':'for_mytask1_task1','routing_key':'for_mytask1_task1'},
    'worker.mytask2.task1':{'queue':'for_mytask2_task1','routing_key':'for_mytask2_task1'},
    'worker.mytask2.ticket_task':{'queue':'default','routing_key':'default'}
}

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'worker.mytask1.ticket_task',
        'schedule': timedelta(seconds=30),
        'args': ('a','b','c'),
        'options':dict(exchange='default',routing_key='default')
    },
}

在启动worker的时候添加-B选项,定时执行任务

celery worker --config=worker.celeryconfig   -l DEBUG -n worker.mytask1 -B

样列代码:

@app.task(bind=True)
  def ticket_task(self,datetime,from_station='SHH',to_station='GIW'):
  """
  到12306网站上查询上海到贵阳的K739车次的票务情况,如果存在卧铺则发送微信消息通知
  """
  url = 'https://kyfw.12306.cn/otn/lcxxcx/query?purpose_codes=ADULT&queryDate=2016-09-28&from_station=SHH&to_station=GIW'
  res = requests.get(url,verify=False)
  if res and res.status_code == 200:
      result = json.loads(res.text)
      k739_train = filter(lambda x:x.get('station_train_code')=="K739",result.get('data').get('datas'))
      if k739_train:
          train_info = k739_train[0]
          yw_num = train_info.get('yw_num')
          canwebby = train_info.get('canWebBuy')
          yz_num = train_info.get('yz_num')
          body = u"""
          硬卧:%s
          可购买:%s
          硬座数量:%s
          """%(yw_num,canwebby,yz_num)
          task_logger.info(body)
          if yw_num !=u'无':
              url = 'http://localhost/weixin/message/send'
              data = dict(
              user='A363977771',
              content={"content": body},
              appid=5
              )
              res = requests.post(url,json=data)
              return res.text if res else 'error'

定时任务的执行情况

 [2016-09-23 11:06:12,834: INFO/Beat] Scheduler: Sending due task add-every-30-seconds (worker.mytask1.ticket_task)
 [2016-09-23 11:06:12,835: DEBUG/Beat] worker.mytask1.ticket_task sent. id->9d0d8880-5895-4de3-946d-ca4b3efac713##
 [2016-09-23 11:06:12,835: DEBUG/Beat] beat: Waking up in 29.99 seconds.
 [2016-09-23 11:06:12,838: INFO/MainProcess] Received task: worker.mytask1.ticket_task[9d0d8880-5895-4de3-946d-ca4b3efac713]

 [2016-09-23 11:06:12,838: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7fecd20b9050> (args:(u'worker.mytask1.ticket_task', u'9d0d8880-5895-4de3-946d-ca4b3efac713', [u'a', u'b', u'c'], {}, {u'utc': True, u'is_eager': False, u'chord': None, u'group': None, u'args': [u'a', u'b', u'c'], u'retries': 0, u'delivery_info': {u'priority': 0, u'redelivered': False, u'routing_key': u'default', u'exchange': u'default'}, u'expires': None, u'hostname': 'celery@worker.mytask1', u'task': u'worker.mytask1.ticket_task', u'callbacks': None, u'correlation_id': u'9d0d8880-5895-4de3-946d-ca4b3efac713', u'errbacks': None, u'timelimit': [None, None], u'taskset': None, u'kwargs': {}, u'eta': None, u'reply_to': u'2be4ff2e-904a-3d6d-bc49-6e6cad45ab5a', u'id': u'9d0d8880-5895-4de3-946d-ca4b3efac713', u'headers': {}}) kwargs:{})
 [2016-09-23 11:07:12,881: INFO/Worker-4] Starting new HTTPS connection (1): kyfw.12306.cn
 [2016-09-23 11:07:12,916: WARNING/Worker-4] /usr/local/lib/python2.7/dist-packages/requests/packages/urllib3/connectionpool.py:821: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.org/en/latest/security.html
  InsecureRequestWarning)

 [2016-09-23 11:07:12,975: DEBUG/Worker-4] "GET /otn/lcxxcx/query?purpose_codes=ADULT&queryDate=2016-09-28&from_station=SHH&to_station=GIW HTTP/1.1" 200 None
 [2016-09-23 11:07:12,977: INFO/Worker-4] worker.mytask1.ticket_task[c4d43a06-ce70-4558-b402-7c2c5d0bc4c1]:
            硬卧:无
            可购买:Y
            硬座数量:298
 [2016-09-23 11:07:12,979: INFO/MainProcess] Task worker.mytask1.ticket_task[c4d43a06-ce70-4558-b402-7c2c5d0bc4c1] succeeded in 0.0983848370015s: None

定时任务执行情况

源码地址

© 著作权归作者所有

胡佐治
粉丝 10
博文 32
码字总数 22828
作品 0
上海
后端工程师
私信 提问
异步任务神器 Celery 简明笔记

Celery 在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发...

funhacks
2017/11/29
0
0
djcelery入门:实现运行定时任务

基于Django与Celery实现异步对列任务 - Python - 伯乐在线 http://python.jobbole.com/81953/ 更新于2015-08-26 注:本文根据官方文档结合具体例子整理,供celery入门学习,更多内容请移步参...

Kinegratii
2014/07/18
10K
17
分布式任务队列Celery入门与进阶

一、简介   Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也...

W-D
2018/08/22
0
0
高性能异步框架Celery入坑指南

在一个应用服务中,对于时效性要求没那么高的业务场景,我们没必要等到所有任务执行完才返回结果,例如用户注册场景中,保存了用户账号密码之后,就可以立即返回,后续的账号激活邮件,可以用...

刘志军
2018/10/11
0
0
异步任务神器 Celery 简明笔记

在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发一封激...

_Change_
2017/10/24
59
0

没有更多内容

加载失败,请刷新页面

加载更多

并发编程之Callable异步,Future模式

Callable 在Java中,创建线程一般有两种方式,一种是继承Thread类,一种是实现Runnable接口。然而,这两种方式的缺点是在线程任务执行结束后,无法获取执行结果。我们一般只能采用共享变量或...

codeobj
37分钟前
5
0
Ubuntu环境下安装PaddlePaddle

开篇 深度学习技术是目前非常热门的技术,笔者在闲暇之余决定学习一下这门技术,入门选择了百度开源的PaddlePaddle框架。 paddlepaddle介绍 飞桨(PaddlePaddle) 是国际领先的端到端开源深度学...

豫华商
今天
5
0
LeetCode 第 287 号问题:寻找重复数,一道非常简单的数组遍历题,加上四个条件后感觉无从下手

今天分享的题目来源于 LeetCode 第 287 号问题:寻找重复数。 题目描述 给定一个包含 n + 1 个整数的数组 nums,其数字都在 1 到 n 之间(包括 1 和 n),可知至少存在一个重复的整数。假设只...

五分钟学算法
今天
6
0
vuex mapActions

本文转载于:专业的前端网站➧vuex mapActions 在组件中使用 this.$store.dispatch('xxx') 分发 action,或者使用 mapActions 辅助函数将组件的 methods 映射为 store.dispatch 调用(需要先在...

前端老手
今天
5
0
使用bash -x 调试信息查看lvextend -r的调用

--1.打开调试: [root@db01 storage]# set -x --2.执行命令: [root@db01 storage]# /usr/sbin/lvextend -r -L 710M /dev/shazam/storage + /usr/sbin/lvextend -r -L 710M /dev/shazam/sto......

突突突酱
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部