文档章节

分布式消息队列Kafka的集群部署

FEINIK
 FEINIK
发布于 2018/05/05 22:05
字数 1365
阅读 624
收藏 23

1 概述

Apache Kafka 是一个分布式高吞吐量的流消息系统,Kafka建立在ZooKeeper同步服务之上。它与Apache Storm和Spark完美集成,用于实时流数据分析,与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,数据副本和高度容错功能,因此非常适合大型消息处理应用场景。

Kafka架构简介请查看:https://my.oschina.net/feinik/blog/1806488

2 部署图

3 Kafka集群部署前环境准备

3.1 安装Java

推荐安装Java 8,请自行安装。

3.2 部署Zookeeper集群

3.2.1 下载Zookeeper安装包

这里部署的zk版本是:zookeeper-3.4.9.tar.gz

3.2.2 安装

1、首先在server1中安装

(1)解压:tar -zxvf zookeeper-3.4.9.tar.gz

(2)cd zookeeper-3.4.9/conf

(3)cp zoo_sample.cfg zoo.cfg

(4)修改zoo.cfg配置文件,内容如下

tickTime=2000

# zk数据目录
dataDir=/home/hadoop/app/zookeeper/data

# 客户端连接端口配置
clientPort=2181

initLimit=10

syncLimit=5

# 服务地址,2888为集群内个节点通信的端口,3888为leader选举时使用的端口
server.1=slave1:2888:3888

server.2=slave2:2888:3888

server.3=slave3:2888:3888

注:配置完后,要在dataDir配置属性值的目录下创建myid文件,用作集群的节点标识,内容为server.id属性指定的值,如这里server.id中的id的值为1,所以myid文件内容为1,其他zk节点分别为2、3

2 、拷贝相同的一份zookeeper-3.4.9到server2、server3服务器中

3、 配置Zookeeper的环境变量并分别启动即可完成zk集群的部署

4 部署Kafka集群

4.1 安装并配置

这里安装的版本为:kafka_2.12-1.1.0.tgz

注:先在server1中安装,然后在拷贝一份至server2、server3服务器中

(1)解压

$tar -zxvf kafka_2.12-1.1.0.tgz -C /home/app

(2)重命名

$mv kafka_2.12-1.1.0 kafka

(3)配置Kafka的环境变量

(4)修改Kafka配置文件server.properties,修改如下配置项

  • 修改broker(代理)id标识,集群中需要保证唯一

        broker.id=1

  •   修改日志存储目录配置

        log.dirs=/home/app/kafka/log-data

  • 修改Zookeeper的连接地址,Kafka自带了Zookeeper,但是这里我们配置成自己的zk集群地址

        zookeeper.connect=server1:2181,server2:2181,server3:2181

(5)拷贝server1中部署好的kafka包到server2、server3服务器中

(6)修改server2中kafka的server.properties配置文件

        broker.id=2

(7)修改server3中kafka的server.properties配置文件

        broker.id=3

5 启动集群

5.1 先启动Zookeeper集群

分别在server1、server2、server3中使用如下命令启动

$zkServer.sh start

注:也可以通过脚本来启动Zookeeper集群,前提是需要配置无密登录,脚本内容如下

#!/bin/bash
if(( $# != 1 )) ; then
   echo "Usage: zk.sh {start|stop}";
   exit;
fi

cuser=`whoami`;

for i in {server1,server2,server3};
do
   echo ---------- $i---------------;
   ssh $cuser@$i "cd /home/app/zookeeper; ./bin/zkServer.sh $@";
done

 

 

5.2 启动Kafka集群

分别在server1、server2、server3中使用如下命令启动

$kafka-server-start.sh -daemon /home/app/kafka/config/server.properties

注:也可以通过脚本来启动Kafka集群,脚本内容如下

#!/bin/bash
cuser=`whoami`;

for i in {server1,server2,server3};
do
   echo ---------- $i--------------;
   ssh $cuser@$i "/home/app/kafka/bin/kafka-server-start.sh -daemon /home/app/kafka/config/server.properties";
   echo "start complate!"
done

 

5.3 查看集群启动情况

通过jps命令来查看服务启动进程,server1、server2、server3都包含Kafka、QuorumPeerMain服务进程表示集群启动成功

$jps
5506 Kafka
5733 Jps
5212 QuorumPeerMain

6 Kafka Java API访问

6.1 生产者发送消息

public class ProducerClient {

    private Producer<String, String> producer;

    @Before
    public void init() {
        Properties props = new Properties();
        /**
         * broker地址列表,无需指定集群中的所有broker地址,Producer会从给定的broker中找到其他broker的地址信息,
         * 推荐这里配置两个,可以防止broker宕机产生无法连接的问题
         */
        props.put("bootstrap.servers", "server1:9092,server2:9092");
        /**
         * 指定key的序列化方式,Kafka 默认提供了常用的几种Java对象类型的序列化类
         */
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }


    @Test
    public void send() throws Exception {
 //此处未指定key,那么发送的多条消息会被均匀的分布在Topic的所有可用分区中
        ProducerRecord<String, String> record = new ProducerRecord<>("test",
                "hello word");
//消息的异步发送
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                System.out.println("消息发送完成!");
            }
        });
    }

    @After
    public void close() {
        producer.close();
    }
}

注:消息的发送有三种方式:同步发送、异步发送、fire-and-forget(发送完并不关心发送结果)

同步发送:调用send方法后,返回Future对象,通过调用Future的get方法来同步等待消息的发送结果。

异步发送:调用send方法的时候指定一个回调函数,broker在接收成功消息后会回调该函数

fire-and-forget:调用send方法后并不关心发送的结果处理

6.2 消费者订阅并消费消息

public class ConsumerClient {

    private Consumer<String, String> consumer;

    @Before
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "server1:9092,server2:9092");
        //指定消费者群组标识
        props.put("group.id", "g1");
        //key与value的反序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    @Test
    public void consume() {
        //订阅主题为test的消息
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                System.out.println("接收到消息:" + value);
            }
        }
    }

    @After
    public void close() {
        consumer.close();
    }
}

7 总结

本文主要介绍了Kafka的分布式集群部署方式,以及Kafka依赖的第三方组件Zookeeper的集群部署,最后通过Kafka Java API来演示了生产者发送消息与消费者消费消息的示例代码,关于Kafka的其他使用细节,请查阅官网:http://kafka.apache.org/

 

© 著作权归作者所有

FEINIK
粉丝 227
博文 61
码字总数 61705
作品 0
广州
高级程序员
私信 提问
kafka原理及Docker环境部署

技术原理 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架...

kekefund
2018/10/26
0
0
分布式的、高吞吐量、高可扩展性消息队列服务Kafka商业化发布!

摘要:消息队列Kafka是一个分布式的、高吞吐量、高可扩展性消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等,是大数据生态中不可或缺的产品之一,阿里云提供...

萌萌怪兽
2018/07/30
0
0
消息中间件kafka+zookeeper集群部署、测试与应用(1)

业务系统中,通常会遇到这些场景:A系统向B系统主动推送一个处理请求;A系统向B系统发送一个业务处理请求,因为某些原因(断电、宕机。。),B业务系统挂机了,A系统发起的请求处理失败;前端...

明日的我
2017/10/26
1K
10
消息服务中间件 - WQS

WQS 是微博开源的消息服务中间件。 功能特性 多租户支持; 多 IDC 支持; 多协议支持 memcached、http 1/2、motan 轻客户端。(去 zk 依赖、无 partition 感知) 支持 pub/sub、long pollin...

匿名
2017/12/05
569
0
ArangoDB 3.3 Milestone 1 发布,多模型 NoSQL 数据库

ArangoDB 是一个开源的分布式原生多模型数据库 (Apache 2 license)。 ArangoDB 3.3 Milestone 1 发布了,主要改进内容包括 ArangoDB 对多数据中心的初步支持。可让你在不同的数据中心分别运行...

周其
2017/10/17
439
2

没有更多内容

加载失败,请刷新页面

加载更多

[top]cpu内存

%Cpu(s): 96.0 us用户进程整理cpu的占比,按整个cpu算。 PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND ......

Danni3
25分钟前
4
0
JavaScript权威指南笔记2

第二章、词法结构 1、字符集 JavaScript程序:Unicode字符集编写 Unicode:ASCII和Latin-1的超集,支持所有在用的语言。 ECMAScript 3要求JavaScript的实现必须支持Unicode 2.1及后续版本 EC...

_Somuns
32分钟前
6
0
数据安全管理:RSA算法,签名验签流程详解

本文源码:GitHub·点这里 || GitEE·点这里 一、RSA算法简介 1、加密解密 RSA加密是一种非对称加密,在公开密钥加密和电子商业中RSA被广泛使用。可以在不直接传递密钥的情况下,完成加解密操...

知了一笑
今天
7
0
Podman 使用指南

> 原文链接:Podman 使用指南 Podman 原来是 CRI-O 项目的一部分,后来被分离成一个单独的项目叫 libpod。Podman 的使用体验和 Docker 类似,不同的是 Podman 没有 daemon。以前使用 Docker...

米开朗基杨
今天
6
0
拯救 项目经理个人时间的5个技巧

优秀的项目经理都有一个共同点,那就是良好的时间管理能力。专业的项目经理会确保他们的时间投入富有成效,尽可能避免时间浪费。 时间管理叫做GTD,即Getting Things Done——“把事情做完”...

Airship
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部