文档章节

Kafka版本升级(无停机)

huxihx
 huxihx
发布于 2017/05/19 17:10
字数 747
阅读 699
收藏 1

    升级Kafka集群的版本其实很简单,核心步骤只需要4步,但是我们需要在升级的过程中确保每一步操作都不会“打扰”到producer和consumer的正常运转。为此,笔者在本机搭了一个测试环境进行实际的版本升级实验。在开始之前,简要介绍一下测试环境的部署情况及目标:Kafka 0.10.0.0 双broker测试环境,而目标是把该集群升级到0.10.2版本

两个broker启动时分别读取server.properties和server2.properties。

启动测试环境
    打开两个终端,分别执行startBroker1.sh和startBroker2.sh。startBroker*.sh内容很简单就是:

CURRENT_PATH=<your_path>/kafka_2.11-0.10.0.0
cd $CURRENT_PATH
JMX_PORT=9997 bin/kafka-server-start.sh ../configs/server.properties

创建测试topic
    创建一个双分区,replication-factor=2的topic:test,然后使用kafka-topics工具describe一下:

    okay,目前一切正常。

启动producer

    很简单的producer程序,每1秒发送一条消息,然后打印成功提交的消息数和提交失败的消息数。特别注意提交失败的消息数,后续我们依赖此值来确保升级流程不会影响到producer。 主要程序代码如下:

Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("acks", "all");
        props.put("retries", Integer.MAX_VALUE);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        final AtomicInteger success = new AtomicInteger(0);
        final AtomicInteger failed = new AtomicInteger(0);
        try {
            while (true) {
                producer.send(new ProducerRecord<String, String>("test", "a message"), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            System.out.println("Current failed count: " + failed.incrementAndGet());
                        } else
                            System.out.println("Current success count: " + success.incrementAndGet()
                                    + ", failed: " + failed.get());
                    }
                });
                Thread.sleep(2000);

            }
        } finally {
            producer.close();
        }

启动consumer

    为简单起见,我使用了console-consumer,如下所示。另外, 是Kafka 0.10.0.0版本,所以一定要加上`--new-consumer`才能使用新版本consumer!

bin/kafka-console-consumer.sh --topic test --from-beginning --new-consumer --bootstrap-server localhost:9092,localhost:9093

此时,你应该可以看到producer和consumer都可以正常地工作。

----------------------------- 升级的流程正式开始 -----------------------------

切记: 每做完一步都要观察producer和consumer是否出现严重错误!

更新broker间通讯版本号和消息格式版本

向所有broker的server.properties文件中增加下面两行:

inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

依次更新代码,重启所有broker

注意一定要依次重启,即先重启broker1,然后再重启broker2

再次更新broker间通讯版本和消息格式版本

inter.broker.protocol.version=0.10.2
log.message.format.version=0.10.2

注意,这次要更新成你要升级到的目标版本。比如我们要升级到0.10.2,那么就更新为0.10.2

再次依次重启broker

    依然要再次顺序重启broker!

    好了,当前集群版本已经升级完毕了。值得一提的是,在整个升级过程中producer应该是可以正常工作的,但consumer可能会出现位移提交失败的警告,因此有可能会造成重复消费,而broker端可能会出现“org.apache.kafka.common.errors.NotLeaderForPartitionException”异常,因为__consumers_offsets各分区的leader有可能会发生瞬时的变化,因此通常也是不必在意的。

© 著作权归作者所有

huxihx
粉丝 1
博文 18
码字总数 30674
作品 0
东城
程序员
私信 提问
zabbix3.2.6.1升级3.4.4图文心得

为了在升级过程中将停机时间和数据丢失降低到最小,建议先停机升级Zabbix server,然后再逐个停机升级和启动Zabbix proxy。当所有的Proxy升级完毕后,再启动Zabbix Server。在Zabbix server...

喵来个鱼
2017/12/18
0
0
Flume 常用插件扩展包 - scc-flume-plugin

本插件是对 Flume 现有插件的扩充,主要是: 1、kafka-source,相对 flume 自带插件,升级了 kafka 依赖版本,支持更高版本的 kafka 集群,并支持自动扫描特定 topic ,无需在配置文件指定,...

尚浩宇
2017/12/28
394
0
Flume日志收集分层架构应用实践

Flume作为一个日志收集工具,非常轻量级,基于一个个Flume Agent,能够构建一个很复杂很强大的日志收集系统,它的灵活性和优势,主要体现在如下几点: 1)模块化设计:在其Flume Agent内部可...

workming
2018/06/29
0
0
Kafka不停机,如何无感知迁移ZooKeeper集群?

Kafka 在 Yelp 的应用十分广泛,Yelp 每天通过各种集群发送数十亿条消息,在这背后,Kafka 使用 Zookeeper 完成各种分布式协调任务。 因为Yelp 非常依赖 Kafka,那么问题来了,它是否可以在不...

微笑向暖wx
02/25
28
0
nagios的计划停机时间出问题,各位大侠求解!!!

nagios的计划停机时间问题,我的nagios设定了计划停机服务,再将计划停机注释给删除掉,In Scheduled Downtime? 还是显示 YES ,nagios版本是3.3.1,之前是3.2.1不会出现这种情况,升级后就出...

ijqingyy
2011/08/26
405
0

没有更多内容

加载失败,请刷新页面

加载更多

rime设置为默认简体

转载 https://github.com/ModerRAS/ModerRAS.github.io/blob/master/_posts/2018-11-07-rime%E8%AE%BE%E7%BD%AE%E4%B8%BA%E9%BB%98%E8%AE%A4%E7%AE%80%E4%BD%93.md 写在开始 我的Arch Linux上......

zhenruyan
今天
5
0
简述TCP的流量控制与拥塞控制

1. TCP流量控制 流量控制就是让发送方的发送速率不要太快,要让接收方来的及接收。 原理是通过确认报文中窗口字段来控制发送方的发送速率,发送方的发送窗口大小不能超过接收方给出窗口大小。...

鏡花水月
今天
10
0
OSChina 周日乱弹 —— 别问,问就是没空

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @tom_tdhzz :#今日歌曲推荐# 分享容祖儿/彭羚的单曲《心淡》: 《心淡》- 容祖儿/彭羚 手机党少年们想听歌,请使劲儿戳(这里) @wqp0010 :周...

小小编辑
今天
1K
11
golang微服务框架go-micro 入门笔记2.1 micro工具之micro api

micro api micro 功能非常强大,本文将详细阐述micro api 命令行的功能 重要的事情说3次 本文全部代码https://idea.techidea8.com/open/idea.shtml?id=6 本文全部代码https://idea.techidea8....

非正式解决方案
今天
5
0
Spring Context 你真的懂了吗

今天介绍一下大家常见的一个单词 context 应该怎么去理解,正确的理解它有助于我们学习 spring 以及计算机系统中的其他知识。 1. context 是什么 我们经常在编程中见到 context 这个单词,当...

Java知其所以然
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部