RabbitMQ如何应对消费出现异常的情况

原创
2015/10/25 01:39
阅读数 2.1W

1,生产者

new_task.py

import pika

if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    message="You are awsome!"
    for i in range(0,100):#循环100次发送消息
        channel.basic_publish(exchange="",routing_key='Kadima',body=message+" "+str(i))
    print "sending ",message

2,多个消费者

消费者1,work.py

#-*- coding: UTF-8 -*-
import time
import pika
import sys

__author__ = 'Yue'

var=0

def callback(ch, method, properties, body):
    # temp=var+1 #这里有趣的是不能写成var+=1或者var=var+1,要知道为什么,就需要清楚“Python全局变量和局部变量”
    #global var
    #var+=1
    #if var==20:
        #sys.exit()
    print "1 received %r" % (body,)
    time.sleep(body.count("."))
    print "Done"


if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima",no_ack=True)
    print ' [1] Waiting for messages'
    channel.start_consuming()

work2.py

import time
import pika

__author__ = 'Yue'


def callback(ch, method, properties, body):
    print "2 received %r" % (body,)
    time.sleep(body.count("."))
    print "Done"


if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima",no_ack=True)
    print ' [2] Waiting for messages'
    channel.start_consuming()

3,执行work,work2,new_task

我的启动顺序是work,work2,从执行结果可以看出,RabbitMQ是将task分别依次分发给按照时间顺序注册的work上的,

也就是,task1,task2,task3,task4,它会将task1,task3分发给work,另外两个分发给task3,task4


接下来,有趣的事情就要发生了:

当把work.py中的callback函数的注释内容打开后(起作用是让work处理19个task,便退出程序),MQ并没有将本该分发给work的task分发给work2,那到底去哪里了呢?我暂时假设为work退出时并没有告诉MQ他不干了(他出现异常啦),MQ还是会将task分发给work

4,那没有执行完的任务怎么办呢?

Message acknowledgment :ack默认是打开的

修改work代码如下

#-*- coding: UTF-8 -*-
import time
import pika
import sys

__author__ = 'Yue'

var=0

def callback(ch, method, properties, body):
    print ch, method, properties, body
#     <pika.adapters.blocking_connection.BlockingChannel object at 0x02973BF0> <Basic.
# Deliver(['consumer_tag=ctag1.8b367697d96c4579ba78914d8a4760a8', 'delivery_tag=50
# ', 'exchange=', 'redelivered=False', 'routing_key=Kadima'])> <BasicProperties> Y
# ou are awsome! 98

    temp=var+1 #这里有趣的是不能写成var+=1或者var=var+1,要知道为什么,就需要清楚“Python全局变量和局部变量”
    global var
    var+=1
    if var==20:
        print var , body
        sys.exit()
    print "1 received %r" % (body,)
    # time.sleep(0.1)
    print "Done"
    #设置返回ack的标志,method.delivery_tag是MQ分发给Work时的一个标记
    ch.basic_ack(delivery_tag = method.delivery_tag)


if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima")
    print ' [1] Waiting for messages'
    channel.start_consuming()

work2

def callback(ch, method, properties, body):
    print "2 received %r" % (body,)
    # time.sleep(0.1)
    print "Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)



if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima")
    print ' [2] Waiting for messages'
    channel.start_consuming()

由此可见,MQ会重新将没有执行,或执行失败的任务重新分发给存活的work2,而且,他的分发顺序也很有趣,是在原本应该分发给work2的task执行结束后再去分发未执行的任务。‘


5,思考,如果在work2中

channel.basic_consume(callback,queue="Kadima",no_ack=True)

会出现什么情况。。。











展开阅读全文
打赏
0
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
0
分享
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部