文档章节

springboot+rabbitmq整合

狼王黄师傅
 狼王黄师傅
发布于 2018/10/19 13:06
字数 2784
阅读 42
收藏 10

1.安装好rabbitmq

2.新建一个springBoot项目:rabbitmq_demo

3.添加pom依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>  
        <groupId>org.springframework.boot</groupId>  
        <artifactId>spring-boot-starter-test</artifactId>  
        <scope>test</scope>  
    </dependency>  
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

4.application.properties:

server.port=8080
spring.application.name=rabbitmq_demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

5.启动类声明一个Queue,用于测试:

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot entry point for the RabbitMQ demo. Also declares the queue used by the examples.
 */
@SpringBootApplication
public class RabbitmqDemoApplication {

    /** Name of the queue declared by {@link #helloQueue()}. */
    private static final String HELLO_QUEUE = "helloQueue";

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }

    /**
     * Declares the {@code helloQueue} queue as a Spring bean.
     *
     * @return the queue definition
     */
    @Bean
    public Queue helloQueue() {
        return new Queue(HELLO_QUEUE);
    }
}

多场景实现:

1.单生产者和单消费者

    生产者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: publishes a timestamped greeting using {@code helloQueue} as the routing key.
 */
@Component
public class Sender1 {

    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello1 " + current date}, echoes it to standard output and sends it.
     */
    public void send() {
        final String payload = "hello1 " + new Date();
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }
}

    消费者1:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 1 for {@code helloQueue}.
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {

    /**
     * Prints a received text message to standard output, prefixed with {@code "Receiver1:"}.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver1:" + hello;
        System.out.println(line);
    }
}

    测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: exposes an HTTP endpoint that triggers a message send.
 */
@RestController
public class RabbitController {

    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    /**
     * Handles {@code /hello} by sending one message through {@code Sender1}.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        return OK;
    }
}

    运行项目,访问 http://localhost:8080/hello :

    Sender1:hello1 Thu May 11 17:23:31 CST 2017

    Receiver1:hello1 Thu May 11 17:23:31 CST 2017

2.单生产者-多消费者

    生产者1不变

    增加消费者2:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 2 for {@code helloQueue}.
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {

    /**
     * Prints a received text message to standard output, prefixed with {@code "Receiver2:"}.
     *
     * @param mesg the message body
     */
    @RabbitHandler
    public void process(String mesg) {
        final String line = "Receiver2:" + mesg;
        System.out.println(line);
    }
}

    测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: sends two messages per request so both consumers get work.
 */
@RestController
public class RabbitController {

    /** Number of messages sent per {@code /hello} request. */
    private static final int SEND_COUNT = 2;

    @Autowired
    private Sender1 helloSender1;

    /**
     * Handles {@code /hello} by calling {@code Sender1.send()} twice in a row.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        for (int sent = 0; sent < SEND_COUNT; sent++) {
            helloSender1.send();
        }
        return "ok";
    }
}

    运行项目,访问 http://localhost:8080/hello :

    Sender1:hello1 Thu May 11 17:23:31 CST 2017
    Sender1:hello1 Thu May 11 17:23:31 CST 2017

    Receiver1:hello1 Thu May 11 17:23:31 CST 2017
    Receiver2:hello1 Thu May 11 17:23:31 CST 2017

    消息会被多个消费者交替消费,每条消息只能被一个消费者所接收。

3.多生产者-多消费者

    增加生产者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: publishes a timestamped greeting using {@code helloQueue} as the routing key.
 */
@Component
public class Sender2 {

    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello2 " + current date}, echoes it to standard output and sends it.
     */
    public void send() {
        final String payload = "hello2 " + new Date();
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }
}

    消费者1、2不变

    测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: triggers one send from each of the two producers.
 */
@RestController
public class RabbitController {

    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Handles {@code /hello}: producer 1 sends first, then producer 2.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }
}

    运行项目,访问 http://localhost:8080/hello :

    Sender1:hello1 Thu May 11 17:23:31 CST 2017
    Sender2:hello2 Thu May 11 17:23:31 CST 2017

    Receiver1:hello2 Thu May 11 17:23:31 CST 2017
    Receiver2:hello1 Thu May 11 17:23:31 CST 2017

    多个生产者将消息放入helloQueue的队列中,队列中的消息会被多个消费者交替消费,每条消息只能被一个消费者所接收。

4.实体类传输

    支持对象的发送和接收,实体类只需要支持序列化即可。

    实体类

package com.demo.model;

import java.io.Serializable;

/**
 * Simple user payload used by the entity-transfer example.
 *
 * <p>The class is {@link Serializable} so that instances can be handed to the message
 * template as-is. All properties are plain mutable strings and default to {@code null}.
 */
public class User implements Serializable {

    /**
     * Explicit stream version. Without it the JVM derives an id from the class shape, so
     * producer and consumer builds that differ slightly would reject each other's messages.
     * Bump this only when the serialized form changes incompatibly.
     */
    private static final long serialVersionUID = 1L;

    private String userName;

    private String password;

    private String sex;

    private String level;

    /** @return the user name, or {@code null} if not set */
    public String getUserName() {
        return userName;
    }

    /** @param userName the user name to store */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the password, or {@code null} if not set */
    public String getPassword() {
        return password;
    }

    /** @param password the password to store */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the sex, or {@code null} if not set */
    public String getSex() {
        return sex;
    }

    /** @param sex the sex to store */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /** @return the level, or {@code null} if not set */
    public String getLevel() {
        return level;
    }

    /** @param level the level to store */
    public void setLevel(String level) {
        this.level = level;
    }
}

    生产者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: sends text messages and {@code User} objects using {@code helloQueue}
 * as the routing key.
 */
@Component
public class Sender1 {

    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello1 " + current date}, echoes it to standard output and sends it.
     */
    public void send() {
        final String payload = "hello1 " + new Date();
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes the user's name and password to standard output, then sends the object itself.
     *
     * @param user the object to send
     */
    public void sendUser(User user){
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender1:" + summary);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }
}

    生产者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: sends text messages and {@code User} objects using {@code helloQueue}
 * as the routing key.
 */
@Component
public class Sender2 {

    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello2 " + current date}, echoes it to standard output and sends it.
     */
    public void send() {
        final String payload = "hello2 " + new Date();
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes the user's name and password to standard output, then sends the object itself.
     *
     * @param user the object to send
     */
    public void sendUser(User user) {
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender2:" + summary);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }
}

    消费者1:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 1 for {@code helloQueue}; has one handler for text and one for {@code User} payloads.
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {

    /**
     * Prints a received text message, prefixed with {@code "Receiver1:"}.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver1:" + hello;
        System.out.println(line);
    }

    /**
     * Prints the name and password of a received {@code User}.
     *
     * @param user the received object
     */
    @RabbitHandler
    public void processUser(User user) {
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user receive1:" + summary);
    }
}

    消费者2:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 2 for {@code helloQueue}; has one handler for text and one for {@code User} payloads.
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {

    /**
     * Prints a received text message, prefixed with {@code "Receiver2:"}.
     *
     * @param mesg the message body
     */
    @RabbitHandler
    public void process(String mesg) {
        final String line = "Receiver2:" + mesg;
        System.out.println(line);
    }

    /**
     * Prints the name and password of a received {@code User}.
     *
     * @param user the received object
     */
    @RabbitHandler
    public void processUser(User user) {
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user receive2:" + summary);
    }
}

    测试的controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: endpoints for the text-message and entity-transfer examples.
 */
@RestController
public class RabbitController {

    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Handles {@code /hello}: producer 1 sends first, then producer 2.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }

    /**
     * Handles {@code /user}: builds one sample user and sends the same instance
     * through both producers.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/user")
    public String user() {
        final User sample = newSampleUser();
        helloSender1.sendUser(sample);
        helloSender2.sendUser(sample);
        return OK;
    }

    /** Creates the fixed sample user (name "a", password "1", sex "m", level "1"). */
    private static User newSampleUser() {
        final User sample = new User();
        sample.setUserName("a");
        sample.setPassword("1");
        sample.setSex("m");
        sample.setLevel("1");
        return sample;
    }
}

    运行项目,访问 http://localhost:8080/user :

    user Sender1:a/1
    user Sender2:a/1

    user receive1:a/1
    user receive2:a/1

5.TopicExchange的使用

    启动类新增声明两个Queue,用于测试:

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot entry point for the RabbitMQ demo. Declares the queues, the topic exchange
 * and the bindings between them.
 */
@SpringBootApplication
public class RabbitmqDemoApplication {

    private static final String HELLO_QUEUE = "helloQueue";
    private static final String TOPIC_MESSAGE_QUEUE = "topicMessage";
    private static final String TOPIC_MESSAGES_QUEUE = "topicMessages";
    private static final String TOPIC_EXCHANGE = "topicExchange";

    /** Binding key that matches only the routing key {@code topic.Message}. */
    private static final String EXACT_BINDING_KEY = "topic.Message";

    /** Binding key with the {@code #} wildcard: matches routing keys that begin with {@code topic}. */
    private static final String WILDCARD_BINDING_KEY = "topic.#";

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }

    // ------------------------------- queues -------------------------------

    /** @return the {@code helloQueue} queue definition */
    @Bean
    public Queue helloQueue() {
        return new Queue(HELLO_QUEUE);
    }

    /** @return the {@code topicMessage} queue definition */
    @Bean
    public Queue topicMessage() {
        return new Queue(TOPIC_MESSAGE_QUEUE);
    }

    /** @return the {@code topicMessages} queue definition */
    @Bean
    public Queue topicMessages() {
        return new Queue(TOPIC_MESSAGES_QUEUE);
    }

    // ------------------------------ exchange ------------------------------

    /** @return the {@code topicExchange} topic exchange definition */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    // ------------------- bindings between queues and exchange -------------------

    /**
     * Binds queue {@code topicMessage} to {@code topicExchange} with the binding key
     * {@code topic.Message}, so only that exact routing key reaches the queue.
     *
     * @param topicMessage the queue to bind (NOTE(review): several {@code Queue} beans exist,
     *                     so injection presumably resolves by parameter name; keep the name)
     * @param topicExchange the exchange to bind to
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) {
        return BindingBuilder
                .bind(topicMessage)
                .to(topicExchange)
                .with(EXACT_BINDING_KEY);
    }

    /**
     * Binds queue {@code topicMessages} to {@code topicExchange} with the binding key
     * {@code topic.#}, so every routing key that begins with {@code topic} reaches the queue.
     *
     * @param topicMessages the queue to bind (NOTE(review): presumably resolved by parameter
     *                      name; keep the name)
     * @param topicExchange the exchange to bind to
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) {
        return BindingBuilder
                .bind(topicMessages)
                .to(topicExchange)
                .with(WILDCARD_BINDING_KEY);
    }
}

    生产者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: sends text and {@code User} messages with {@code helloQueue} as the routing key,
 * and demo messages to {@code topicExchange}.
 */
@Component
public class Sender1 {

    private static final String HELLO_QUEUE = "helloQueue";
    private static final String TOPIC_EXCHANGE = "topicExchange";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello1 " + current date}, echoes it to standard output and sends it.
     */
    public void send() {
        final String payload = "hello1 " + new Date();
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes the user's name and password to standard output, then sends the object itself.
     *
     * @param user the object to send
     */
    public void sendUser(User user){
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender1:" + summary);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }

    /**
     * Sends two messages to {@code topicExchange} with different routing keys.
     *
     * <p>The exchange compares the routing key with each queue's binding key (wildcard
     * matching, not regular expressions) and delivers to every queue that matches.
     */
    public void testTopPicMessage() {
        // "topic.Message" matches both binding keys: topic.Message and topic.#
        publishToTopic("sendTopPicMessage1:", "topic.Message", "sendTopPicMessage");
        // "topic.Messages" matches only topic.#
        publishToTopic("sendTopPicMessages1:", "topic.Messages", "sendTopPicMessages");
    }

    /**
     * Echoes {@code logPrefix + payload}, then sends the payload to the topic exchange.
     *
     * @param logPrefix text printed before the payload
     * @param routingKey routing key of the message
     * @param payload message content
     */
    private void publishToTopic(String logPrefix, String routingKey, String payload) {
        System.out.println(logPrefix + payload);
        // arguments: exchange name, routing key, message content
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, routingKey, payload);
    }
}

    生产者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: sends text and {@code User} messages with {@code helloQueue} as the routing key,
 * and demo messages to {@code topicExchange}.
 */
@Component
public class Sender2 {

    private static final String HELLO_QUEUE = "helloQueue";
    private static final String TOPIC_EXCHANGE = "topicExchange";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello2 " + current date}, echoes it to standard output and sends it.
     */
    public void send() {
        final String payload = "hello2 " + new Date();
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes the user's name and password to standard output, then sends the object itself.
     *
     * @param user the object to send
     */
    public void sendUser(User user) {
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender2:" + summary);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }

    /**
     * Sends two messages to {@code topicExchange} with different routing keys.
     *
     * <p>The exchange compares the routing key with each queue's binding key (wildcard
     * matching, not regular expressions) and delivers to every queue that matches.
     */
    public void testTopPicMessage() {
        // "topic.Message" matches both binding keys: topic.Message and topic.#
        publishToTopic("sendTopPicMessage2:", "topic.Message", "sendTopPicMessage");
        // "topic.Messages" matches only topic.#
        publishToTopic("sendTopPicMessages2:", "topic.Messages", "sendTopPicMessages");
    }

    /**
     * Echoes {@code logPrefix + payload}, then sends the payload to the topic exchange.
     *
     * @param logPrefix text printed before the payload
     * @param routingKey routing key of the message
     * @param payload message content
     */
    private void publishToTopic(String logPrefix, String routingKey, String payload) {
        System.out.println(logPrefix + payload);
        // arguments: exchange name, routing key, message content
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, routingKey, payload);
    }
}

    topicMessage消费者:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code topicMessage} queue.
 */
@Component
@RabbitListener(queues = "topicMessage")
public class TopMessageReceiver {

    /**
     * Prints a received text message, prefixed with {@code "topMessageReceiver:"}.
     *
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "topMessageReceiver:" + msg;
        System.out.println(line);
    }
}

     topicMessages消费者:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code topicMessages} queue.
 */
@Component
@RabbitListener(queues = "topicMessages")
public class TopMessagesReceiver {

    /**
     * Prints a received text message, prefixed with {@code "topMessagesReceiver:"}.
     *
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "topMessagesReceiver:" + msg;
        System.out.println(line);
    }
}

    测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: endpoints for the text, entity-transfer and topic-exchange examples.
 */
@RestController
public class RabbitController {

    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Handles {@code /hello}: producer 1 sends first, then producer 2.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }

    /**
     * Handles {@code /user}: builds one sample user and sends the same instance
     * through both producers.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/user")
    public String user() {
        final User sample = newSampleUser();
        helloSender1.sendUser(sample);
        helloSender2.sendUser(sample);
        return OK;
    }

    /**
     * Handles {@code /topMessage}: each producer runs its topic-exchange demo in turn.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/topMessage")
    public String topMessage() {
        helloSender1.testTopPicMessage();
        helloSender2.testTopPicMessage();
        return OK;
    }

    /** Creates the fixed sample user (name "a", password "1", sex "m", level "1"). */
    private static User newSampleUser() {
        final User sample = new User();
        sample.setUserName("a");
        sample.setPassword("1");
        sample.setSex("m");
        sample.setLevel("1");
        return sample;
    }
}

    运行项目,访问 http://localhost:8080/topMessage :

    sendTopPicMessage1:sendTopPicMessage
    sendTopPicMessages1:sendTopPicMessages

    sendTopPicMessage2:sendTopPicMessage
    sendTopPicMessages2:sendTopPicMessages

    topMessageReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessages

    topMessageReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessages

    通过topicExchange发送的每条消息,会被路由到所有绑定键(binding key)与路由键(routing key)匹配的队列,监听这些队列的消费者都能收到。

需要注意:

    rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合,因此两个消费者都收到消息
    rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合,只有topMessages符合接受消息的条件

6.FanoutExchange的使用

    启动类新增声明三个Queue,用于测试:

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqDemoApplication {
    /***************************************队列***********************************************/
    @Bean
    public Queue helloQueue() {
        return new Queue("helloQueue");
    }

    @Bean
    public Queue topicMessage() {
        return new Queue("topicMessage");
    }

    @Bean
    public Queue topicMessages() {
        return new Queue("topicMessages");
    }

    @Bean
    public Queue fanoutA() {
        return new Queue("fanoutA");
    }

    @Bean
    public Queue fanoutB() {
        return new Queue("fanoutB");
    }

    @Bean
    public Queue fanoutC() {
        return new Queue("fanoutC");
    }
    /***************************************exchange***********************************************/
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /***************************************将队列和exchange绑定***********************************************/

    /**
     * 将队列topicMessage与topicExchange绑定,
     * 只有栏目名为topic.Message才能匹配,
     * 得到当前的Queue
     * @param topicMessage
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicMessage).to(topicExchange).with("topic.Message");
    }

    /**
     * 将队列topicMessages与topicExchange绑定,
     * 以topic开头的栏目名均会模糊匹配,
     * 得到当前的Queue
     * @param topicMessages
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicMessages).to(topicExchange).with("topic.#");
    }

    /**
     * 将队列fanoutA与fanoutExchange绑定
     *
     * @param fanoutA
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingExchangeA(Queue fanoutA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutA).to(fanoutExchange);
    }

    /**
     * 将队列fanoutA与fanoutExchange绑定
     *
     * @param fanoutB
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingExchangeB(Queue fanoutB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutB).to(fanoutExchange);
    }

    /**
     * 将队列fanoutA与fanoutExchange绑定
     *
     * @param fanoutC
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingExchangeC(Queue fanoutC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutC).to(fanoutExchange);
    }

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }
}

    生产者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: sends text and {@code User} messages with {@code helloQueue} as the routing key,
 * and demo messages to {@code topicExchange} and {@code fanoutExchange}.
 */
@Component
public class Sender1 {

    private static final String HELLO_QUEUE = "helloQueue";
    private static final String TOPIC_EXCHANGE = "topicExchange";
    private static final String FANOUT_EXCHANGE = "fanoutExchange";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello1 " + current date}, echoes it to standard output and sends it.
     */
    public void send() {
        final String payload = "hello1 " + new Date();
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes the user's name and password to standard output, then sends the object itself.
     *
     * @param user the object to send
     */
    public void sendUser(User user){
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender1:" + summary);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }

    /**
     * Sends two messages to {@code topicExchange} with different routing keys.
     *
     * <p>The exchange compares the routing key with each queue's binding key (wildcard
     * matching, not regular expressions) and delivers to every queue that matches.
     */
    public void testTopPicMessage() {
        // "topic.Message" matches both binding keys: topic.Message and topic.#
        publishToTopic("sendTopPicMessage1:", "topic.Message", "sendTopPicMessage");
        // "topic.Messages" matches only topic.#
        publishToTopic("sendTopPicMessages1:", "topic.Messages", "sendTopPicMessages");
    }

    /**
     * Echoes a fixed text and sends it to {@code fanoutExchange}.
     */
    public void testFanoutMessage(){
        final String payload = "sendFanoutMessage";
        System.out.println("fanout Sender1:" + payload);
        // A fanout exchange does not filter on the routing key, so an empty string is passed.
        // The argument is still required: it selects the (exchange, routingKey, message)
        // overload, so that "fanoutExchange" is taken as the exchange name.
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, "", payload);
    }

    /**
     * Echoes {@code logPrefix + payload}, then sends the payload to the topic exchange.
     *
     * @param logPrefix text printed before the payload
     * @param routingKey routing key of the message
     * @param payload message content
     */
    private void publishToTopic(String logPrefix, String routingKey, String payload) {
        System.out.println(logPrefix + payload);
        // arguments: exchange name, routing key, message content
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, routingKey, payload);
    }
}

    生产者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: sends text and {@code User} messages with {@code helloQueue} as the routing key,
 * and demo messages to {@code topicExchange} and {@code fanoutExchange}.
 */
@Component
public class Sender2 {

    private static final String HELLO_QUEUE = "helloQueue";
    private static final String TOPIC_EXCHANGE = "topicExchange";
    private static final String FANOUT_EXCHANGE = "fanoutExchange";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello2 " + current date}, echoes it to standard output and sends it.
     */
    public void send() {
        final String payload = "hello2 " + new Date();
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes the user's name and password to standard output, then sends the object itself.
     *
     * @param user the object to send
     */
    public void sendUser(User user) {
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender2:" + summary);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }

    /**
     * Sends two messages to {@code topicExchange} with different routing keys.
     *
     * <p>The exchange compares the routing key with each queue's binding key (wildcard
     * matching, not regular expressions) and delivers to every queue that matches.
     */
    public void testTopPicMessage() {
        // "topic.Message" matches both binding keys: topic.Message and topic.#
        publishToTopic("sendTopPicMessage2:", "topic.Message", "sendTopPicMessage");
        // "topic.Messages" matches only topic.#
        publishToTopic("sendTopPicMessages2:", "topic.Messages", "sendTopPicMessages");
    }

    /**
     * Echoes a fixed text and sends it to {@code fanoutExchange}.
     */
    public void testFanoutMessage(){
        final String payload = "sendFanoutMessage";
        System.out.println("fanout Sender2:" + payload);
        // A fanout exchange does not filter on the routing key, so an empty string is passed.
        // The argument is still required: it selects the (exchange, routingKey, message)
        // overload, so that "fanoutExchange" is taken as the exchange name.
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, "", payload);
    }

    /**
     * Echoes {@code logPrefix + payload}, then sends the payload to the topic exchange.
     *
     * @param logPrefix text printed before the payload
     * @param routingKey routing key of the message
     * @param payload message content
     */
    private void publishToTopic(String logPrefix, String routingKey, String payload) {
        System.out.println(logPrefix + payload);
        // arguments: exchange name, routing key, message content
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, routingKey, payload);
    }
}

    fanoutA消费者

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code fanoutA} queue.
 */
@Component
@RabbitListener(queues = "fanoutA")
public class FanoutReceiverA {

    /**
     * Prints a received text message, prefixed with {@code "FanoutReceiverA:"}.
     *
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "FanoutReceiverA:" + msg;
        System.out.println(line);
    }

}

    fanoutB消费者

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code fanoutB} queue.
 */
@Component
@RabbitListener(queues = "fanoutB")
public class FanoutReceiverB {

    /**
     * Prints a received text message, prefixed with {@code "FanoutReceiverB:"}.
     *
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "FanoutReceiverB:" + msg;
        System.out.println(line);
    }

}

    fanoutC消费者

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code fanoutC} queue.
 */
@Component
@RabbitListener(queues = "fanoutC")
public class FanoutReceiverC {

    /**
     * Prints a received text message, prefixed with {@code "FanoutReceiverC:"}.
     *
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "FanoutReceiverC:" + msg;
        System.out.println(line);
    }

}

  测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: endpoints for the text, entity-transfer, topic-exchange and
 * fanout-exchange examples.
 */
@RestController
public class RabbitController {

    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Handles {@code /hello}: producer 1 sends first, then producer 2.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }

    /**
     * Handles {@code /user}: builds one sample user and sends the same instance
     * through both producers.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/user")
    public String user() {
        final User sample = newSampleUser();
        helloSender1.sendUser(sample);
        helloSender2.sendUser(sample);
        return OK;
    }

    /**
     * Handles {@code /topMessage}: each producer runs its topic-exchange demo in turn.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/topMessage")
    public String topMessage() {
        helloSender1.testTopPicMessage();
        helloSender2.testTopPicMessage();
        return OK;
    }

    /**
     * Handles {@code /fanoutMessage}: each producer runs its fanout-exchange demo in turn.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/fanoutMessage")
    public String fanoutMessage() {
        helloSender1.testFanoutMessage();
        helloSender2.testFanoutMessage();
        return OK;
    }

    /** Creates the fixed sample user (name "a", password "1", sex "m", level "1"). */
    private static User newSampleUser() {
        final User sample = new User();
        sample.setUserName("a");
        sample.setPassword("1");
        sample.setSex("m");
        sample.setLevel("1");
        return sample;
    }
}

    运行项目,访问 http://localhost:8080/fanoutMessage :

    fanout Sender1:sendFanoutMessage
    fanout Sender2:sendFanoutMessage
   
    FanoutReceiverA:sendFanoutMessage
    FanoutReceiverB:sendFanoutMessage
    FanoutReceiverC:sendFanoutMessage

    FanoutReceiverA:sendFanoutMessage
    FanoutReceiverB:sendFanoutMessage
    FanoutReceiverC:sendFanoutMessage

    通过exchange发送的每条消息,所有的消费者都能收到。

© 著作权归作者所有

狼王黄师傅
粉丝 16
博文 258
码字总数 542369
作品 0
成都
程序员
私信 提问
springboot-MQRPC —— RPC 调用框架

一个简单便捷的基于 springboot+RabbitMQ 中间件实现的 RPC 调用框架。

达尔文
2016/12/07
44
0
springboot+rabbitmq问题

spring boot + rabbitmq 两次请求为什么只能接收到一次参数 controller config配置 求解...............................................................................

桑灬桑
2017/12/07
183
1
RPC 调用框架--springboot-MQRPC

一个简单便捷的基于springboot+RabbitMQ中间件实现的RPC调用框架 远程调用过程如下 首先:消费者和生产者spring容器初始化的时候,会根据配置的的api在RabbitMQ上建立相应的队列,消费者会监听...

KL博客
2016/12/05
7.3K
6
springboot+rabbitmq 一个队列开了300个消费者 速度很慢是什么原因?

公司系统要将商品上架到拼多多商家后台中, 每天最少10万件,算了算平均 一分钟要上传最少70个商品,单线程的情况下一件商品需要1分钟左右,在本地开了50个消费者测试上架速度 有显著提升,但...

社会小青年丶
05/22
0
0
vSphere 5.5 VM整合磁盘失败之—文件被锁定无法访问

vSphere 5.5 VM整合磁盘失败之—文件被锁定无法访问 环境:vSPhere 5.5u3,虚机使用EMC的networker备份 问题现象:在vc上发现,晚上经过networker的备份之后,虚机提示需要整合磁盘 解决前相...

Makka_Pakka
2018/07/06
0
0

没有更多内容

加载失败,请刷新页面

加载更多

css3D 全景图 - 含有定位的文字Label

查看全景效果 完整代码 <style type="text/css"> body { margin: 0px; } .canvasWrap { width: 100%; height: 600px; backgro......

tianyawhl
29分钟前
1
0
JS利用新的参数刷新easyui的tab的panel的url

思路:根据浏览器地址,截图?之前的url作为head,然后把url后面的参数解析为object对象,再根据传入的参数对象,重置参数对象,最后重新拼接url JS: /** * 更新panel的url * @returns */fu...

文文1
29分钟前
1
0
jmeter集群测试。

jmeter集群测试的官方文档:http://jmeter.apache.org/usermanual/jmeter_distributed_testing_step_by_step.html 一、测试机器。 集群测试的电脑,分为2种: Master 运行JMeter GUI 界面(j...

王坤charlie
30分钟前
1
0
网页结构简介

有人说“互联网中有50%以上的流量是爬虫”,第一次听这句话也许你会觉得这个说法实在太夸张了,怎么可能爬虫比用户还多呢?毕竟会爬虫的相对与不会爬虫的简直少之又少。 但是很多爬虫工程师或...

猪哥66
32分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部