RabbitMq的简单使用

原创
2020/09/14 22:17
阅读数 53

1.pom文件中加入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>

2.配置文件,配置mq

自动配置信息  这里我开启ACK消息确认

server.port=8088
#服务器配置
spring.application.name=rabbitmq-test-sending
#rabbitmq连接参数
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual

3.Rabbit配置类,使用topic交换器,使用通配符,一个交换器对应多个queue

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitmqConfig {

    //队列
    @Bean
    public Queue queueTest1(){
        return new Queue("queueTest1",true);
    }

    /*
    * 设置消息队列的TTL(过期时间)
    * */
    @Bean
    public Queue queueTest2(){
        /**
         * 队列的名称,是否持久化,是否独享、排外的,是否自动删除,队列的其他属性参数
         * 1x-message-ttl:消息的过期时间,单位:毫秒;
         * 2x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
         * 3x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
         * 4x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
         * 5x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-headreject-publishreject-publish-dlx         * 6x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
         * 7x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
         * 8x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
         * 9x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
         * 10x-queue-modeLazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
         * 11x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
         */
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 5000);
        return new Queue("queueTest2", true, false, false, arguments);
    }


    //交换机
    @Bean
    public TopicExchange exchangeTest(){
        //可以传exchange名字,是否支持持久化,是否可以自动删除
        return new TopicExchange("exchangeTest",true,false);
    }

    @Bean
    public Binding bindQueueTest1AndExchange(){

        return BindingBuilder.bind(queueTest1()).to(exchangeTest()).with("phone.routing.*");

    }

    @Bean
    public Binding bindQueueTest2AndExchange(){

        return BindingBuilder.bind(queueTest2()).to(exchangeTest()).with("web.routing.*");

    }
}

4.生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.UUID;

/*
* 生产者,带消息确认
* */
@Service
public class PruSender implements RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //routing_key,把消息发送到相应的队列中
    public void sendMessage(String routing_key){
        //发送内容
        String context = "你好现在是 " + new Date();
        this.rabbitTemplate.setReturnCallback(this);//发送失败退回
        this.rabbitTemplate.setConfirmCallback((correlationData,ack,message)->{//手动发送消息确认
            if(!ack){
                System.out.println("消息发送失败" + message + correlationData.toString());
            }else{
                System.out.println("消息发送成功" + correlationData.toString());
            }
        });
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        //交换机名称、routingKey、内容、消息Id
        this.rabbitTemplate.convertAndSend("exchangeTest",routing_key, context,correlationData);
    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {

        System.out.println("sender return success" + message.toString() + "===" + i + "===" + s1 + "===" + s2);
    }
}

5.消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/*
* 消费者,带消息确认
*
* */
@Service
@RabbitListener(queues = "queueTest")
public class Receiver {

    //消息内容,通道,消息的属性信息
    @RabbitHandler
    public void immediateProcess(String text,Channel channel,Message message) throws IOException {

        try {
            System.out.println("Receiver" + text);

            /**
             * 手动确认,通知mq已经成功消费改条信息,可以删除了
             * //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

        } catch (IOException e) {
            e.printStackTrace();
            /*
            *消费消息失败
            * 第二个参数是否应用于多消息,第三个参数是否从新计入队列
            * */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
        }


    }

}

交换机类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部