文档章节

RocketMQ搭建及demo

六楼的宣言
 六楼的宣言
发布于 2017/04/04 20:06
字数 408
阅读 50
收藏 4

一、RocketMQ机器硬件要求内存最好不要低于8G, 系统linux,且已经安装好JDK

二、安装文件下载地址:
http://mirror.bit.edu.cn/apache/incubator/rocketmq/4.0.0-incubating/rocketmq-all-4.0.0-incubating-bin-release.zip

三、下载RocketMQ安装文件并上传到服务器上后

解压 

unzip rocketmq-all-4.0.0-incubating-bin-release.zip

进入到解压目录下的bin目录中

启动 NameServer:

nohup sh mqnamesrv &

启动 broker

nohup sh mqbroker -n localhost:9876 &

使用jps命令可以看到有以下两个java程序运行中

为了便于其它机器调试访问,可临时将防火墙关闭:

service firewalld stop

 

四、Java 程序demo

maven依赖包:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.0.0-incubating</version>
</dependency>

消息生产者demo代码:

package com.classtest.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args){
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("192.168.133.141:9876");
        try {
            producer.start();

            Message msg = new Message("PushTopic",
                    "push",
                    "1",
                    "Just for test1.".getBytes());

            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            msg = new Message("PushTopic",
                    "push",
                    "2",
                    "Just for test2.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            msg = new Message("PushTopic",
                    "push",
                    "1",
                    "Just for test3.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            producer.shutdown();
        }
    }
}

消息消费者Java代码demo:

package com.classtest.rocketmq;

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args){
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("192.168.133.141:9876");
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("PushTopic", "push");
            //程序第一次启动从消息队列头取数据
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(
                    new MessageListenerConcurrently() {
                        public ConsumeConcurrentlyStatus consumeMessage(
                                List<MessageExt> list,
                                ConsumeConcurrentlyContext Context) {
                            Message msg = list.get(0);
                            System.out.println(msg.toString());
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

可以先运行消费者demo, 然后运行生产者demo时可以消费者demo的运行窗口输出消息

 

参考:

https://my.oschina.net/jayronwang/blog/861396

http://rocketmq.apache.org/docs/quick-start/

 

本文转载自:

共有 人打赏支持
六楼的宣言

六楼的宣言

粉丝 6
博文 44
码字总数 12552
作品 0
海淀
后端工程师
《Apache RocketMQ用户指南》官方文档

RocketMQ–导读 原文链接 译者:小村长 最近两个项目中用到了RocketMQ消息中间件,每次都是在网上找几个Demo,而没有去看它的官方文档。年前面试某大型互联网企业。RocketMQ多有提及。今借此...

小村长
01/23
0
0
发力IOT、大数据,Apache RocketMQ 构建云时代的新生态

摘要 2018年9月1日,由阿里巴巴中间件举办的RocketMQ开发者沙龙在北京举行。这是RocketMQ今年举办的第二次大型线下技术交流活动,出席技术沙龙的嘉宾包括Apache RocketMQ创始人、OpenMessagi...

中间件小哥
09/21
0
0
Sentinel 如何通过匀速请求和冷启动来保障服务的稳定性

这是围绕 Sentinel 的使用场景、技术对比和实现、开发者实践等维度推出的系列文章的第二篇。 第一篇:Dubbo 的流量防卫兵 Sentinel如何通过限流实现服务的高可用性 - 链接 本期将通过 Sentin...

许此一生
08/28
0
0
RocketMQ 的保险丝| Sentinel 如何通过匀速请求和冷启动来保障服务的稳定性

这是围绕 Sentinel 的使用场景、技术对比和实现、开发者实践等维度推出的系列文章的第二篇。 第一篇:Dubbo 的流量防卫兵 | Sentinel如何通过限流实现服务的高可用性 - 链接 本期将通过 Sent...

中间件小哥
08/16
0
0
RocketMQ初探(二)之RocketMQ3.26版本搭建(含Demo测试)

  作为一名程序猿,要敢于直面各种现实,脾气要好,心态要棒,纵使Bug虐我千百遍,我待它如初恋,方法也有千万种,一条路不行,换条路走走,方向对了,只要前行,总会上了罗马的道。   A...

Hello____JAVA
07/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Java并发编程:volatile关键字解析

volatile这个关键字可能很多朋友都听说过,或许也都用过。在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果。在Java 5之后,volatile关键字才得以重获生...

engeue
15分钟前
0
0
通过ajax访问远程天气预报服务

http://www.webxml.com.cn/zh_cn/index.aspx 更改wsdl文件 打开文件将15行,51行,101行去掉 然后把文件复制到c盘 然后在桌面上面就生成了文件 将文件打成jar包 package cn.it.ws.weather;...

江戸川
今天
1
0
聊聊storm的tickTuple

序 本文主要研究一下storm的tickTuple 实例 TickWordCountBolt public class TickWordCountBolt extends BaseBasicBolt { private static final Logger LOGGER = LoggerFactory.getLogg......

go4it
今天
1
0
自动装箱和自动拆箱

自动装箱和自动拆箱 Java 提供了 8 种基本数据类型,每种数据类型都有其对应的包装类型,包装类是面向对象的类,是一种高级的数据类型,可以进行一些比较复杂的操作,它们是引用类型而不再基...

tsmyk0715
今天
2
0
简易审计系统

1、有时候我们需要对线上用户的操作进行记录,可以进行追踪,出现问题追究责任,但是linux自带的history并不会实时的记录(仅仅在内存中,当用户正常退出(exit logout )时才会记录到history文件里...

芬野de博客
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部