一天,把 Pulsar 客户端的性能提升3倍+!

原创
03/15 19:25
阅读数 9.3K

导读

大佬希望我帮忙排查一下一个性能问题,我⼀听内心当然是拒绝的啦,这种 Commit 挣得太难了,修个 NPE 之类的不香吗?

于是我拿起手机回复大佬:好的,我看⼀下。



作者介绍


林琳


腾讯云高级工程师

活跃于开源社区,Apache Pulsar Commiter

专注于中间件领域,在消息队列和微服务方向具有丰富的经验,负责 CKafka/CMQ/移动开发平台 的后端设计于开发工作,目前致力于打造稳定、高效和可扩展的基础组件与服务



故事的开端


话说,这个事情已经过去很久了,应该是在2020年的10月份,今天就拿出来炒一下冷饭吧。当时业务赶着上线,国庆节连续肝了6天,最后⼀天我想休息⼀下,于是像咸鱼一样躺在床上,如下所示:



突然 Pulsar 社区的大佬找我,希望我帮忙排查一下一个性能问题:Pulsar 的客户端消费分为 ConsumerImpl 和 MultiTopicsConsumerImpl 两种。


通常情况下,当⼀个 Topic 只有⼀个分区的时候,Builder 只会创建 ConsumerImpl,每个 ConsumerImpl 能连接到⼀个 Partition 消费。


当⼀个 Topic 有多个 Partition 的时候,则 Builder 会创建 MultiTopicsConsumerImpl,每个 MultiTopicsConsumerImpl 包含了多个 ConsumerImpl,即它会为每个 Partition 创建⼀个 ConsumerImpl。


多个 ConsumerImpl 各自接收消息,最终所有消息会汇总进 MultiTopicsConsumerImpl 的队列里,供业务使用。如下所示:



按道理,这么多个 ConsumerImpl ⼀起接收消息,又是多个 Partition 并行拉数据, MultiTopicsConsumerImpl 的性能应该远远超过单个 ConsumerImpl 才对。然而,现实是 MultiTopicsConsumerImpl 的性能只有 Consumer 的⼀半。


我⼀听,内心当然是拒绝的啦,这种 Commit 挣得太难了,修个 NPE 之类的不香吗?


于是我拿起手机回复大佬:好的,我看⼀下。



问题排查


要排查这个问题,首先我得有个 Pulsar 集群呀。于是我找了3台虚拟机,开始部署⼀个集群,然后......


(此处省略1W字)


最后,我用 Pulsar 自带的 perf 工具开始分别模拟单个 ConsumerImpl 和 MultiTopicsConsumerImpl 的消费,测试环境的配置如下:


  • 3台8核16G机器
  • Pulsar 的 Topic 创建4个 Partition
  • 消费时间2分钟
  • 使用 Pulsar 自带的 perf 工具,MultiTopicsConsumerImpl 的测试命令:

bin/pulsar-perf consume -u 'http://x.x.x.x:8080' -s my-sub-6 -sp Earliest -q100000 persistent://public/default/p-topic


第⼀次的测试结果出来后,我发现大佬有点太乐观,因为 MultiTopicsConsumerImpl 的性能根本就没有 Consumer 的⼀半,情况比估计的还要差,只有七分之一左右。


MultiTopicsConsumerImpl 的性能结果:


Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s


ConsumerImpl 的性能结果:


Aggregated throughput stats --- 78403434 records received --- 462640.204 msg/s --- 3614.377 Mbit/s


看完这个结果,我感觉这个起点很低,有巨大的上升空间。首先,我们需要知道时间都去哪了,于是我拉了⼀个火焰图看看CPU时间的消耗。



初步排查,发现业务线程占用了总体时间的 40.65%,其中 MessageReceived 里占用的总时间的 14%, 另外可重入的读写锁占用 8.22%。而 MessageReceived 里,其实也是锁占了很大的比重。锁差不多占用了 20% 的时间。于是,第⼀个优化方向就出现了 —— 去锁


去锁的方向主要有以下几个:


  1. 利用现有线程安全的 BlockingQueue,不再重复加锁。
  2. 降低锁获取频率。对于无法消除的锁,通过前置判断降低最终锁获取的频率。
  3. 修改逻辑实现方式,去除明显可以移除的锁。
  4. 都多写少的地方使用读写锁替换可重入锁。


移除后,重新跑性能测试,发现性能有了明显的提高,感觉公屏上飘过的都是 666:


//优化前Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s //优化后Aggregated throughput stats --- 25062077 records received --- 161656.814 msg/s--- 1262.944 Mbit/s


还有有些地方强制去锁,对现有的⼀些逻辑有小变动,被拒绝了。例如有这样的⼀段代码:


//忽略掉加锁的部分Message<T> msgPeeked = incomingMessages.peek();while (msgPeeked != null && messages.canAdd(msgPeeked)) { Message<T> msg = incomingMessages.poll();if (msg != null) { decreaseIncomingMessageSize(msg);Message<T> interceptMsg = beforeConsume(msg); messages.add(interceptMsg);}msgPeeked = incomingMessages.peek();}result.complete(messages);//忽略掉释放锁的部分


这段代码的逻辑大意是,先看看队列里第⼀条消息能不能放进集合里,可以就出队并放入。


我把这段 while 改为 do-while ,即不加锁,直接出队往集合里塞,直到塞完后的大小超过了,则结束。这种方式把先前的严格小于变为最多超过集合容量1条消息的大小,但是能借助本身就是线程安全的 BlockingQueue 实现去锁。但是由于行为有变化,CR 没有通过,只能还原回去。


继续回到刚才的火焰图,我们发现除了锁占用了很多的 CPU,Netty 相关的 API 也占用了不少,⼀共有 12.63%



主要是 AbstractEventExecutorGroup,也就是我们常见的 EventLoopGroup 消耗了大量的 CPU 资源。


Pulsar 中几乎所有的操作都是异步的,大量使用了 Java8 里的 CompletableFuture ,但是为什么会有这么多的 EventLoop 呢。看代码发现,Pulsar 里面为了实现异步延迟+循环拉取消息,又为了避免循环调用自己出现栈溢出,使用 Netty 的 EventLoop 作为线程池。


起初我以为是 EventLoop 处理请求比较繁忙引起的,顺着堆栈找到对应的代码,发现并不是。


Netty 的 EventLoop 采用了生产-消费模型,添加任务的线程如果是当前线程,则自己就消费掉了,没有唤醒动作。但是如果使用了 EpollEventLoop 并且添加任务的线程与处理线程不是同⼀个,生产线程会唤醒消费线程来处理任务,进而触发系统调用:


Native.eventFdWrite(this.eventFd.intValue(), 1L);


这个处理起来就比较简单了,如果能避免出现频繁的系统调用就能提升性能,直接使用Java自带的 ThreadPoolExecutor 替换掉就好了,ThreadPoolExecutor 设置使用 BlockingQueue 作为任务队列,性能比每次调用 eventFdWrite 要好。


为了对比单次 EventLoop 优化的效果,因此并没有在移除锁的基础上做,而是单独拉了⼀个分支。前后性能效果的对比如下:


//优化前Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s //优化后Aggregated throughput stats --- 18392800 records received --- 133314.602 msg/s--- 1041.520 Mbit/s



剧终


去锁 + EventLoop 一共提升了近4倍的性能:


//MultiTopicsConsumerImpl优化前Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s//MultiTopicsConsumerImpl优化后Aggregated throughput stats --- 40140549 records received --- 275927.749 msg/s--- 2155.686 Mbit/s//ConsumerImplAggregated throughput stats --- 78403434 records received --- 462640.204 msg/s--- 3614.377 Mbit/s


最终的火焰图如下:



虽然整个优化的过程比较简单,技术含量高,但由于起点比较低,所以优化的效果还是很好的。最终性能只有单个 ConsumerImpl 的 50% 左右,因此还有继续提升的空间。这次的优化是基于已有的架构, 仅对实现做了调整,如果我们尝试对架构进行微调可以有更多的提升,欢迎小伙伴们⼀起来优化。



往期

推荐


《超有料!万字详解腾讯微服务平台 TSF 的敏捷开发流程》

《火速围观!鹅厂中间件产品遭遇暴风吐槽!》

《看这里!鹅厂大佬深度解析 Apache Pulsar 五大应用场景》





扫描下方二维码关注本公众号,

了解更多微服务、消息队列的相关信息!

解锁超多鹅厂周边!


戳原文,了解更多腾讯微服务平台TSF的信息

点亮在看,你最好看

本文分享自微信公众号 - 腾讯云中间件(gh_6ea1bc2dd5fd)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
打赏
2
4 收藏
分享
加载中
更多评论
打赏
0 评论
4 收藏
2
分享
返回顶部
顶部