文档章节

Apache Kafka源码剖析:第10篇 日志存储系列5-LogSegment & Log

强子哥哥
 强子哥哥
发布于 2017/08/18 17:34
字数 1197
阅读 150
收藏 0
点赞 0
评论 0

为了防止一个文件太大,Kafka将Log分成了若干段。每个日志文件和索引文件组合对应了1个LogSegment.

---

在LogSegment中封装了1个FileMessageSet和一个OffsetIndex对象,提供日志文件和索引文件的读写功能以及其它辅助功能!

/tmp/kafka-logs/broker0/my-replicated-topic-0# ls -al
total 24
drwxr-xr-x  2 root root 4096 Aug 13 03:33 .
drwxr-xr-x 55 root root 4096 Aug 13 04:55 ..
-rw-r--r--  1 root root    0 Aug 13 04:55 00000000000000000000.index
-rw-r--r--  1 root root  577 Aug 10 17:11 00000000000000000000.log
-rw-r--r--  1 root root   12 Aug 13 04:55 00000000000000000000.timeindex
-rw-r--r--  1 root root   10 Aug 13 03:33 00000000000000000010.snapshot
-rw-r--r--  1 root root    8 Aug 10 17:04 leader-epoch-checkpoint

---

/**
 * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
 * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
 * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
 * any previous segment.
 *
 * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
 *
 * @param log The message set containing log entries
 * @param index The offset index
 * @param timeIndex The timestamp index
 * @param baseOffset A lower bound on the offsets in this segment
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
 * @param time The time instance
 */
@nonthreadsafe
class LogSegment(val log: FileRecords,//用于操作对应日志文件的FileMessageSet对象
                 val index: OffsetIndex,//用于操作索引文件
                 val timeIndex: TimeIndex,
                 val txnIndex: TransactionIndex,
                 val baseOffset: Long,//第一条消息的offset值
                 val indexIntervalBytes: Int,//索引项之间间隔的最小字节数
                 val rollJitterMs: Long,
                 time: Time) extends Logging {

  private var created = time.milliseconds//标志LogSegment对象的创建时间

  /* the number of bytes since we last added an entry in the offset index */
  private var bytesSinceLastIndexEntry = 0//自动上次添加索引项后,日志文件中累计加入的 Message字节数

  /* The timestamp we used for time based log rolling */
  private var rollingBasedTimestamp: Option[Long] = None

  /* The maximum timestamp we see so far */
  @volatile private var maxTimestampSoFar: Long = timeIndex.lastEntry.timestamp
  @volatile private var offsetOfMaxTimestamp: Long = timeIndex.lastEntry.offset

===

在读取日志文件之前,需要将offset转换为实际的文件物理地址才可以,通过之前的知识点,应该怎么做?

1)比如1017的offset,文件名是1000,所以相对offset就是1017-1000=17

2)将17去稀疏索引文件中查找,可以找到1个稀疏索引项.

3)根据这个索引项,从文件的绝对物理位置开始查找绝对offset为1017的消息。

当然有很多细节,比如说压缩消息的存在。导致查询有一些变化的细节,但是总体还是很简单!

通过上面的分析,主要是让大家对一些概念和机制,有个了解。
虽然可能达不到源码100%的掌握,但是对于理解Kafka的实现机制
和以后定位问题,可以起到帮助作用

更重要的是,通过这些分析,以后碰到生产上的问题,心里不慌,有底气迎战!

---聊完了LogSegment ,我们来聊Log

Log是对多个LogSegment对象的顺序组合,形成1个逻辑的日志。

为了实现快速定位LogSegment,Log使用SkipList对LogSegment进行管理!

跳表很常见,在redis和leveldb中都有使用!
JDK中也有!

跳表是一种比较随机化的数据结构,查找效率和红黑树差不多,但是插入和删除操作比红黑树简单很多。

 

在 Log中,将每个LogSegment的baseOffset作为key,LogSegment对象作为value,

放入到segments这个跳表中管理。

 

向Log中追加消息是顺序写入的,那么只有最后1个LogSegment可以写入,之前的只能读。

我们把最后1个segment称之为activeSegment.

随着数据的不断写入,当activeSegment的日志文件大小到了一定的阈值后,就要切换新的segment文件。

写数据的时候,可能需要重新开一个segment

  // maybe roll the log if this segment is full
        val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
          maxTimestampInMessages = appendInfo.maxTimestamp,
          maxOffsetInMessages = appendInfo.lastOffset)

是否切换有几个条件

1)日志大小

2)当前 activeSegment的寿命超过了配置的LogSegment最长存活时间。

3)索引文件满了。

第1个很好理解,就是文件保证不要太大

第2个怎么理解,想象一下,client写了1条消息,然后不写了,这个文件如果一直不切换的话,就无法被读到了。

可见,确实是选择一批segment来持久化,这样就把持久化的任务和写线程隔离开来,尽量不占用写的主线程的任务!

===

Log.append()方法通过加锁进行同步控制,因为涉及到多线程操作,多个线程写。

但是在read()方法中并没有加锁操作,在开始查询消息之前会将nextOffsetMetaData字段保存为方法的局部变量,来避免线程安全问题。

 

 

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 856
博文 551
码字总数 647493
作品 8
南京
架构师
kafka源码解析之八LogManager

8.1 kafka日志的组成 class LogManager(val logDirs: Array[File], private val logs = new PoolTopicAndPartition, Log}class Log(val dir: File, ……private val segments: ConcurrentNav......

wl044090432 ⋅ 2016/03/29 ⋅ 0

apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe ⋅ 2015/09/06 ⋅ 1

Kafka代码走读-LogManager

https://github.com/haogrgr/haogrgr-test/blob/master/logs/kafka_source.txt 源码阅读(0.8.2.2): (一)概览 1.调用kafka.Kafka中的main方法启动 2.通过启动参数获取配置文件的路径 3.通过S...

德胜 ⋅ 2016/07/21 ⋅ 0

Kafka文章索引(入门)

目录索引: 1)apache kafka消息服务 2)kafka在zookeeper中存储结构 3)kafka log4j配置 4)kafka replication设计机制 5)apache kafka监控系列-监控指标 6)kafka.common.ConsumerRebala...

阿莱倪士 ⋅ 2014/11/27 ⋅ 0

基于docker部署的微服务架构(六): 日志统一输出到kafka中间件

前言 上一篇 基于docker部署的微服务架构(五): docker环境下的zookeeper和kafka部署 中,已经成功部署了 kafka 环境,现在我们要改造之前的项目,使用 log4j2 的 kafka appender 把日志统...

月冷X心寒 ⋅ 2016/11/22 ⋅ 3

kafka partition segment log关系*

引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition...

cjun1990 ⋅ 2015/10/13 ⋅ 0

Kafka的Log存储解析

引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition...

ivan-Zhao ⋅ 2015/12/16 ⋅ 0

Kafka日志存储系统和offset查找逻辑

起始篇 1,kafka通过文件系统来保存和缓存处理的消息,每个发送到kafka的消息,都会被记录到日志文件中,由partition的leader记录,并由partition的follower同步。 2,kafka的消息采用顺序写...

lkforce ⋅ 2017/09/05 ⋅ 0

Kafka的Log存储解析

引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition...

dannyhe ⋅ 2015/09/29 ⋅ 0

Kafka~Linux环境下的部署

概念 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键...

mcy247 ⋅ 2017/12/06 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

三步为你的App集成LivePhoto功能

摘要:LivePhoto是iOS9新推出的一种拍照方式,类似于拍摄Gif图或录制视频片段生成图片。如果没有画面感,可以联想《哈利波特》霍格沃茨城堡的壁画,哈哈,很炫酷有木有,但坑爹的是只有iphone6S以...

壹峰 ⋅ 17分钟前 ⋅ 0

centos7 git安装

由于centos中的源仓库中git不是最新版本,需要进行源码安装。 1、查看yum仓库git信息 [root@iZm5e3d4r5i5ml889vh6esZ zh]# yum info gitLoaded plugins: fastestmirrorLoading mirror s...

xixingzhe ⋅ 27分钟前 ⋅ 0

input file 重复上传同一张图片失效的解决办法

解决办法 方法一:来回切换input[type='file']的type属性值,可以是‘text’,'button','button'....,然后再切换回来‘file’ 方法二:每次取消图片预览后,重置input[type='file']的value的...

时刻在奔跑 ⋅ 27分钟前 ⋅ 0

Mahout推荐算法API详解

前言 用Mahout来构建推荐系统,是一件既简单又困难的事情。简单是因为Mahout完整地封装了“协同过滤”算法,并实现了并行化,提供非常简单的API接口;困难是因为我们不了解算法细节,很难去根...

xiaomin0322 ⋅ 32分钟前 ⋅ 0

WampServer默认web服务器根目录位置

安装WampServer之后的web服务器根目录默认位置在WampServer安装目录下的www:

临江仙卜算子 ⋅ 33分钟前 ⋅ 0

Redux的一些手法记录

Redux Redux的基本概念见另一篇文。 这里记录一下Redux在项目中的实际操作的手法。 actions 首先定义action.js,actions的type,可以另起一个action-type.js文件。 action-type.js用来存...

LinearLaw ⋅ 34分钟前 ⋅ 0

android 手势检测(左右滑动、上下滑动)

GestureDetector类可以让我们快速的处理手势事件,如点击,滑动等。 使用GestureDetector分三步: 1. 定义GestureDetector类 2. 初始化手势类,同时设置手势监听 3. 将touch事件交给gesture...

王先森oO ⋅ 49分钟前 ⋅ 0

java 方法的执行时间监控 设置超时(Future 接口)

java 方法的执行时间监控 设置超时(Future 接口) import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor......

青峰Jun19er ⋅ 53分钟前 ⋅ 0

一名开源小白的Apache成长自述

今天收到了来自Apache Vote我成为Serviceomb项目Committer的邮件,代表自己的贡献得到了充分的肯定;除了感谢团队的给力支持,我更希望将自己的成长经历——如何践行Apache Way的心得介绍给大...

微服务框架 ⋅ 55分钟前 ⋅ 0

vim介绍、颜色显示和移动光标、一般模式下复制、剪切和粘贴

1.vim 是 vi 的升级版 vim 是带有颜色显示的 mini安装的系统,一般都不带有vim [root@aminglinux-128 ~]# yum install -y vim-enhanced已加载插件:fastestmirror, langpacksLoading mir...

oschina130111 ⋅ 56分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部