Springboot集成RabbitMQ

2018/12/21 16:15
阅读数 0

1、

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、根据实际情况配置情况

#IP地址
spring.rabbitmq.host=localhost
#rabbitmq默认端口号
spring.rabbitmq.port=5672
#账户名和密码
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3、配置交换机(Exchang)与队列(Queue)绑定关系(RouteKey)

@Configuration
public class RabbitConfig {
    /**topic模式 名字可自由定义*/
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    /**fanout模式 名字可自由定义*/
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    /**redirect模式 名字可自由定义*/
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 ="direct.queue2" ;
    public static final String DIRECT_EXCHANGE = "direct.exchange";

    /**
     * Topic模式  非常灵活的,极力推荐
     * 可以自定义RouteKey
     * 符号“#” 匹配一个或多个词
     * 符号“*” 匹配不多不少一个词
     * 容易出现当使用的队列key符合绑定的队列key时,
     * 同一消息会出现在不同的队列中,但一般使用这种模式的不会使用固定的RouteKey
     */
     /**创建TOPIC队列1*/
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1);
    }
     /**创建TOPIC队列1*/
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2);
    }
    /**创建TOPIC交换机*/
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    /**绑定交换机与队列1*/
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).
            to(topicExchange()).with("topic.message");
    }
    /**绑定交换机与队列2*/
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).
        to(topicExchange()).with("topic.#");
    }

    /**
     * Fanout模式
     * Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,
     * 绑定了这个交换机的所有队列都收到这个消息,
     * 会发生同一个消息出现在不同的队列里
     * @return
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE1);
    }
    @Bean
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE2);
    }
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    /**
     * direct模式
     * 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 
     * 交换器就将消息发到对应的队列中。路由键与队列名完全匹配  
     * 根据定义的绑定队列KEY使用对应的队列
     * @return
     */
    @Bean
    public Queue directQueue1() {
        return new Queue(DIRECT_QUEUE1);
    }
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }
    @Bean
    public Binding directBinding1() {
        return BindingBuilder.bind(directQueue1()).
            to(directExchange()).with("jw");
    }

4、Fanout模式

 

发送者
@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(MqUserDto user) {
        this.rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"", user);
    }
}
接收者
@Component
public class FanoutReceiver {
    /**queues是指要监听的队列的名字*/
    @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE1)
    public void receiveFanout1(MqUserDto user) {
        System.out.println("【receiveFanout1监听到消息】" + user.getId()+" : "+user.getName());
    }
    @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE2)
    public void receiveFanout2(MqUserDto user) {
        System.out.println("【receiveFanout2监听到消息】" + user.getId()+" : "+user.getName());
    }
}

5、Direct模式

发送者
@Component
public class DirectSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void send(MqUserDto user) {
        this.rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"jw", user);
    }
}
接收者
@Component
public class DirectReceiver {
    /**queues是指要监听的队列的名字*/
    @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE1)
    public void receiveDirect1(MqUserDto user) {
        System.out.println("【receiveDirect1监听到消息】" + user.getId() + " : "+user.getName());
    }

    @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE2)
    public void receiveDirect2(MqUserDto user) {
        System.out.println("【receiveDirect2监听到消息】" + user.getId() + " : "+user.getName());
    }
}

6、Topic模式

发送者
@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    // 第一个参数:TopicExchange名字
    // 第二个参数:Route-Key
    // 第三个参数:要发送的内容
    public void send(MqUserDto user) {
        this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,"jw.message", user);
    }
}
接收者
@Component
public class TopicReceiver {
    // queues是指要监听的队列的名字
    @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE1)
    public void receiveTopic1(MqUserDto user) {
        System.out.println("【receiveTopic1监听到消息】" + user.getId() + " : "+user.getName());
    }

    @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE2)
    public void receiveTopic2(MqUserDto user) {
        System.out.println("【receiveTopic2监听到消息】" + user.getId() + " : "+user.getName());
    }
}

7、DTO

@Getter
@Setter
@ToString
public class MqUserDto implements Serializable {
    private String id;
    private String name;
}

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部