rocketmq与Spring整合

原创
2017/08/01 22:58
阅读数 527

Spring 与 RocketMQ 的整合包括生产者和消费者两部分,二者均完全纳入 Spring 容器管理。

<?xml version="1.0" encoding="UTF-8"?>
<!--
	Spring wiring for RocketMQ: a plain producer, a transactional producer and a
	push consumer. Every bean is started through init-method and stopped through
	destroy-method, so its lifecycle follows the Spring container.
	NOTE(review): the ${...} placeholders are not resolved by anything declared in
	this file; presumably a property placeholder configurer is set up elsewhere.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cache="http://www.springframework.org/schema/cache"
	xsi:schemaLocation="http://www.springframework.org/schema/beans    
                        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd  
                        http://www.springframework.org/schema/cache 
                        http://www.springframework.org/schema/cache/spring-cache-4.3.xsd">
                        
	<!-- Plain message producer: group name via constructor, name server address and send retry count via properties -->
	<bean id="defaultMQProducer" class="com.alibaba.rocketmq.client.producer.DefaultMQProducer" init-method="start" destroy-method="shutdown"  scope="singleton">
		<constructor-arg name="producerGroup" value="${defaulGroupName}" />
		<property name="namesrvAddr" value="${defaulNamesrvAddr}" />
		<property name="retryTimesWhenSendFailed" value="${defaulSendFailed}" />
	</bean>
	
	<!-- Transactional message producer: same settings as the plain producer plus the transaction check listener bean -->
	<bean id="transactionCheckListener" class="com.base.system.mq.TransactionCheckListener"/>
	<bean id="transactionMQProducer" class="com.alibaba.rocketmq.client.producer.TransactionMQProducer" init-method="start" destroy-method="shutdown"  scope="singleton">
		<constructor-arg name="producerGroup" value="${transactionGroupName}" />
		<property name="namesrvAddr" value="${transactionNamesrvAddr}" />
		<property name="retryTimesWhenSendFailed" value="${transactionSendFailed}" />
		<property name="transactionCheckListener" ref="transactionCheckListener" />
	</bean>

	<!-- Message consumer -->
    <!-- How to inject an enum constant with Spring: expose the static field as a bean and reference it -->
	<bean id="consumeFromWhere" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">    
        <property name="staticField" value="com.alibaba.rocketmq.common.consumer.ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET" />    
    </bean>
    <bean id="listener" class="com.base.system.mq.MessageListenerConcurrently"/>
    <!-- MQConsumer extends DefaultMQPushConsumer so that topic, subExpression and the listener can be injected as properties; init() subscribes, registers the listener and starts the consumer -->
	<bean id="mQConsumer" class="com.base.system.mq.MQConsumer" init-method="init" destroy-method="shutdown"  scope="singleton">
		<constructor-arg name="consumerGroup" value="${consumerGroup}" />
		<property name="consumeFromWhere" ref="consumeFromWhere" />
		<property name="namesrvAddr" value="${consumerNamesrvAddr}" />
		<property name="topic" value="${topic}" />
		<property name="subExpression" value="${subExpression}" />
		<property name="consumeThreadMin" value="${consumeThreadMin}" />
		<property name="consumeThreadMax" value="${consumeThreadMax}" />
		<property name="messageListener" ref="listener" />
	</bean>
</beans>

<!-- TransactionCheckListener:RocketMQ 的事务回查功能已被移除,但不设置该监听器仍会报错。 -->
package com.base.system.mq;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.MessageExt;
public class TransactionCheckListener implements com.alibaba.rocketmq.client.producer.TransactionCheckListener{
	public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
		System.err.println("Check localTransaction......");
		return LocalTransactionState.COMMIT_MESSAGE;
	}
}

<!-- 普通消息监听器:在这里处理收到的消息 -->
package com.base.system.mq;
import java.util.List;
import javax.annotation.Resource;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.base.system.entity.User;
import com.base.system.service.IUserService;
public class MessageListenerConcurrently implements com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently {
   @ Resource
	private IUserService iUserService;
	
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
		MessageExt messageExt = msgs.get(0);
		String account=new String(messageExt.getBody());
		User user = iUserService.findByLoginName(account);
		System.out.println(user);
		return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	}
}


<!--消息消费者 -->
package com.base.system.mq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.exception.MQClientException;
/**
 * {@link DefaultMQPushConsumer} subclass that turns the subscription arguments
 * (topic and sub-expression) and the message listener into bean properties, so the
 * consumer can be configured through setter injection and started by {@link #init()}.
 */
public class MQConsumer extends DefaultMQPushConsumer {

	/** Topic passed to {@code subscribe} in {@link #init()}. */
	private String topic;

	/** Subscription expression passed to {@code subscribe} in {@link #init()}. */
	private String subExpression;

	/** Listener registered in {@link #init()}. */
	private MessageListener messageListener;

	/**
	 * Creates a consumer for the given group.
	 *
	 * @param consumerGroup the consumer group name handed to the superclass constructor
	 */
	public MQConsumer(String consumerGroup) {
		super(consumerGroup);
	}

	public String getSubExpression() {
		return subExpression;
	}

	public void setSubExpression(String subExpression) {
		this.subExpression = subExpression;
	}

	public String getTopic() {
		return topic;
	}

	public void setTopic(String topic) {
		this.topic = topic;
	}

	// NOTE(review): the superclass may declare accessors with the same names; if so
	// these override them and read/write the field declared in this class. Confirm
	// against the RocketMQ version in use.
	public MessageListener getMessageListener() {
		return messageListener;
	}

	public void setMessageListener(MessageListener messageListener) {
		this.messageListener = messageListener;
	}

	/**
	 * Subscribes to the configured topic, registers the listener and starts the consumer.
	 * Intended to be called once, after all properties have been injected.
	 *
	 * @throws IllegalStateException if {@code topic} or {@code messageListener} has not been set
	 * @throws MQClientException     if subscribing or starting the consumer fails
	 */
	public void init() throws MQClientException {
		// Fail fast with a clear message instead of an obscure error from inside the client.
		if (topic == null || topic.trim().length() == 0) {
			throw new IllegalStateException("MQConsumer: property 'topic' must be set before init()");
		}
		if (messageListener == null) {
			throw new IllegalStateException("MQConsumer: property 'messageListener' must be set before init()");
		}
		this.subscribe(topic, subExpression);
		// Registers on the implementation object inherited from DefaultMQPushConsumer.
		this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
		this.start();
	}
}


<!--rocketmq 配置参数  -->
# Local setup referenced by the addresses below: mqnamesrv.exe  mqbroker -n 127.0.0.1:9876
# NOTE: the keys spelled "defaul..." must match the ${...} placeholders in the
# Spring XML exactly; rename both sides together if the spelling is corrected.

# Plain producer (bean defaultMQProducer): group name, name server address, send retry count
defaulGroupName=defaulGroupName
defaulNamesrvAddr=127.0.0.1:9876
defaulSendFailed=10

# Transactional producer (bean transactionMQProducer): group name, name server address, send retry count
transactionGroupName=transactionGroupName
transactionNamesrvAddr=127.0.0.1:9876
transactionSendFailed=10

# Consumer (bean mQConsumer): group, name server address, subscription and consume thread settings
consumerGroup=consumerGroup
consumerNamesrvAddr=127.0.0.1:9876
topic=Topic
subExpression=*
consumeThreadMax=10
consumeThreadMin=2

参数可根据自己的需求注入;若某些参数不便直接注入,可以像文中的 MQConsumer 那样继承原有的类,再以属性方式注入。MQConsumer 主要是为了解决 DefaultMQPushConsumer.subscribe(topic, subExpression) 的参数无法通过属性注入的问题。

其余的 DefaultMQPullConsumer 暂时没有用到,故不做笔记,但整合原理一致。

pom:

<!-- RocketMQ -->
<!-- NOTE(review): rocketmq-all is declared with type pom; confirm it is needed in
     addition to rocketmq-client, which presumably supplies the
     com.alibaba.rocketmq.client classes used by the code. -->
<dependency>
	<groupId>com.alibaba.rocketmq</groupId>
	<artifactId>rocketmq-all</artifactId>
	<version>3.2.6</version>
	<type>pom</type>
</dependency>
<dependency>
	<groupId>com.alibaba.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
	<version>3.2.6</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<!-- NOTE(review): netty version is pinned explicitly here; verify it is compatible
     with the netty version the RocketMQ client expects. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.0.Final</version>
</dependency>    
展开阅读全文
打赏
0
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
0
分享
返回顶部
顶部