zookeeper+activemq+levelDB集群消息中间件搭建
zookeeper+activemq+levelDB集群消息中间件搭建
王华超 发表于1年前
zookeeper+activemq+levelDB集群消息中间件搭建
  • 发表于 1年前
  • 阅读 18
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

方法/步骤

  1. 下载activemq 和zookeeper,levelDB会activemq自带有,所以不需要下载,把下载好的压缩包发送到所有的linux上,(我这里是把下载好的压缩包放在当前window下的tomcat,root目录下,可在linux终端上使用命令:curl -o activeMq.tar.gz http://10.0.10.37:8080/apache-linux-activemq-5.11.1-bin.tar.gz把activemq下载下来)关键就是配置好zookeeper集群和activemq集群

  2. 登录linux系统,在某个目录下新建文件activemq,使用curl -o activeMQ.tar.gz  http://10.0.10.37:8080/apache-linux-activemq-5.11.1-bin.tar.gz 下载activeMQ.tar.gz,相应的把zookeeper.tar.gz下载到activemq目录下,分别解压这两个文件放在activeMQ文件夹下,和zookeeper文件夹下。

  3. 当linux系统的所有的压缩包都解压好了,所有的文件都准备好了, zookeeper比如在目录/home/activemq/zookeeper目录下,activemq在/home/activemq/activemq目录下,那么我们先来配置zookeeper,在zookeeper/conf目录下找到zoo_sample.cfg文件,使用命令cp  -f zoo_sample.cfg zoo.cfg 复制成zoo.cfg文件,使用命令vi zoo.cfg  再按键盘I,进入编辑状态,修改zoo.cfg内容为

    # The number of milliseconds of each tick

    tickTime=2000

    # The number of ticks that the initial

    # synchronization phase can take

    initLimit=10

    # The number of ticks that can pass between

    # sending a request and getting an acknowledgement

    syncLimit=5

    # the directory where the snapshot is stored.

    # the port at which the clients will connect

    clientPort=2181

    # the directory where the snapshot is stored.

    dataDir=/home/ activemq/zookeeper/data/data

    # the port at which the clients will connect

    dataLogDir=/home/activemq/zookeeper/data/log

    # add diffenrent server

    这里有dataDir=/home/ activemq/zookeeper/data/data和

    dataLogDir=/home/activemq/zookeeper/data/log

    如果没有这些目录,我们需要新建这些目录

             建好目录之后,在dataDir目录下,新建myid文件,使用命令:touch  myid 新建文件,内容为当前机子ip所对应的zoo.cfg中server.x中的x,比如:按照上面配置zoo.cfg当前电脑ip为10.0.88.12,则myid的内容就为3,这里注意:myid文件中不能留任何空格,空行,zoo.cfg文件中的注释#与注释内容之间需要加一个空格,不是注释的,前面不能留空格和不能重写相同的变量。

  4. 上面配置完之后,需要配置一下环境变量,方便操作,先使用命令才cd /进入根目录,使用root权限,使用vi /etc/profile,编辑profile文件,在PATH面前添加配置如下:

    export ZOOKEEPER=/home/activemq/zookeeper

    export PATH=$PATH:$ZOOKEEPER/bin:$ZOOKEEPER/conf

    这里注意conf目录也要添加进来,要不可能会报错的

    配置好之后,使用命令:source  /etc/profile 使配置生效。

  5. 配置好上述之后,接下来就是测试了

    使用zkServer.sh start 命令启动zookeeper,中途可能会出现一些错误,当出现错误或者查看zkServer.sh status命令的时候,不正常,可以查看dataDir或者dataLogDir目录下的zookeeper.out文件,里面有相应的启动记录,启动每个电脑的zookeeper服务器时,使用zkServer.sh status 查看状态,如果如图有出现红圈中的mode字样说明配置成功了:

    zookeeper+activemq+levelDB集群消息中间件搭建

  6. 在配置过程中,我遇到的问题,有端口被占用的,弄了好久,后来还是解决了,端口占用的进程名QuorumPeerMain这也是zookeeper进程名称,使用命令

    ps -ef | grep QuorumPeerMain 查出进程ID,使用kill -9  进程ID  结束进程。

    Zookeeper配置大概就这些了

     

    下面我们配置activemq

    配置activemq比较简单

     自从activemq5.9.0开始,activemq的集群实现方式取消了传统的Master-Slave方式,增加了基于zookeeper+leveldb的实现方式,其他两种方式:目录共享和数据库共享依然存在。本文主要阐述基于zookeeper和leveldb搭建activemq集群,这里需要特别提醒,本文实现的集群仅提供主备功能,避免单点故障,没有负载均衡功能。

    在conf目录下找到activemq.xml 把里面内容的

    <kahaDB directory="${activemq.data}/kahadb"/>

    替换成

    <replicatedLevelDB

     

          directory="${activemq.data}/leveldb"

     

          replicas="3"

     

          bind="tcp://0.0.0.0:0"

     

          zkAddress="10.0.88.10:2181,10.0.88.11:2181,10.0.88.12:2181"

     

         <!-- zkPassword="password"-->

     

     hostname="10.0.88.10"

     

          sync="local_disk"

     

          zkPath="/activemq/leveldb-stores"

     

          />

    这里的hostname要写当前机子ip

    接下来配置环境变量,方便操作。由于我的系统是64位的,所以我在profile后新添加一条语句:

    export  $PATH:/home/ activemq /activemq /bin/linux-x86-64

    不同的位数系统,选择不同的activemq,如果是32位的好像是

    export  $PATH:/home/ activemq /activemq /bin/linux-x86-32

    那么配置完成了,使用source  /etc/profile使命令生效

    activemq start 启动

    在这里要先启动zookeeper 在启动activemq

    配置好了,测试一下,使用命令:http://10.0.88.10:8161/admin/index.jsp如果能运行,说明activemq启动成功,如果把10.0.88.10上的activemq关闭了,这个链接还可以访问,说明配置已经成功。

  7. 注意事项:如果activemq启动出错的,我们可以在启动目录下的wrapper.log文件查看哪里出错。

    配置注意事项

    1、  zoo.cfg文档开头和结尾不能留空格

    2、  myid前后不能留换行和空格

    3、  zookeeper 进程名为QuorumPeerMain

    4、  zoo.cfg配置文件写法需要注意,不能重写同一个变量

    activemq在不同权限下可以启动两个不同的activemq服务,如果某个机子上有activemq关闭了,客户端还可以访问,说明还有一个activemq在启动

    优化配置待续:

    修改activemq文件改变内存ACTIVEMQ_OPTS_MEMORY="-Xms256M -Xmx384M -XX:PermSize=256M -XX:MaxPermSize=384M"

    注意的问题:

    1、         通过failover方式进行连接,多个AMQ实例地址使用英文逗号隔开,当某个实例断开时会自动重连,但如果所有实例都失效,failover默认情况下会无限期的等待下去,不会有任何提示

    2、        不要频繁的建立和关闭连接JMS使用长连接方式,一个程序,只要和JMS服务器保持一个连接就可以了,不要频繁的建立和关闭连接。频繁的建立和关闭连接,对程序的性能影响还是很大的。这一点和jdbc还是不太一样的。

    3、      JMS的Connection的start()和stop()方法代价很高,不能经常调用。我们试用的时候,写了个jms的connection pool,每次将connection取出pool时调用start()方法,归还时调用stop()方法,然而后来用jprofiler发现,一般的 cpu时间都耗在了这两个方法上

  8. 代码:activemq.xml

    <!--

        Licensed to the Apache Software Foundation (ASF) under one or more

        contributor license agreements.  See the NOTICE file distributed with

        this work for additional information regarding copyright ownership.

        The ASF licenses this file to You under the Apache License, Version 2.0

        (the "License"); you may not use this file except in compliance with

        the License.  You may obtain a copy of the License at

     

        http://www.apache.org/licenses/LICENSE-2.0

     

        Unless required by applicable law or agreed to in writing, software

        distributed under the License is distributed on an "AS IS" BASIS,

        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

        See the License for the specific language governing permissions and

        limitations under the License.

    -->

    <!-- START SNIPPET: example -->

    <beans

      xmlns="http://www.springframework.org/schema/beans"

      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

     

        <!-- Allows us to use system properties as variables in this configuration file -->

        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">

            <property name="locations">

                <value>file:${activemq.conf}/credentials.properties</value>

            </property>

        </bean>

     

       <!-- Allows accessing the server log -->

        <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"

              lazy-init="false" scope="singleton"

              init-method="start" destroy-method="stop">

        </bean>

     

        <!--

            The <broker> element is used to configure the ActiveMQ broker.

        -->

            

        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="true" >

            <destinationPolicy   >

                <policyMap>

                  <policyEntries>

                                 

                    <policyEntry topic=">" topicPrefetch="10000" producerFlowControl="false" useCache="true" >

                        <!-- The constantPendingMessageLimitStrategy is used to prevent

                             slow topic consumers to block producers and affect other consumers

                             by limiting the number of messages that are retained

                             For more information, see:

     

                             http://activemq.apache.org/slow-consumer-handling.html

     

                        -->

                                                   <!-- http://www.csdn123.com/html/exception/704/704841_704836_704834.htm-->

                                                   <pendingSubscriberPolicy> 

                        <vmCursor /> 

                      </pendingSubscriberPolicy>

                                                   <pendingDurableSubscriberPolicy>

                                                   <storeDurableSubscriberCursor/> 

                                                   </pendingDurableSubscriberPolicy> 

                     <!-- <pendingMessageLimitStrategy>

                        <constantPendingMessageLimitStrategy limit="1000"/>

                      </pendingMessageLimitStrategy>-->

                    </policyEntry>

                                         <policyEntry queue=">" producerFlowControl="true" queuePrefetch="10000" memoryLimit="100mb" useCache="true">

                                         <pendingQueuePolicy> 

                        <vmQueueCursor/> 

                      </pendingQueuePolicy>

                                          

                                           <pendingDurableSubscriberPolicy>

                                                   <storeDurableSubscriberCursor/> 

                                                   </pendingDurableSubscriberPolicy> 

     

                                                   <pendingQueuePolicy> 

                                                   <storeCursor /> 

                                         </pendingQueuePolicy> 

     

                                         <slowConsumerStrategy> 

                                <abortSlowConsumerStrategy  abortConnection="false"/><!-- 30second --> 

                                </slowConsumerStrategy>

                                         </policyEntry>

                  </policyEntries>

                </policyMap>

            </destinationPolicy>

    <networkConnectors>

          <networkConnector uri="static:(tcp://10.0.88.10:61616,tcp://10.0.88.11:61616,tcp://10.0.88.12:61616)"/>

    </networkConnectors>

     

     

            <!--

                The managementContext is used to configure how ActiveMQ is exposed in

                JMX. By default, ActiveMQ uses the MBean server that is started by

                the JVM. For more information, see:

     

                http://activemq.apache.org/jmx.html

            -->

            <managementContext>

                <managementContext createConnector="false"/>

            </managementContext>

     

            <!--

                Configure message persistence for the broker. The default persistence

                mechanism is the KahaDB store (identified by the kahaDB tag).

                For more information, see:

     

                http://activemq.apache.org/persistence.html

            -->

            <persistenceAdapter>

              <replicatedLevelDB

     

          directory="${activemq.data}/leveldb"

     

          replicas="3"

     

          bind="tcp://0.0.0.0:0"

     

          zkAddress="10.0.88.10:2181,10.0.88.11:2181,10.0.88.12:2181"

     

          hostname="10.0.88.12"

     

          sync="local_disk"

     

          zkPath="/activemq/leveldb-stores"

     

          />

                               

            </persistenceAdapter>

     

     

              <!--

                The systemUsage controls the maximum amount of space the broker will

                use before disabling caching and/or slowing down producers. For more information, see:

                http://activemq.apache.org/producer-flow-control.html

              -->

              <systemUsage>

                <systemUsage>

                    <memoryUsage>

                        <memoryUsage percentOfJvmHeap="70" />

                    </memoryUsage>

                    <storeUsage>

                        <storeUsage limit="100 gb"/>

                    </storeUsage>

                    <tempUsage>

                        <tempUsage limit="50 gb"/>

                    </tempUsage>

                </systemUsage>

            </systemUsage>

     

            <!--

                The transport connectors expose ActiveMQ over a given protocol to

                clients and other brokers. For more information, see:

     

                http://activemq.apache.org/configuring-transports.html

            -->

            <transportConnectors>

                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->

                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=10000&amp;wireFormat.maxFrameSize=1048576000"/>

                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

            </transportConnectors>

     

            <!-- destroy the spring context on shutdown to stop jetty -->

            <shutdownHooks>

                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />

            </shutdownHooks>

     

        </broker>

     

        <!--

            Enable web consoles, REST and Ajax APIs and demos

            The web consoles requires by default login, you can disable this in the jetty.xml file

     

            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details

        -->

            

        <import resource="jetty.xml"/>

     

    </beans>

共有 人打赏支持
粉丝 5
博文 10
码字总数 1601
×
王华超
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: