文档章节

Apache Kafka源码剖析:第7篇 日志存储系列2-FileMessageSet

强子1985
 强子1985
发布于 2017/08/14 13:31
字数 1505
阅读 68
收藏 0

本节主要讲解FileMessageSet

Kafka使用FileMessageSet来管理日志文件,对应磁盘上的一个真正的文件。

一开始没找到这个类
后来下了0.10版本看了下,才知道在0.11版本这个类没了,不知道是删除了还是转移了。

在0.10.0中,Kafka使用FileMessageSet管理日志文件。

它对应着磁盘上的一个真正的日志文件。

看一下类的继承关系

@nonthreadsafe
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {
/**
 * A set of messages with offsets. A message set has a fixed serialized form, though the container
 * for the bytes could be either in-memory or on disk. The format of each message is
 * as follows:
 * 8 byte message offset number
 * 4 byte size containing an integer N
 * N message bytes as described in the Message class
 */
abstract class MessageSet extends Iterable[MessageAndOffset] {

MessageSet中保存的数据格式分为3部分:

1) 8字节的offset

2) 4字节的size表示data的大小

3) data表示真正的数据

1)+2)--->LogOverhead

这个从上面的注释也可以看得出来

/**
 * A set of messages with offsets. A message set has a fixed serialized form, though the container
 * for the bytes could be either in-memory or on disk. The format of each message is
 * as follows:
 * 8 byte message offset number
 * 4 byte size containing an integer N
 * N message bytes as described in the Message class
 */

不用关注细节,只要知道这里面包含了哪些信息点就足够了,协议都是人定的!

下面开始聊Message的格式,我们打开这个类

/**
 * Constants related to messages
 */
object Message {

  /**
   * The current offset and size for all the fixed-length fields
   */
  val CrcOffset = 0
  val CrcLength = 4
  val MagicOffset = CrcOffset + CrcLength
  val MagicLength = 1
  val AttributesOffset = MagicOffset + MagicLength
  val AttributesLength = 1
  // Only message format version 1 has the timestamp field.
  val TimestampOffset = AttributesOffset + AttributesLength
  val TimestampLength = 8
  val KeySizeOffset_V0 = AttributesOffset + AttributesLength
  val KeySizeOffset_V1 = TimestampOffset + TimestampLength
  val KeySizeLength = 4
  val KeyOffset_V0 = KeySizeOffset_V0 + KeySizeLength
  val KeyOffset_V1 = KeySizeOffset_V1 + KeySizeLength
  val ValueSizeLength = 4

下面是各个字段的含义

crc---4个字节,消息的校验码,防止消息错误

magic---1个字节,魔数,取值为0或者1,影响了消息的长度和格式。

attributes:1字节,消息的属性,比如压缩类型,时间戳类型,创建时间/追加时间等。

读者可以理解为一些meta信息点。

timestamp: 时间戳信息,由上面的字段负责解释具体含义。

key:你懂的,不解释

value:你懂的,不解释

------------------------------------------------------------------------------

有2个比较关键的方法:

1)writeTo---写消息
  /**
   * Write some of this set to the given channel.
   * @param destChannel The channel to write to.
   * @param writePosition The position in the message set to begin writing from.
   * @param size The maximum number of bytes to write
   * @return The number of bytes actually written.
   */
  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {

2)
  /**
   * Provides an iterator over the message/offset pairs in this set
   */
  def iterator: Iterator[MessageAndOffset]
提供迭代器,顺序读取MessageSet中的消息

这个很好理解,写了就要读,MessageSet具有顺序写入和读取的特性。

在Kafka里,一切都是顺序IO

---上面聊的是MessageSet抽象类以及其中保存消息的格式,我们开始分析FileMessageSet实现类

先看下类的定义

/**
 * An on-disk message set. An optional start and end position can be applied to the message set
 * which will allow slicing a subset of the file.
 * @param file The file name for the underlying log data
 * @param channel the underlying file channel used
 * @param start A lower bound on the absolute position in the file from which the message set begins
 * @param end The upper bound on the absolute position in the file at which the message set ends
 * @param isSlice Should the start and end parameters be used for slicing?
 */
@nonthreadsafe
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {

1)file: java.io.File类型,指向真正的磁盘上的日志文件

2)channel:用于读写此日志文件,我们都知道文件可以拿到channel,如果你写过MappedByteBuffer,你自然知道我说的是啥

3)start/end: FileMessageSet除了表示一个完整的日志文件,还可以表示日志文件分片,start/end就是这个分片的起始位置和结束位置,

4)isSlice:表示此FileMessageSet是否为日志文件的分片

5)_size: FileMessageSet大小,单位字节。如果是分片,就表示分片大小,否则表示FileMessageSet大小。

 

在操作系统里,进行文件的预分配可以提高写操作的性能。

上面解释了预分配的概念!

 

然后还是直接上源码吧

  /**
   * Create a file message set with no slicing, and with initFileSize and preallocate.
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
   * If it's new file and preallocate is true, end will be set to 0.  Otherwise set to Int.MaxValue.
   */
  def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
      this(file,
        channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
        start = 0,
        end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
        isSlice = false)

openChannel的具体实现如下:

/**
   * Open a channel for the given file
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
   * @param file File path
   * @param mutable mutable
   * @param fileAlreadyExists File already exists or not
   * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
   * @param preallocate Pre allocate file or not, gotten from configuration.
   */
  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
    if (mutable) {
      if (fileAlreadyExists)
        new RandomAccessFile(file, "rw").getChannel()
      else {
        if (preallocate) {
          val randomAccessFile = new RandomAccessFile(file, "rw")
          randomAccessFile.setLength(initFileSize)
          randomAccessFile.getChannel()
        }
        else
          new RandomAccessFile(file, "rw").getChannel()
      }
    }
    else
      new FileInputStream(file).getChannel()
  }
}

比较简单,一个是是否只读,一个是是否预先分配空间。

---------------------------------------------------------------------------------------------

初始化时,会设置channel的position位置

下面我们来分析读写过程

这个主要是看append函数

我们可以理解为初始化了一个FileMessageSet,然后就可以调用append函数进行写了,当然每次都是在文件的最后进行文件IO.
顺序性嘛!
  /**
   * Append these messages to the message set
   */
  def append(messages: ByteBufferMessageSet) {
    val written = messages.writeFullyTo(channel)
    _size.getAndAdd(written)
  }

writeFullyTo就是调用channel直接写文件了

下面讲查找文件,函数是searchFor()

/**
 * The mapping between a logical log offset and the physical position
 * in some log file of the beginning of the message set entry with the
 * given offset.
 */
case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
  override def indexKey = offset
  override def indexValue = position.toLong
}

也就是标记了position和offset的值!

还有1个writeTo方法,是将数据写入到别的channel.

 

© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 864
博文 997
码字总数 679817
作品 8
南京
架构师
apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe
2015/09/06
453
1
Kafka的Log存储解析

引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition...

dannyhe
2015/09/29
394
0
kafka源码解析之八LogManager

8.1 kafka日志的组成 class LogManager(val logDirs: Array[File], private val logs = new PoolTopicAndPartition, Log}class Log(val dir: File, ……private val segments: ConcurrentNav......

wl044090432
2016/03/29
0
0
kafka partition segment log关系*

引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition...

cjun1990
2015/10/13
417
0
kafka的log存储解析——topic的分区partition分段segment以及索引等

转自:http://blog.csdn.net/jewes/article/details/42970799 引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个...

xiaomin0322
05/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

day122-20181020-英语流利阅读-待学习

蜘蛛侠新片《毒液》来袭!导演灵感来自哪? Roxy 2018-10-20 1.今日导读 你还记得漫威宇宙中飞檐走壁的蜘蛛侠小可爱吗?在刚过去的国庆黄金周里,索尼影业发行的漫威超级英雄蜘蛛侠系列大片《...

飞鱼说编程
18分钟前
1
0
美团点评Docker容器管理平台

美团点评容器平台简介 本文介绍美团点评的Docker容器集群管理平台(以下简称“容器平台”)。该平台始于2015年,是基于美团云的基础架构和组件而开发的Docker容器集群管理平台。目前该平台为...

Skqing
24分钟前
1
0
JDK8笔记

判断两个对象是否相等 Objects.equals(value1, value2)

呼呼南风
今天
1
0
OSChina 周六乱弹 —— 到底谁是小公猫……

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子:分享Trivium的单曲《Throes Of Perdition》 《Throes Of Perdition》- Trivium 手机党少年们想听歌,请使劲儿戳(这里) @小鱼丁:...

小小编辑
今天
354
5
基础选择器

注意:本教程参考自网上流传的李兴华老师的jquery开发框架视频,但是苦于没有相应的配套笔记,由我本人做了相应的整理. 本次学习的内容 学习jquery提供的各种选择器的使用,掌握了jquery选择...

江戸川
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部