文档章节

RocketMQ:一个纯java的开源消息中间件--开发测试环境搭建

cloud-coder
 cloud-coder
发布于 2014/02/18 12:26
字数 1305
阅读 1.3W
收藏 72

一、简介

    RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ

    MetaQ2.x版本由于依赖了alibaba公司内部其他系统,对于公司外部用户使用不够友好,推荐使用3.0版本。

     项目地址:https://github.com/alibaba/RocketMQ

二、安装RocketMQ

     安装RocketMQ需要jdk1.6, maven,git环境

     如果本机没有安装git,请使用如下命令安装

     yum install git

     具体安装步骤可以参考RocketMQ项目组给出的步骤,参见:

     https://github.com/alibaba/RocketMQ/wiki/Quick-Start

git clone https://github.com/alibaba/RocketMQ.git

cd RocketMQ

sh install.sh

cd devenv
      安装完成后,因为install.sh脚本中创建devenv 符号链接写错了目录,需要在RocketMQ目录下执行如下命令:

       rm -rf devenv

       ln -s target/alibaba-rocketmq-3.0.7/alibaba-rocketmq devenv

       启动RocketMQ

       cd devenv/bin

       nohup sh mqnamesrv &

       nohup sh mqbroker -n "192.168.230.128:9876" &

       more nohup.out

       如果显示:

       The Name Server boot success.
       The broker[vdata.kt, 192.168.230.128:10911] boot success.

       则NameServer,Broker启动成功

三、在eclipse中开发测试

       1.创建一个maven项目,其pom.xml的内容见pom.xml

       2. 编写消息产生者Producer,见Producer.java

       3.编写消息消费者Consumer,见PushConsumer.java

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.catt</groupId>
	<artifactId>RocketMQTest</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>RocketMQTest</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>com.alibaba.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>3.0.7</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba.rocketmq</groupId>
			<artifactId>rocketmq-all</artifactId>
			<version>3.0.7</version>
			<type>pom</type>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
</project>

Producer.java

默认情况下,一台服务器只能启动一个Producer或Consumer实例,所以如果需要在一台服务器启动多个实例,需要设置实例的名称

producer.setNamesrvAddr("192.168.230.128:9876");
producer.setInstanceName("Producer");

package com.catt.rocketmq.example;

import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {
	public static void main(String[] args) throws MQClientException,
			InterruptedException {
		/**
		 * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
		 * 注意:ProducerGroupName需要由应用来保证唯一<br>
		 * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
		 * 因为服务器会回查这个Group下的任意一个Producer
		 */
		DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
		producer.setNamesrvAddr("192.168.230.128:9876");
		producer.setInstanceName("Producer");

		/**
		 * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
		 * 注意:切记不可以在每次发送消息时,都调用start方法
		 */
		producer.start();

		/**
		 * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
		 * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
		 * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
		 * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
		 */
		for (int i = 0; i < 100; i++) {
			try {
				{
					Message msg = new Message("TopicTest1",// topic
							"TagA",// tag
							"OrderID001",// key
							("Hello MetaQ").getBytes());// body
					SendResult sendResult = producer.send(msg);
					System.out.println(sendResult);
				}

				{
					Message msg = new Message("TopicTest2",// topic
							"TagB",// tag
							"OrderID0034",// key
							("Hello MetaQ").getBytes());// body
					SendResult sendResult = producer.send(msg);
					System.out.println(sendResult);
				}

				{
					Message msg = new Message("TopicTest3",// topic
							"TagC",// tag
							"OrderID061",// key
							("Hello MetaQ").getBytes());// body
					SendResult sendResult = producer.send(msg);
					System.out.println(sendResult);
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
			TimeUnit.MILLISECONDS.sleep(1000);
		}

		/**
		 * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
		 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
		 */
		producer.shutdown();
	}
}

PushConsumer.java

package com.catt.rocketmq.example;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

public class PushConsumer {

	/**
	 * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
	 * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
	 */
	public static void main(String[] args) throws InterruptedException,
			MQClientException {
		/**
		 * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
		 * 注意:ConsumerGroupName需要由应用来保证唯一
		 */
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
				"ConsumerGroupName");
		consumer.setNamesrvAddr("192.168.230.128:9876");
		consumer.setInstanceName("Consumber");

		/**
		 * 订阅指定topic下tags分别等于TagA或TagC或TagD
		 */
		consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
		/**
		 * 订阅指定topic下所有消息<br>
		 * 注意:一个consumer对象可以订阅多个topic
		 */
		consumer.subscribe("TopicTest2", "*");

		consumer.registerMessageListener(new MessageListenerConcurrently() {

			/**
			 * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
			 */
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(
					List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				System.out.println(Thread.currentThread().getName()
						+ " Receive New Messages: " + msgs.size());

				MessageExt msg = msgs.get(0);
				if (msg.getTopic().equals("TopicTest1")) {
					// 执行TopicTest1的消费逻辑
					if (msg.getTags() != null && msg.getTags().equals("TagA")) {
						// 执行TagA的消费
						System.out.println(new String(msg.getBody()));
					} else if (msg.getTags() != null
							&& msg.getTags().equals("TagC")) {
						// 执行TagC的消费
					} else if (msg.getTags() != null
							&& msg.getTags().equals("TagD")) {
						// 执行TagD的消费
					}
				} else if (msg.getTopic().equals("TopicTest2")) {
					System.out.println(new String(msg.getBody()));
				}

				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});

		/**
		 * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
		 */
		consumer.start();

		System.out.println("Consumer Started.");
	}
}

© 著作权归作者所有

cloud-coder
粉丝 247
博文 193
码字总数 141277
作品 0
广州
架构师
私信 提问
加载中

评论(15)

rhwayfun
rhwayfun
如果一个应用起多个consumer消费不同的topic会有什么问题吗
盲人摸象
盲人摸象

引用来自“yuyidi”的评论

好歹是用它的jar包自己写一个也比你这直接拿git上拉下来的demo强,

引用来自“cloud-coder”的评论

能把开源的东西用好,并能改进也是不错的。
呵呵,那是的
cloud-coder
cloud-coder 博主

引用来自“yuyidi”的评论

好歹是用它的jar包自己写一个也比你这直接拿git上拉下来的demo强,
能把开源的东西用好,并能改进也是不错的。
盲人摸象
盲人摸象
好歹是用它的jar包自己写一个也比你这直接拿git上拉下来的demo强,
cloud-coder
cloud-coder 博主
你讲的问题我未遇到过,请到https://github.com/alibaba/RocketMQ/issues/264提问,看看开发者有什么解决方案
开源中国首席----
开源中国首席----

引用来自“筱龙缘”的评论

服务正常启动了 可以报
com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest1
See https://github.com/alibaba/RocketMQ/issues/264 for further details.
  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:570)
  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:972)
  at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:109)
  at com.alibaba.rocketmq.example.simple.Producer.main(Producer.java:60)

怎么处理呢?
没有启动域名服务器!!
筱龙缘
筱龙缘
服务正常启动了 可以报
com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest1
See https://github.com/alibaba/RocketMQ/issues/264 for further details.
  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:570)
  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:972)
  at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:109)
  at com.alibaba.rocketmq.example.simple.Producer.main(Producer.java:60)

怎么处理呢?
小Pu

引用来自“cloud-coder”的评论

Could not reserve enough space for object heap

说得很清楚,内存不够,请加大服务器内存或修改运行参数

谢谢了。

cloud-coder
cloud-coder 博主
Could not reserve enough space for object heap

说得很清楚,内存不够,请加大服务器内存或修改运行参数
小Pu
请问报下面这个错误是为什么呢?
Error occurred during initialization of VM
Could not reserve enough space for object heap
Error occurred during initialization of VM
Could not reserve enough space for object heap
高手问答第 222 期 —— RocketMQ 架构设计与实现原理

OSCHINA 本期高手问答(2019年1月2日 — 2019年1月8日)我们请来了丁威老师为大家解答关于 RocketMQ 的问题。 丁威,Java、分布式服务架构、中间件等多个领域的技术专家,擅长高并发编程、Net...

局长
2019/01/01
5.4K
43
RocketMQ与Kafka对比

RocketMQ与Kafka对比(18项差异) 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步...

莫问viva
2015/05/08
717
0
《RocketMQ技术内幕:RocketMQ架构设计与实现原理》一导读

内容介绍 本书由RocketMQ社区早期的布道者和技术专家撰写,Apache RocketMQ创始人/Linux OpenMessaging创始人兼主席/Alibaba Messaging开源技术负责人冯嘉对其高度评价并作序推荐。 源码角度...

小编辑01
2019/01/04
0
0
初试RocketMQ消息中间件

为什么要用MQ 在使用SpringCloud或Dubbo进行SOA架构后,不同的应用层模块(web)与业务层模块(service)要建立调用关系,也就是依赖/耦合 当模块变多时,模块间的耦合度也会逐步上升,这就需要一...

LinkedBear
2018/08/02
4.1K
0
Apache RocketMQ:从孵化项目到顶级项目 这是一个新的开始

几个月前,RocketMQ 社区发生了一件大事 —— Apache RocketMQ 宣布从 Apache 社区正式毕业,成为 Apache 顶级项目 (TLP),这也是国内首个互联网中间件的 Apache 顶级项目。要知道,RocketM...

局长
2017/12/18
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

深圳哪里可以开家用电器发票-中国-新闻网

深圳哪里可以开家用电器发票【152 * 9б 28 * 21 б9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bri...

17060824738
7分钟前
15
0
深圳哪里可以开劳保用品发票-中国-新闻网

深圳哪里可以开劳保用品发票【152 * 9б 28 * 21 б9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bri...

17095420210
11分钟前
27
0
Mac OS X 10.15 编译和安装 Nginx

想在自己电脑里装个 Nginx 来进行各种测试,下面是详细的过程记录: 1. 下载软件 首先建个临时目录 nginx-src 并下载所需软件的源码进行配置 mkdir nginx-srccd nginx-srcwget http://n...

红薯
27分钟前
42
0
0228 我的潘多拉

我的潘多拉 从一个故事说起。<br />从前,有个Java程序员非常喜欢写程序,喜欢研究源码,读英文文档。但是它在一家小公司里工作,公司的技术栈很陈旧。<br /> <br />单个系统代码中含有很多的...

李福春carter
今天
18
0
OSChina 周六乱弹 —— 屁会不会传染病毒

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 《ハレハレヤ(朗朗晴天)》- 猫瑾 手机党少年们想听歌,请使劲儿戳(这里) @空格...

小小编辑
今天
77
1

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部