文档章节

HornetQ集群配置及spring集成示例

langke
 langke
发布于 2016/08/03 17:19
字数 1934
阅读 40
收藏 0

HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。还支持RESTful API、STOMP(Stomp的客户端可以用多种编程语言来实现 

)、AMQP(HornetQ will shortly be implementing AMQP )。

  • HornetQ拥有超高的性能,HornetQ在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然,HornetQ的非持久化消息的性能会表现的更好!
  • HornetQ完全使用POJO,纯POJO的设计让HornetQ可以尽可能少的以来第三方的包。从设计模式来说,HornetQ这样的设计入侵性也最小。HornetQ既可以独立运行,也可以与其它Java应用程序服务器集成使用。
  • HornetQ拥有完善的错误处理机制,HornetQ提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。
  • HornetQ提供了灵活的集群功能,通过创建HornetQ集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。
  • HornetQ拥有强大的管理功能。HornetQ提供了大量的管理API和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个HA环境中。

用途:松散地联系各系统,不用受其它服务器的制约,有效的减少线程Block的时间. 不同于RPC , 采用的Request/Reponse 的方式. 

 

hornetq支持内容Body 
Stream -- StreamMessage 包含顺序读取值的流  
Text -- TextMessage) 
Map -- MapMessage (key/value)) 
Object --  ObjectMessage  Support Serializable序列化的对象. 
Bytes --  BytesMessage  字节信息(如存放图像)  

 

下载:wget http://downloads.jboss.org/hornetq/hornetq-2.2.14.Final.zip

 

yum install libaio

 

中文文档: http://www.jboss.org/hornetq/chinesedocs.html

 

1.单机配置:

 

1.1编写启动脚本:start.sh

 

IP=`/sbin/ip a |grep 'inet '|awk -F'/' '{print $1}'|awk '{print $2}'|grep -v 127.0.0.1|head -1` export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP"
echo $CLUSTER_PROPS sh run.sh &

 

 

1.2或者修改配置文件

 

以下两个文件把localhost替换为本机IP

 

config/stand-alone/non-clustered/hornetq-configuration.xml  

 

config/stand-alone/non-clustered/hornetq-beans.xml 

 

bindAddress">${jnp.host:192.168.100.241}

 

rmiBindAddress">${jnp.host:192.168.100.241}

 

${hornetq.remoting.netty.host:192.168.100.241 }

 

....

 

1.3客户端需要的包

 

hornetq-core-client.jar

 

netty.jar

 

hornetq-jms-client.jar

 

jboss-jms-api.jar

 

 jnp-client.jar

 

 

 

1.4配置一个队列,添加配置onfig/stand-alone/non-clustered/hornetq-jms.xml 

 

 <queue name="OrderQueue">

 

        <entry name="queues/OrderQueue"/>

 

    </queue>

 

配置一个主题

 

<topic name="topic1">

 

  <entry name="/my/Topic1"/>

 

</topic>

 

hornetq-configuration.xml

 

在<configuration>节点下增加

 

<security-enabled>false</security-enabled> 

 

1.5收发消息demo

public void sendToQueue(String destinationName,Serializable payload) throws Exception { InitialContext ic = new InitialContext(); ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory"); Queue queue = (Queue)ic.lookup(destinationName); Connection connection = cf.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer publisher = session.createProducer(queue); connection.start(); ObjectMessage message = session.createObjectMessage(payload); message.setObject(payload); publisher.send(message); if (connection != null) { connection.close(); } } @TransactionAttribute(value = TransactionAttributeType.REQUIRED) public void onMessage(Message message) { ObjectMessage obj = (ObjectMessage) message; try { Serializable ser = obj.getObject(); log.info("[NotificationInbound] onMessage!"); } catch (Exception e) { log.error("[NotificationInbound] ERROR[" + e.getMessage() + "]!!!****"); throw new IllegalStateException(); } }

2 集群配置

2.1单机集群启动脚本

start-cluster0.bat

set CLUSTER_PROPS=-Ddata.dir=../data-server2 -Djnp.port=2099 -Djnp.rmiPort=2098 -Dhornetq.remoting.netty.port=6445
run   ../config/stand-alone/clustered 
start-cluster1.bat
set CLUSTER_PROPS=-Ddata.dir=../data-server3 -Djnp.port=3099 -Djnp.rmiPort=3098 -Dhornetq.remoting.netty.port=7445
run   ../config/stand-alone/clustered

 

2.2集群节点启动脚本

start-node.sh

IP=`/sbin/ip a |grep 'inet '|awk -F'/' '{print $1}'|awk '{print $2}'|grep -v 127.0.0.1|head -1` export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP"
echo $CLUSTER_PROPS sh run.sh   ../config/stand-alone/clustered

 

2.2.1集群节点停止脚本

stop-node.sh

sh stop.sh ../config/stand-alone/clustered

 

2.3 .集群配置说明

2.3.1集群发现使用udp协议进行组播

hornetq-configuration.xml

<discovery-groups>
<discovery-group name="my-discovery-group">
<local-bind-address>172.16.9.7</local-bind-address>
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
<connection-factory name="ConnectionFactory">
<discovery-group-ref discovery-group-name="my-discovery-group"/>
<entries>
<entry name="/ConnectionFactory"/>
</entries>
</connection-factory>

 

2.3.2客户端连接代码 :

final String groupAddress = "231.7.7.7"; final int groupPort = 9876; ConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactory(groupAddress, groupPort); Connection jmsConnection1 = jmsConnectionFactory.createConnection(); Connection jmsConnection2 = jmsConnectionFactory.createConnection();

 

 

2.3.3Server Side load balancing

hornetq-configuration.xml

<cluster-connections>
<cluster-connection name="my-cluster">
<address>jms</address>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
<discovery-group-ref discovery-group-name="my-discovery-group"/>
</cluster-connection>
</cluster-connections>

 

 

2.3.4Client Side load balancing

hornetq-jms.xml

<connection-factory name="ConnectionFactory">
<discovery-group-ref discovery-group-name="my-discovery-group"/>
<entries>
<entry name="/ConnectionFactory"/>
</entries>
<ha>true</ha>
<connection-load-balancing-policy-class-name> org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy </connection-load-balancing-policy-class-name>
</connection-factory>

3.与spring集成示例

3.1spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">


      <bean id="messageTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic">
              <constructor-arg value="topic1" />
      </bean>

    <bean id="searchAddMessageQueue" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createQueue">
        <constructor-arg value="ExpiryQueue"></constructor-arg>
    </bean>
    <!-- <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration"> <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" /> <constructor-arg> <map key-type="java.lang.String" value-type="java.lang.Object"> <entry key="host" value="localhost"></entry> <entry key="port" value="5445"></entry> </map> </constructor-arg> </bean> -->
      <bean id="transportConfiguration" class="org.hornetq.api.core.DiscoveryGroupConfiguration">
              <constructor-arg name="groupAddress" value="231.7.7.7" />
              <constructor-arg name="groupPort" value="9876">
              </constructor-arg>
      </bean>

      <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createConnectionFactoryWithHA" destroy-method="close">
              <constructor-arg type="org.hornetq.api.jms.JMSFactoryType" value="CF" />
              <constructor-arg ref="transportConfiguration" />
      </bean>

      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
              <property name="connectionFactory" ref="connectionFactory" />
              <property name="pubSubDomain" value="true" />      
      </bean>

      <bean id="topicService" class="org.langke.hornetq.ClientServiceImpl">
              <property name="jmsTemplate" ref="jmsTemplate" />
              <property name="topic" ref="messageTopic" />
      </bean>
      

    <bean id="sendMessageService" class="org.langke.hornetq.SendMessageServiceImpl">
        <property name="jmsTemplate" ref="jmsTemplate"></property>
        <property name="searchAddMessageQueue" ref="searchAddMessageQueue"></property>
    </bean>
    
    <!-- this is the Message Driven POJO (MDP) <bean id="messageListener" class="org.langke.hornetq.MessageListenerImpl"> </bean> -->
    <bean id="receiveMessageListener" class="org.langke.hornetq.ReceiveMessageListenerImpl"></bean>

    <!-- and this is the message listener container -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <!-- <property name="destination" ref="messageTopic" /> -->
        <property name="destination" ref="searchAddMessageQueue"></property>
        <property name="messageListener" ref="receiveMessageListener" />
    </bean>

</beans>
 
package org.langke.common.hornetq; public interface MessageService { public boolean sendMessage(SerializableObject message) ; }

3.2发送消息

package org.langke.common.hornetq; import java.io.Serializable; public class SerializableObject implements Serializable{ /** * */
    private static final long serialVersionUID = 1L; private Object obj ; private Boolean isRetry = true; public Object getObj() { return obj; } public void setObj(Object obj) { this.obj = obj; } public Boolean getIsRetry() { return isRetry; } public void setIsRetry(Boolean isRetry) { this.isRetry = isRetry; } }
package org.langke.common.hornetq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; import org.apache.log4j.Logger; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class SendMessageServiceImpl implements MessageService { private static final Logger logger = Logger.getLogger(SendMessageServiceImpl.class); private JmsTemplate jmsTemplate; private Queue searchAddMessageQueue; @Override public boolean sendMessage(SerializableObject message) { return sendQueue(message); } private boolean sendQueue(final SerializableObject so) { try { logger.info("start to send queue to " + searchAddMessageQueue.getQueueName() + ", message : " + so); jmsTemplate.send(searchAddMessageQueue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { ObjectMessage om = session.createObjectMessage(so); return om; } }); return true; } catch (Exception e) { logger.error("Error: send topic failure:" + e.getMessage(), e); return false; } } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Queue getSearchAddMessageQueue() { return searchAddMessageQueue; } public void setSearchAddMessageQueue(Queue searchAddMessageQueue) { this.searchAddMessageQueue = searchAddMessageQueue; } }

3.3接收消息

package org.langke.common.hornetq; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import org.apache.log4j.Logger; public class ReceiveMessageListenerImpl implements MessageListener { private AtomicInteger count = new AtomicInteger(0); private static Logger logger = Logger.getLogger(ReceiveMessageListenerImpl.class); @Override public void onMessage(Message message) { try{ if(message instanceof ObjectMessage){ ObjectMessage objectMessage = (ObjectMessage)message; if(objectMessage.getObject() instanceof SerializableObject){ SerializableObject so = (SerializableObject) objectMessage.getObject(); logger.info(so.getObj()); }else{ logger.info(objectMessage); } }else{ System.out.println(message); } } catch (JMSException e) { logger.error( "Error: receive message from topic failure: "
                            + e.getMessage(), e); }finally{ System.out.println(count.incrementAndGet()); } } }

3.4调用示例

package org.langke.common.hornetq; import java.io.File; import java.util.HashMap; import java.util.Map; import org.springframework.context.ApplicationContext; import org.springframework.context.support.FileSystemXmlApplicationContext; public class Test { private static ApplicationContext ctx; private static Test instance=new Test(); public  static Test getInstance(){ return instance; } private Test() { if(ctx == null) { String location = null; if(System.getProperty("os.name").toLowerCase().contains("windows")){ location = "conf/applicationContext.xml"; }else{ location = "../conf/applicationContext.xml"; } File file = new File(location); ctx = new FileSystemXmlApplicationContext(location); } } /** * @param args */
    public static void main(String[] args) { getInstance(); MessageService service = ctx.getBean("sendMessageService", MessageService.class); for(int i=0;i<3000;i++){ Map map = new HashMap(); map.put("ooxx", i); SerializableObject so = new SerializableObject(); so.setObj(map); service.sendMessage(so); } } }

 

4.其它功能

4.1Message expire

HornetQ will not deliver a message to a consumer after it's time to

live has been exceeded.

If the message hasn't been delivered before the time to live is

reached, the server can discard it.

// message will expire in 5000ms from now

message.setExpiration(System.currentTimeMillis() + 5000);

Expiry-address

<!-- expired messages in exampleQueue will be sent to the expiry

address expiryQueue -->

<address-setting match="jms.queue.exampleQueue">

<expiry-address>jms.queue.expiryQueue</expiry-address>

</address-setting>

4.2

Scheduled messages

TextMessage message = session.createTextMessage("MSG");

message.setLongProperty("_HQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000);

producer.send(message);

...

// message will not be received immediately but 5 seconds later

TextMessage messageReceived = (TextMessage) consumer.receive();

4.3Message group

Message groups are sets of messages that have the following characteristics:

• Messages in a message group share the same group id; that is, they have the same group

identifier property (JMSXGroupID for JMS, _HQ_GROUP_ID for HornetQ Core API).

• Messages in a message group are always consumed by the same consumer, even if there

are many consumers on a queue. They pin all messages with the same group id to the same

consumer.

If that consumer closes another consumer is chosen and will receive all messages with the

samegroup id.

Based on message

Message message = ...

message.setStringProperty("JMSXGroupID", "Group-0");

producer.send(message);

message = ...

message.setStringProperty("JMSXGroupID", "Group-0");

producer.send(message);

Based on connection factory...

<connection-factory name="ConnectionFactory">

<connectors>

<connector-ref connector-name="netty-connector"/>

</connectors>

<entries>

<entry name="ConnectionFactory"/>

</entries>

<group-id>Group-0</group-id>

</connection-factory>

 

 

© 著作权归作者所有

langke
粉丝 1
博文 70
码字总数 3645
作品 0
架构师
私信 提问
HornetQ 2.3.0.Final 发布,异步消息系统

HornetQ 2.3.0.Final 发布了,包含新特性有:复制、多备份、多故障转移、回滚和 stomp 1.2 支持等等。 HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS...

红薯
2013/04/26
1K
2
HornetQ 2.3 Beta2 发布,异步消息系统

HornetQ 2.3 Beta2 发布了,该版本修复了复制方面的一些bug,这将是最后一个 Beta 版本,正式版的发布计划在明年的早些时候。 HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系...

红薯
2012/11/28
881
1
jboss eap 的 domain 形式自动关闭

版本:jboss eap 6.4 我在linux上使用jboss eap的domain形式来搭建server集群,由于资源有限,只启动了一个domain服务,即只有master,没有slave。修改了host.xml文件的ip之后,启动了domain....

jboss_小白
2017/07/31
65
1
JBoss 系列五十五:JBoss 7/WildFly 集群之 HornetQ Messaging - III(示例补充说明)

概述 如下两个链接为JBoss 7/WildFly HornetQ Messaging 集群示例中用到的配置文件 https://github.com/kylinsoong/cluster/blob/master/demo/jms/standalone-full-ha-node1.xml https://gi......

无鸯
2014/02/04
0
0
异步消息 HornetQ 2.0.0.CR1发布

HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性...

红薯
2009/12/08
268
0

没有更多内容

加载失败,请刷新页面

加载更多

编写高质量代码:改善Java程序的151个建议(第5章:数组和集合___建议70~74)

本节内容有些吹毛求疵、晦涩难懂! 建议70:子列表只是原列表的一个视图 List接口提供了subList方法,其作用是返回一个列表的子列表,这与String类subSting有点类似。 注意:subList产生的列...

青衣霓裳
8分钟前
0
0
一文入门Drools规则引擎

Drools是一款基于Java的优秀的规则引擎,将复杂多变的业务规则从硬编码中解放出来,以规则脚本的形式存放在文件中,使得修改规则不需要改动代码重启服务就能生效。 Drools有一定的学习成本,...

阿提说说
12分钟前
0
0
企业数据化转型,大数据分析,业务中台,不可缺少的大数据可视化工具

全球经济发展进入了数字化转型时期。每个企业都希望赶上这趟转型的列车。为什么大家都如此看重数字化转型?因为这是一个生存问题,企业必须这样做。 数字化转型涉及企业管理的改变,不同的思...

NBI大数据可视化
21分钟前
0
0
时尚电商新赛道:揭秘 FashionAI 技术

阿里妹导读:雷音是阿里巴巴研究员、淘系技术部 FashionAI 负责人,在淘系技术嘉年华硅谷站,他分享了《时尚电商新赛道— FashionAI 中的技术》 ,旨在揭秘:从面向机器学习的知识重建切入,...

阿里云云栖社区
27分钟前
1
0
Java基础-Queue

Queue实现 1、没有实现阻塞接口的 LinkedList:实现了java.util.Queue接口和java.util.AbstractQueue接口   内置的不阻塞队列:PriorityQueue和ConcurrentLinkedQueue   PriorityQueue和...

MyOldTime
57分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部