文档章节

RocketMQ搭建及demo

六楼的宣言
 六楼的宣言
发布于 2017/04/04 20:06
字数 408
阅读 178
收藏 4

一、RocketMQ机器硬件要求内存最好不要低于8G, 系统linux,且已经安装好JDK

二、安装文件下载地址:
http://mirror.bit.edu.cn/apache/incubator/rocketmq/4.0.0-incubating/rocketmq-all-4.0.0-incubating-bin-release.zip

三、下载RocketMQ安装文件并上传到服务器上后

解压 

unzip rocketmq-all-4.0.0-incubating-bin-release.zip

进入到解压目录下的bin目录中

启动 NameServer:

nohup sh mqnamesrv &

启动 broker

nohup sh mqbroker -n localhost:9876 &

使用jps命令可以看到有以下两个java程序运行中

为了便于其它机器调试访问,可临时将防火墙关闭:

service firewalld stop

 

四、Java 程序demo

maven依赖包:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.0.0-incubating</version>
</dependency>

消息生产者demo代码:

package com.classtest.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args){
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("192.168.133.141:9876");
        try {
            producer.start();

            Message msg = new Message("PushTopic",
                    "push",
                    "1",
                    "Just for test1.".getBytes());

            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            msg = new Message("PushTopic",
                    "push",
                    "2",
                    "Just for test2.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            msg = new Message("PushTopic",
                    "push",
                    "1",
                    "Just for test3.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            producer.shutdown();
        }
    }
}

消息消费者Java代码demo:

package com.classtest.rocketmq;

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args){
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("192.168.133.141:9876");
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("PushTopic", "push");
            //程序第一次启动从消息队列头取数据
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(
                    new MessageListenerConcurrently() {
                        public ConsumeConcurrentlyStatus consumeMessage(
                                List<MessageExt> list,
                                ConsumeConcurrentlyContext Context) {
                            Message msg = list.get(0);
                            System.out.println(msg.toString());
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

可以先运行消费者demo, 然后运行生产者demo时可以消费者demo的运行窗口输出消息

 

参考:

https://my.oschina.net/jayronwang/blog/861396

http://rocketmq.apache.org/docs/quick-start/

 

本文转载自:

六楼的宣言

六楼的宣言

粉丝 6
博文 44
码字总数 12536
作品 0
海淀
后端工程师
私信 提问
北京社区 | Apache RocketMQ 首届开发者训练营

时间:2019.06.29(周六) 18:00-21:00 地点:北京市海淀区中关村大街46号院 北京众海投资-东门(人民大学地铁站A2出口附近) 每位到场开发者都可获得极客时间99元课程卡 课程目的 掌握Rocke...

ApacheRocketMQ社区
06/22
20
0
Apache RocketMQ 4.5.0 发布,实现自动容灾切换

近日,分布式消息开源项目 Apache RocketMQ 发布了 4.5.0版本,该版本引入了 Dledger 的多副本技术,可实现多地多中心场景下的自动容灾切换,并保障切换过程中数据的完整性和一致性,同时,发...

阿里巴巴中间件
04/10
2.8K
3
RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

摘要: RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想。 RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以...

阿里云云栖社区
01/08
87
0
发力IOT、大数据,Apache RocketMQ 构建云时代的新生态

摘要 2018年9月1日,由阿里巴巴中间件举办的RocketMQ开发者沙龙在北京举行。这是RocketMQ今年举办的第二次大型线下技术交流活动,出席技术沙龙的嘉宾包括Apache RocketMQ创始人、OpenMessagi...

中间件小哥
2018/09/21
0
0
Sentinel 如何通过匀速请求和冷启动来保障服务的稳定性

这是围绕 Sentinel 的使用场景、技术对比和实现、开发者实践等维度推出的系列文章的第二篇。 第一篇:Dubbo 的流量防卫兵 Sentinel如何通过限流实现服务的高可用性 - 链接 本期将通过 Sentin...

许此一生
2018/08/28
56
0

没有更多内容

加载失败,请刷新页面

加载更多

编程作业20190210900169

1编写一个程序,提示用户输入名和姓,然后以“名,姓”的格式打印出来。 #include <stdio.h>#include <stdlib.h> int main(){ char firstName[20]; char lastName[20]; print......

1李嘉焘1
15分钟前
2
0
补码的优点及原理分析

只讨论整数 1.计算机内部为什么没有减法器? 减法运算本身其实就是加法,如x - y即x +(-y),所以只需要将负数成功表示出来并可以参加加法运算,那加法器就可同时实现“+”和“-”的运算。这...

清自以敬
31分钟前
59
0
Docker 可视化管理 portainer

官网安装指南: https://portainer.readthedocs.io/en/latest/deployment.html docker-compose.yml 位置,下载地址:https://downloads.portainer.io/docker-compose.yml...

Moks角木
58分钟前
5
0
Spring Security 实战干货:必须掌握的一些内置 Filter

1. 前言 上一文我们使用 Spring Security 实现了各种登录聚合的场面。其中我们是通过在 UsernamePasswordAuthenticationFilter 之前一个自定义的过滤器实现的。我怎么知道自定义过滤器要加在...

码农小胖哥
今天
8
0
常见分布式事务解决方案

1 微服务的发展 微服务倡导将复杂的单体应用拆分为若干个功能简单、松耦合的服务,这样可以降低开发难度、增强扩展性、便于敏捷开发。当前被越来越多的开发者推崇,很多互联网行业巨头、开源...

asdf08442a
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部