文档章节

Apache Kafka源码剖析:第12篇 延迟操作系列1-DelayedProduce

强子大叔的码田
 强子大叔的码田
发布于 2017/08/19 17:35
字数 707
阅读 362
收藏 2

下面开始介绍延迟操作组件,本节讲的是TimingWheel.

DelayedOperationPurgatory是一个相对独立的组件,主要功能是管理延迟操作。

底层依赖Kafka提供的时间轮实现。

有的读者会说为啥不用JDK本身的实现,这是因为Kafka这种分布式系统的请求量巨大,性能要求高,

-

在高性能的框架中,为了将定时任务的存取操作以及取消操作的时间复杂度降低为O(1)

一般会使用其他方式实现定时任务组件,例如,使用时间轮的方式。

 

Kafka的时间轮实现是TimeWheel, 是1个存储定时任务的环形队列。

底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。

TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.

 

TimingWheel提供了层级时间轮的概念

------------------------------------------------------------------

前面提到ProducerRequest,对于它来说,如果其中的acks字段设置为-1,表示这个请求到达Leader副本后,需要ISR集合中所有副本都同步(或者超时)后,才能返回响应给客户端。

ISR集合中的副本分布在不同的broker上,也就是不同的机器上,与Leader副本进行通信就涉及到网络通信,

一般情况下,我们认为网络传输时不可靠的而且比较慢!

所以通常采用异步的方式处理来避免线程长时间等待!

---

当FetchRequest发送给Leader副本后,会积累一定量的消息才返回给消费者或者Follow副本,实现批量发送!

---

我们先看看ProducerRequest的acks为-1时,服务端的处理流程:

处理函数

它会调用下面的函数

 // call the replica manager to append messages to the replicas
      replicaManager.appendRecords(
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = true,
        entriesPerPartition = authorizedRequestInfo,
        responseCallback = sendResponseCallback)

 

之所以会用到replicaManager是因为有多个副本,不是说我把这个消息存本地就行了,还要有其它ISR集合里的broker收到消息才可以!

先看appendRecords的实现

然后生成DelayedProduce对象!

 val produceStatus = localProduceResults.map { case (topicPartition, result) =>
        topicPartition ->
                ProducePartitionStatus(
                  result.info.lastOffset + 1, // required offset
                  new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
      }

生成ISR集合里的响应结果容器

然后判断是否需要同步等待各个ISR的响应

  // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
  //
  // 1. required acks = -1
  // 2. there is data to append
  // 3. at least one partition append was successful (fewer errors than partitions)
  private def delayedProduceRequestRequired(requiredAcks: Short,
                                            entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                            localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
    requiredAcks == -1 &&
    entriesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
  }

然后生成1个异步响应的容器

  // create delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

---

 

 

© 著作权归作者所有

强子大叔的码田

强子大叔的码田

粉丝 910
博文 1439
码字总数 1221048
作品 9
南京
架构师
私信 提问
apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe
2015/09/06
716
1
jQuery源码剖析(四) - Deferred异步回调解决方案

jQuery 源码解析代码及更多学习干货:猛戳GitHub 本篇代码为 my-jQuery 1.0.4.js 建议阅读本篇先弄懂上一篇Callbacks 原理分析,因为Deferred异步回调是基于Callbacks。下载源码然后根据文章思...

极客James
07/25
0
0
Kafka科普系列 | 轻松理解Kafka中的延时操作

版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 https://blog.csdn.net/u013256816/article/details/89325701 本文起源于之前去面试的一道面试题,面试题大致上是这样的:消费...

朱小厮
04/16
0
0
MoreWindows博客目录(微软最有价值专家,原创技术文章152篇)

为了方便大家查找和学习,现将本人博客中所有博客文章列出目录。 一. 白话经典算法 目前有17篇,分为七大排序和经典面试题讲解两大类 1. 《白话经典算法系列之一 冒泡排序的三种实现》 2. 《...

morewindows
2013/12/24
0
0
Java8新特性及实战视频教程完整版

百度网盘 资料 第1讲:课程介绍 第2讲:课程介绍续 第3讲:Lambda表达式初步与函数式接口 第4讲:深入函数式接口与方法引用 第5讲:Lambda表达式深入与流初步 第6讲:Function接口详解 第7讲...

远近高低各不同
2018/11/09
6
0

没有更多内容

加载失败,请刷新页面

加载更多

MongoDB系列-解决面试中可能遇到的MongoDB复制集(replica set)问题

关注我,可以获取最新知识、经典面试题以及微服务技术分享   MongoDB复制集(replica set):MongoDB复制集维护相同数据集的一组mongod进程,复制集是生产部署的基础,具有数据冗余以及高可用...

ccww_
25分钟前
2
0
SpringBoot系列:Spring Boot集成Spring Cache,使用RedisCache

前面的章节,讲解了Spring Boot集成Spring Cache,Spring Cache已经完成了多种Cache的实现,包括EhCache、RedisCache、ConcurrentMapCache等。 这一节我们来看看Spring Cache使用RedisCache。...

杨小格子
34分钟前
2
0
OpenJDK之CountDownLatch

OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下 图1 CountDownLatch是Doug Lea在JDK1.5中引入的,作用就不详细描述了, await()方法,如果还有线程在执行,那么当前线程...

克虏伯
40分钟前
3
0
简单编程

1.编写一个程序,提示用户输入名和姓,然后以“名,姓”的格式打印出来。 #include<stdio.h>int main(){char name[3];char family[3];printf("Please input your name and family:\n...

电子工程197沈志初
44分钟前
4
0
详解Mysql分布式事务XA(跨数据库事务)

在开发中,为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上)。在这种场景下,事务的提交会变得相对复杂,因为多个节点(库)的存在,可...

slagga
49分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部