rabbitMQ 在spring 的使用

2019/01/22 23:45
阅读数 267

一、准备工作

maven依赖

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.0.2</version>
</dependency>
 
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.7.9.RELEASE</version>
</dependency>

创建配置文件spring-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
 
</beans>


 二、配置消息生产者

1、配置连接

<!--连接工厂-->
<rabbit:connection-factory id="rabbitConnectionFactory" host="127.0.0.1" port="5672"
                           username="test" password="test"  virtual-host="/test"
                           channel-cache-size="50" />

 注:该配置还有publisher-confirms、publisher-returns等参数,用于消息确认。

 

2、配置admin:producer中的exchange,queue会自动的利用该admin自动在spring中生成

<!-- 定义admin,producer中的exchange,queue会自动的利用该admin自动在spring中生成 -->
<rabbit:admin connection-factory="rabbitConnectionFactory"/>


3、定义rabbitmq模板(消息生产者通过模板类进行推送数据)

<!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" exchange="XXX" queue="XXX"/>


注意:

    消费发送是通过rabbitTemplate.convertAndSend()这个方法进行发送的。

    rabbitTemplate.convertAndSend()用三个参数,分别代表exchange、routing-key(queue)、message,如果调用时不写exchange,第二个参数代表queue。

    比如rabbitTemplate.convertAndSend(queue,message)表示将消息直接发布到queue队列中

    如果在模板中指定了默认exchange和queue,如果消息在发布时没指定exchange和queue,则消息直接通过默认的exchange将消息推送给对应的queue,如果只配置了queue,则表示直接将消息发布到queue队列中。

    我通常的做法是不指定exchange和queue,通过代码进行指定。


4、配置队列

<!-- 队列声明 :
     durable:true、false true:在服务器重启时,能够存活
     exclusive :当连接关闭后是否自动删除队列;是否私有队列,如果私有,其他通道不能访问当前队列
     autodelete:当没有任何消费者使用时,自动删除该队列 -->
<!-- 用于发布/订阅模式的队列 -->
<rabbit:queue name="myFanoutQueue" durable="true" exclusive="false" auto-delete="false"/>
<!-- 用于路由模式的队列 -->
<rabbit:queue name="myDirectQueue" durable="true" exclusive="false" auto-delete="false"/>
<!-- 用于主题模式的队列 -->
<rabbit:queue name="myTopicQueue_error"  durable="true" exclusive="false" auto-delete="false"/>
<rabbit:queue name="myTopicQueue_warn"  durable="true" exclusive="false" auto-delete="false"/>


5、设置exchange,并且配置与队列queue的关系(durable、auto-delete与队列参数同一个意思)

<!-- 定义交互机 发布/订阅模式 -->
<rabbit:fanout-exchange name="myFanoutExchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="myFanoutQueue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>
 
<!-- 定义交互机 路由模式(需要routing-key) -->
<rabbit:direct-exchange name="myDirectExchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="myDirectQueue" key="error"></rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>
 
 
<!--定义交互机 主题模式 -->
<rabbit:topic-exchange name="myTopicExchange" durable="true" auto-delete="false">
    <rabbit:bindings>
       <rabbit:binding queue="myTopicQueue_error" pattern="error.#" ></rabbit:binding>
        <rabbit:binding queue="myTopicQueue_error" pattern="warn.#" ></rabbit:binding>
       <rabbit:binding queue="myTopicQueue_warn" pattern="warn.*"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>


注意:

    路由模式需要指定key,表示exchange通过key(routing-key)将消息发布到queue中;
    主题模式是通过pattern参数来表示routing-key的

 

6、定义消息发布类

public class SpringSender {
    public static void sendMessage(String exchange,String routingKey,Object  message){
        //加载配置文件
        AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq.xml");
        //获取rabbitmqTemplate模板
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //发送消息
        rabbitTemplate.convertAndSend(exchange,routingKey,message);
    }
 
    public static void main(String[] args) throws Exception{
        SpringSender.sendMessage("myTopicExchange","warn.item","主题模式,发布警告信息");
    }
}


注意:

    根据上面消息的配置,我们可以知道该消息会发布到myTopicQueue_error和myTopicQueue_warn两个队列中,因为routing-key为“warn.item”,符合上述其与exchange的绑定关系。

    rabbitTemplate.convertAndSend()通过多态实现的。如果exchange为空,等同与调用rabbitTemplate.convertAndSend(routingKey,message),表示将message发送到名为“routingKey”的队列中。

    如果exchange、routingKey为空时,等同于调用rabbitTemplate.convertAndSend(message),此时rabbitmq模板需要指定默认queue。

    如果rabbitmq模板指定了默认exchange、queue,但是程序里调用发布消息的方法也指定了exchange、queue,那么以程序里面的参数为准

    如果rabbitmq模板指定了默认exchange,但是调用rabbitTemplate.convertAndSend(routingKey,message)时,routingKey(queue)跟exchange没有绑定关系,发送数据失败。

 

rabbitTemplate.convertAndSend方法说明:    

/**
 * 发送消息到默认的交换机和队列(不带有自定义参数)
 * @param messageObject 消息对象
 * @return boolean 发送标记 
 */
RabbitTemplate.convertAndSend(messageObject);
 
/**
 * 发送消息到默认的交换机和队列
 * @param messageObject 消息对象
 * @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
 * @return boolean 发送标记 
 */
RabbitTemplate.correlationConvertAndSend(messageObject,correlationdata);
 
/**
 * 发送消息到指定的队列
 * @param queue           队列名称
 * @param messageObject   消息对象
 * @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
 * @return boolean 发送标记 
 */
RabbitTemplate.convertAndSend(queue, messageObject,correlationdata);
 
/**
 * 发送消息到指定的交换机和队列
 * @param exchange       交换机名称
 * @param queue          队列名称
 * @param messageObject 自定义参数,在监听器ConfirmCallback中可以取到。
 * @return boolean 发送标记 
 */
RabbitTemplate.convertAndSend(exchange,queue,messageObject,correlationdata);
 
/**
 * 发送消息到默认的交换机和队列(不带有自定义参数)
 Send方法还有很多,此处只列举一种
 * @param Message AMQP封装的消息对象
 * @return void
 */
RabbitTemplate.send(Message message);
 

 

三、配置消息消费者

1、定义主题模式的两个实现类

/**
 * 用于接收routing-key为warn或error的消息
 */
public class TopicErrorReceiver implements ChannelAwareMessageListener{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try{
            System.out.println("************************ddd********************************");
            System.out.println("主题模式 warn/error 接收信息:"+new String(message.getBody()));
            System.out.println("********************************************************");
            //设置手工应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}
 
/**
 * 用于接收routing-key为warn的消息
 */
public class TopicWarnReceiver implements ChannelAwareMessageListener{
 
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try{
            System.out.println("************************ddd********************************");
            System.out.println("主题模式 warm 接收信息:"+new String(message.getBody()));
            System.out.println("********************************************************");
            //设置手工应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

    注:接收类都是实现了ChannelAwareMessageListener接口,并重写了onMessage方法

 

2、定义监听,并绑定接收者与队列的关系(即接收者监听哪些队列)

<!-- 定义监听 -->
<rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual" >
    <!-- 发布订阅模式监听-->
    <rabbit:listener ref="directReceiver"  queue-names="myFanoutQueue"/>
    <!-- 路由模式监听 -->
    <rabbit:listener ref="directReceiver"  queue-names="myDirectQueue"/>
    <!-- 主题模式监听-->
    <rabbit:listener ref="topicErrorReceiver" queue-names="myTopicQueue_error"/>
    <rabbit:listener ref="topicWarnReceiver" queue-names="myTopicQueue_warn"/>
</rabbit:listener-container>
 
<bean id="directReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.DirectReceiver"/>
<bean id="fanoutReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.FanoutReceiver"/>
<bean id="topicErrorReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.TopicErrorReceiver"/>
<bean id="topicWarnReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.TopicWarnReceiver"/>

注意:

    acknowledge="manual" 表示手工应答,如果值为auto则表示自动应答
    DirectReceiver、FanoutReceiver写法与TopicErrorReceiver 类似,这里不重复

 

3、测试

    从上面的例子可以看出,接收者跟所谓的模式没有关系,它只跟绑定的队列有关队列有数据即进行接受。至于队列的数据是通过什么模式得到的,都与接收者无关。

 

四、confirm-callback监听(用于监听exchange是否接收成功)

1、在配置工厂连接的时候,设置publisher-confirms="true"

<!--连接工厂-->
<rabbit:connection-factory id="rabbitConnectionFactory" host="127.0.0.1" port="5672"
                           username="test" password="test"  virtual-host="/test"
                           channel-cache-size="50" publisher-confirms="true"/>

 

2、在定义rabbitmq模板时,指定confirm-callback的实现类

<!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" exchange="myDirectExchange"
                 confirm-callback="confirmCallback" />


3、创建实现类ConfirmCallback,实现RabbitTemplate.ConfirmCallback接口

public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
    /**
     * CorrelationData 是在发送消息时传入回调方法的参数,可以用于区分消息对象。 CorrelationData对象中只有一个属性 String id。
     * 通过这个参数,我们可以区分当前是发送哪一条消息时的回调,并通过ack参数来进行失败重发功能
     * 
     * @param correlationData 回调的相关数据.
     * @param ack true for ack, false for nack
     * @param cause 专门给NACK准备的一个可选的原因,其他情况为null。
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("********************************************************");
        System.out.println("exChange确认"+ ack +"   "+cause);
        System.out.println("********************************************************");
    }
}


4、在配置文件中定义confirmCallback

<bean id="confirmCallback" class="com.chensr.until.rabbitmq.springRabbitmq.ConfirmCallback"/>


5、测试:执行SpringSender.sendMessage("myTopicExchange","error","queue message fanout" );

    不管发送成功与否都会执行这个方法,只有在配置中找不到exchange,ack才会是false。

    发送数据是exchange为空,或者不填的时候,ack都为true即调用rabbitTemplate.convertAndSend(“”,routingKey,message)或者rabbitTemplate.convertAndSend(routingKey,message),ack都是true。

 

五、confirm-callback监听(basicpublish推送消息到queue失败时回调)

1、在定义rabbitmq模板时,指定return-callback的实现类,并且设置mandatory="true"

<!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"
                 confirm-callback="confirmCallback" return-callback="returnCallback" mandatory="true"/>

    注:mandatory为true表示推送消息到queue失败时调用return-callback

 

2、创建实现类ReturnCallback,实现RabbitTemplate.ReturnCallback接口

public class ReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("********************************************************");
        System.out.println("失败确认:"+message+" | "+replyCode+" | "+replyText+" | "+exchange+" | "+routingKey);
        System.out.println("********************************************************");
    }
}


3、在配置文件中定义returnedMessage

<bean id="returnCallback" class="com.chensr.until.rabbitmq.springRabbitmq.ReturnCallback"/>


4、测试:执行SpringSender.sendMessage("myTopicExchange","error123","queue message" );,

    由于routing-key没匹配到对应的队列,所以控制台打印报错信息。

    如果执行SpringSender.sendMessage("","error123","queue message" );由于没有找到对应的队列“error123”,所以调用ReturnCallback.returnedMessage方法,confirm-callback监听ack返回false,不管是否匹配到队列,都不会执行ReturnCallback.returnedMessage方法。

    因为exchange接收数据是否,此时还没走到推送数据到队列这一步,所以不会以失败处理。

 

六、FastJsonMessageConverter转换类(可以将map自动转成json格式)

1、添加maven依赖

<dependency>
  <groupId>org.codehaus.jackson</groupId>
  <artifactId>jackson-mapper-asl</artifactId>
  <version>1.9.13</version>
</dependency>


2、在定义rabbitmq模板时,指定转换器message-converter="jsonMessageConverter"

<!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" message-converter="jsonMessageConverter"
                 confirm-callback="confirmCallback" return-callback="returnCallback" mandatory="true"/>


3、配置bean(也可以重写)

<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>


4、测试:(将map转成json格式)

public class SpringSender {
    public static void sendMessage(String exchange,String routingKey,Object  message){
        //加载配置文件
        AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq.xml");
        //获取rabbitmqTemplate模板
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //发送消息
        rabbitTemplate.convertAndSend(exchange,routingKey,message);
    }
 
    public static void main(String[] args) throws Exception{
        HashMap<String ,Object> map = new HashMap<String, Object>();
        map.put("message","queue message fanout");
        SpringSender.sendMessage("myFanoutExchange","myFanoutQueue",map);
    }
}

 

输出:

********************************************************

发布/订阅 接收信息:{"message":"queue message fanout"}

********************************************************

注:如果发生的消息是字符串,接收到的信息为字符串

 

七、完整配置文件spring-rabbitmq.xml内容

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
 
    <!-- 引入rabbitmq配置文件 -->
    <bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="locations">
            <list>
                <value>classpath:conf/rabbitmq.properties</value>
            </list>
        </property>
    </bean>
 
    <!--连接工厂-->
    <rabbit:connection-factory id="rabbitConnectionFactory" host="127.0.0.1" port="5672"
                               username="test" password="test"  virtual-host="/test"
                               channel-cache-size="50" publisher-confirms="true"/>
 
    <!-- 定义admin,producer中的exchange,queue会自动的利用该admin自动在spring中生成 -->
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>
 
    <!-- 定义rabbitmq模板,指定连接工厂、exchange、queue等 -->
    <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" message-converter="jsonMessageConverter"
                     confirm-callback="confirmCallback" return-callback="returnCallback" mandatory="true"/>
 
 
    <!-- 队列声明 :
         durable:true、false true:在服务器重启时,能够存活
         exclusive :当连接关闭后是否自动删除队列;是否私有队列,如果私有,其他通道不能访问当前队列
         autodelete:当没有任何消费者使用时,自动删除该队列 -->
    <!-- 用于发布/订阅模式的队列 -->
    <rabbit:queue name="myFanoutQueue" durable="true" exclusive="false" auto-delete="false"/>
    <!-- 用于路由模式的队列 -->
    <rabbit:queue name="myDirectQueue" durable="true" exclusive="false" auto-delete="false"/>
    <!-- 用于主题模式的队列 -->
    <rabbit:queue name="myTopicQueue_error"  durable="true" exclusive="false" auto-delete="false"/>
    <rabbit:queue name="myTopicQueue_warn"  durable="true" exclusive="false" auto-delete="false"/>
 
 
    <!-- 定义交互机 发布/订阅模式 -->
    <rabbit:fanout-exchange name="myFanoutExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="myFanoutQueue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
 
    <!-- 定义交互机 路由模式(需要routing-key) -->
    <rabbit:direct-exchange name="myDirectExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="myDirectQueue" key="error"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
 
 
    <!-- 定义交互机 主题模式 ;-->
    <rabbit:topic-exchange name="myTopicExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
           <rabbit:binding queue="myTopicQueue_error" pattern="error.#" ></rabbit:binding>
            <rabbit:binding queue="myTopicQueue_error" pattern="warn.#" ></rabbit:binding>
           <rabbit:binding queue="myTopicQueue_warn" pattern="warn.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
 
 
    <!-- 定义监听 -->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual" >
        <!-- 发布订阅模式监听-->
        <rabbit:listener ref="fanoutReceiver"   queue-names="myFanoutQueue" />
        <!-- 路由模式监听 -->
        <rabbit:listener ref="directReceiver"  queue-names="myDirectQueue"/>
        <!-- 主题模式监听-->
        <rabbit:listener ref="topicErrorReceiver" queue-names="myTopicQueue_error"/>
        <rabbit:listener ref="topicWarnReceiver" queue-names="myTopicQueue_warn"/>
    </rabbit:listener-container>
 
    <!--接收者实现类-->
    <bean id="directReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.DirectReceiver"/>
    <bean id="fanoutReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.FanoutReceiver"/>
    <bean id="topicErrorReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.TopicErrorReceiver"/>
    <bean id="topicWarnReceiver" class="com.chensr.until.rabbitmq.springRabbitmq.TopicWarnReceiver"/>
 
    <!--confirmCallback回调-->
    <bean id="confirmCallback" class="com.chensr.until.rabbitmq.springRabbitmq.ConfirmCallback"/>
    <!--returnCallback回调-->
    <bean id="returnCallback" class="com.chensr.until.rabbitmq.springRabbitmq.ReturnCallback"/>
    <!--消息转换器,转成json格式-->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
</beans>

    注:由于接收者都是继承ChannelAwareMessageListener,实现onMessage方法,所以这里不提供相应的代码。

展开阅读全文
打赏
0 评论
4 收藏
0
分享
返回顶部
顶部