RabbitMQ 之 订阅模式 Publish/Subscribe

2018/12/04 18:31
阅读数 22

模型图

我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型

在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。

举列:    

类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)

那么咱们来看一下图,我们学过前两种有一些不一样,work 模式 是不是同一个队列 多个消费者,而 ps 这种模式呢,是一个队列对应一个消费者,pb 模式还多了一个 X(交换机 转发器) ,这时候我们要获取消息 就需要队列绑定到交换机上,交换机把消息发送到队列 , 消费者才能获取队列的消息

解读:
1、1 个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

生产者

 1 package cn.wh.simple;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 
 7 import cn.wh.util.RabbitMqConnectionUtil;
 8 import com.rabbitmq.client.Channel;
 9 import com.rabbitmq.client.Connection;
10 
11 public class Send {
12 
13     private static final String  EXCHANGE_NAME="test_exchange_fanout";
14     public static void main(String[] args) throws IOException, TimeoutException {
15         
16         Connection connection = RabbitMqConnectionUtil.getConnection();
17         
18         Channel channel = connection.createChannel();
19         
20         //声明交换机
21         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分发
22         
23         //发送消息
24         String msg="hello ps";
25         
26         channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
27         
28         System.out.println("Send :"+msg);
29         
30         channel.close();
31         connection.close();
32     }
33 }

消费者1

 1 package cn.wh.simple;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import cn.wh.util.RabbitMqConnectionUtil;
 7 
 8 import com.rabbitmq.client.Channel;
 9 import com.rabbitmq.client.Connection;
10 import com.rabbitmq.client.Consumer;
11 import com.rabbitmq.client.DefaultConsumer;
12 import com.rabbitmq.client.Envelope;
13 import com.rabbitmq.client.AMQP.BasicProperties;
14 
15 public class Recv1 {
16     
17     private static final String QUEUE_NAME="test_queue_fanout_email";
18     private static final String  EXCHANGE_NAME="test_exchange_fanout";
19     public static void main(String[] args) throws IOException, TimeoutException {
20         Connection connection = RabbitMqConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         
23         //队列声明
24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25         
26         //绑定队列到交换机 转发器
27         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
28         
29         
30         channel.basicQos(1);//保证一次只分发一个  
31         
32         //定义一个消费者
33         Consumer consumer=new DefaultConsumer(channel){
34             //消息到达 触发这个方法
35             @Override
36             public void handleDelivery(String consumerTag, Envelope envelope,
37                     BasicProperties properties, byte[] body) throws IOException {
38              
39                 String msg=new String(body,"utf-8");
40                 System.out.println("[1] Recv msg:"+msg);
41                 
42                 try {
43                     Thread.sleep(2000);
44                 } catch (InterruptedException e) {
45                     e.printStackTrace();
46                 }finally{
47                     System.out.println("[1] done ");
48                     channel.basicAck(envelope.getDeliveryTag(), false);
49                 }
50             }
51         };
52         
53         boolean autoAck=false;//自动应答 false
54         channel.basicConsume(QUEUE_NAME,autoAck , consumer);
55     }
56 }

消费者2

package cn.wh.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


import cn.wh.util.RabbitMqConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv2 {
    
    private static final String QUEUE_NAME="test_queue_fanout_sms";
    private static final String  EXCHANGE_NAME="test_exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMqConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //绑定队列到交换机 转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicQos(1);//保证一次只分发一个

        //定义一个消费者
        Consumer consumer=new DefaultConsumer(channel){
            //消息到达 触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {

                String msg=new String(body,"utf-8");
                System.out.println("[2] Recv msg:"+msg);

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    System.out.println("[2] done ");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean autoAck=false;//自动应答 false
        channel.basicConsume(QUEUE_NAME,autoAck , consumer);
    }
}

 测试

一个消息 可以被多个消费者

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部