文档章节

Apache Kafka源码剖析:第7篇 日志存储系列2-FileMessageSet

强子哥哥
 强子哥哥
发布于 2017/08/14 13:31
字数 1505
阅读 45
收藏 0
点赞 0
评论 0

本节主要讲解FileMessageSet

Kafka使用FileMessageSet来管理日志文件,对应磁盘上的一个真正的文件。

一开始没找到这个类
后来下了0.10版本看了下,才知道在0.11版本这个类没了,不知道是删除了还是转移了。

在0.10.0中,Kafka使用FileMessageSet管理日志文件。

它对应着磁盘上的一个真正的日志文件。

看一下类的继承关系

@nonthreadsafe
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {
/**
 * A set of messages with offsets. A message set has a fixed serialized form, though the container
 * for the bytes could be either in-memory or on disk. The format of each message is
 * as follows:
 * 8 byte message offset number
 * 4 byte size containing an integer N
 * N message bytes as described in the Message class
 */
abstract class MessageSet extends Iterable[MessageAndOffset] {

MessageSet中保存的数据格式分为3部分:

1) 8字节的offset

2) 4字节的size表示data的大小

3) data表示真正的数据

1)+2)--->LogOverhead

这个从上面的注释也可以看得出来

/**
 * A set of messages with offsets. A message set has a fixed serialized form, though the container
 * for the bytes could be either in-memory or on disk. The format of each message is
 * as follows:
 * 8 byte message offset number
 * 4 byte size containing an integer N
 * N message bytes as described in the Message class
 */

不用关注细节,只要知道这里面包含了哪些信息点就足够了,协议都是人定的!

下面开始聊Message的格式,我们打开这个类

/**
 * Constants related to messages
 */
object Message {

  /**
   * The current offset and size for all the fixed-length fields
   */
  val CrcOffset = 0
  val CrcLength = 4
  val MagicOffset = CrcOffset + CrcLength
  val MagicLength = 1
  val AttributesOffset = MagicOffset + MagicLength
  val AttributesLength = 1
  // Only message format version 1 has the timestamp field.
  val TimestampOffset = AttributesOffset + AttributesLength
  val TimestampLength = 8
  val KeySizeOffset_V0 = AttributesOffset + AttributesLength
  val KeySizeOffset_V1 = TimestampOffset + TimestampLength
  val KeySizeLength = 4
  val KeyOffset_V0 = KeySizeOffset_V0 + KeySizeLength
  val KeyOffset_V1 = KeySizeOffset_V1 + KeySizeLength
  val ValueSizeLength = 4

下面是各个字段的含义

crc---4个字节,消息的校验码,防止消息错误

magic---1个字节,魔数,取值为0或者1,影响了消息的长度和格式。

attributes:1字节,消息的属性,比如压缩类型,时间戳类型,创建时间/追加时间等。

读者可以理解为一些meta信息点。

timestamp: 时间戳信息,由上面的字段负责解释具体含义。

key:你懂的,不解释

value:你懂的,不解释

------------------------------------------------------------------------------

有2个比较关键的方法:

1)writeTo---写消息
  /**
   * Write some of this set to the given channel.
   * @param destChannel The channel to write to.
   * @param writePosition The position in the message set to begin writing from.
   * @param size The maximum number of bytes to write
   * @return The number of bytes actually written.
   */
  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {

2)
  /**
   * Provides an iterator over the message/offset pairs in this set
   */
  def iterator: Iterator[MessageAndOffset]
提供迭代器,顺序读取MessageSet中的消息

这个很好理解,写了就要读,MessageSet具有顺序写入和读取的特性。

在Kafka里,一切都是顺序IO

---上面聊的是MessageSet抽象类以及其中保存消息的格式,我们开始分析FileMessageSet实现类

先看下类的定义

/**
 * An on-disk message set. An optional start and end position can be applied to the message set
 * which will allow slicing a subset of the file.
 * @param file The file name for the underlying log data
 * @param channel the underlying file channel used
 * @param start A lower bound on the absolute position in the file from which the message set begins
 * @param end The upper bound on the absolute position in the file at which the message set ends
 * @param isSlice Should the start and end parameters be used for slicing?
 */
@nonthreadsafe
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {

1)file: java.io.File类型,指向真正的磁盘上的日志文件

2)channel:用于读写此日志文件,我们都知道文件可以拿到channel,如果你写过MappedByteBuffer,你自然知道我说的是啥

3)start/end: FileMessageSet除了表示一个完整的日志文件,还可以表示日志文件分片,start/end就是这个分片的起始位置和结束位置,

4)isSlice:表示此FileMessageSet是否为日志文件的分片

5)_size: FileMessageSet大小,单位字节。如果是分片,就表示分片大小,否则表示FileMessageSet大小。

 

在操作系统里,进行文件的预分配可以提高写操作的性能。

上面解释了预分配的概念!

 

然后还是直接上源码吧

  /**
   * Create a file message set with no slicing, and with initFileSize and preallocate.
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
   * If it's new file and preallocate is true, end will be set to 0.  Otherwise set to Int.MaxValue.
   */
  def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
      this(file,
        channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
        start = 0,
        end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
        isSlice = false)

openChannel的具体实现如下:

/**
   * Open a channel for the given file
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
   * @param file File path
   * @param mutable mutable
   * @param fileAlreadyExists File already exists or not
   * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
   * @param preallocate Pre allocate file or not, gotten from configuration.
   */
  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
    if (mutable) {
      if (fileAlreadyExists)
        new RandomAccessFile(file, "rw").getChannel()
      else {
        if (preallocate) {
          val randomAccessFile = new RandomAccessFile(file, "rw")
          randomAccessFile.setLength(initFileSize)
          randomAccessFile.getChannel()
        }
        else
          new RandomAccessFile(file, "rw").getChannel()
      }
    }
    else
      new FileInputStream(file).getChannel()
  }
}

比较简单,一个是是否只读,一个是是否预先分配空间。

---------------------------------------------------------------------------------------------

初始化时,会设置channel的position位置

下面我们来分析读写过程

这个主要是看append函数

我们可以理解为初始化了一个FileMessageSet,然后就可以调用append函数进行写了,当然每次都是在文件的最后进行文件IO.
顺序性嘛!
  /**
   * Append these messages to the message set
   */
  def append(messages: ByteBufferMessageSet) {
    val written = messages.writeFullyTo(channel)
    _size.getAndAdd(written)
  }

writeFullyTo就是调用channel直接写文件了

下面讲查找文件,函数是searchFor()

/**
 * The mapping between a logical log offset and the physical position
 * in some log file of the beginning of the message set entry with the
 * given offset.
 */
case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
  override def indexKey = offset
  override def indexValue = position.toLong
}

也就是标记了position和offset的值!

还有1个writeTo方法,是将数据写入到别的channel.

 

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 855
博文 551
码字总数 647665
作品 8
南京
架构师
apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe ⋅ 2015/09/06 ⋅ 1

Kafka的Log存储解析

引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition...

dannyhe ⋅ 2015/09/29 ⋅ 0

kafka源码解析之八LogManager

8.1 kafka日志的组成 class LogManager(val logDirs: Array[File], private val logs = new PoolTopicAndPartition, Log}class Log(val dir: File, ……private val segments: ConcurrentNav......

wl044090432 ⋅ 2016/03/29 ⋅ 0

kafka的log存储解析——topic的分区partition分段segment以及索引等

转自:http://blog.csdn.net/jewes/article/details/42970799 引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个...

xiaomin0322 ⋅ 05/29 ⋅ 0

Kafka文章索引(入门)

目录索引: 1)apache kafka消息服务 2)kafka在zookeeper中存储结构 3)kafka log4j配置 4)kafka replication设计机制 5)apache kafka监控系列-监控指标 6)kafka.common.ConsumerRebala...

阿莱倪士 ⋅ 2014/11/27 ⋅ 0

kafka partition segment log关系*

引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition...

cjun1990 ⋅ 2015/10/13 ⋅ 0

Kafka的Log存储解析

引言 Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition...

ivan-Zhao ⋅ 2015/12/16 ⋅ 0

kafka系列文章索引

apache kafka在数据处理中特别是日志和消息的处理上会有很多出色的表现,这里写个索引,关于kafka的文章暂时就更新到这里,最近利用空闲时间在对kafka做一些功能性增强,并java化,虽然现在已...

Gaischen ⋅ 2013/03/25 ⋅ 7

源码圈 365 胖友的书单整理

🙂🙂🙂关注微信公众号:【芋道源码】有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问...

芋道源码掘金Java群217878901 ⋅ 2017/09/21 ⋅ 0

分布式消息系统 Kafka 简介

Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。 ...

xrzs ⋅ 2014/08/19 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

到底会改名吗?微软GVFS 改名之争

微软去年透露了 Git Virtual File System(GVFS)项目,GVFS 是 Git 版本控制系统的一个开源插件,允许 Git 处理 TB 规模的代码库,比如 270 GB 的 Windows 代码库。该项目公布之初就引发了争...

linux-tao ⋅ 20分钟前 ⋅ 0

笔试题之Java基础部分【简】【二】

1.静态变量和实例变量的区别 在语法定义上的区别:静态变量前要加static关键字,而实例变量前则不加。在程序运行时的区别:实例变量属于某个对象的属性,必须创建了实例对象,其中的实例变...

anlve ⋅ 37分钟前 ⋅ 0

Lombok简单介绍及使用

官网 通过简单注解来精简代码达到消除冗长代码的目的 优点 提高编程效率 使代码更简洁 消除冗长代码 避免修改字段名字时忘记修改方法名 4.idea中安装lombnok pom.xml引入 <dependency> <grou...

to_ln ⋅ 55分钟前 ⋅ 0

【转】JS浮点数运算Bug的解决办法

37.5*5.5=206.08 (JS算出来是这样的一个结果,我四舍五入取两位小数) 我先怀疑是四舍五入的问题,就直接用JS算了一个结果为:206.08499999999998 怎么会这样,两个只有一位小数的数字相乘,怎...

NickSoki ⋅ 今天 ⋅ 0

table eg

user_id user_name full_name 1 zhangsan 张三 2 lisi 李四 `` ™ [========] 2018-06-18 09:42:06 星期一½ gdsgagagagdsgasgagadsgdasgagsa...

qwfys ⋅ 今天 ⋅ 0

一个有趣的Java问题

先来看看源码: public class TestDemo { public static void main(String[] args) { Integer a = 10; Integer b = 20; swap(a, b); System.out......

linxyz ⋅ 今天 ⋅ 0

十五周二次课

十五周二次课 17.1mysql主从介绍 17.2准备工作 17.3配置主 17.4配置从 17.5测试主从同步 17.1mysql主从介绍 MySQL主从介绍 MySQL主从又叫做Replication、AB复制。简单讲就是A和B两台机器做主...

河图再现 ⋅ 今天 ⋅ 0

docker安装snmp rrdtool环境

以Ubuntu16:04作为基础版本 docker pull ubuntu:16.04 启动一个容器 docker run -d -i -t --name flow_mete ubuntu:16.04 bash 进入容器 docker exec -it flow_mete bash cd ~ 安装基本软件 ......

messud4312 ⋅ 今天 ⋅ 0

OSChina 周一乱弹 —— 快别开心了,你还没有女友呢。

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子 :分享吴彤的单曲《好春光》 《好春光》- 吴彤 手机党少年们想听歌,请使劲儿戳(这里) @clouddyy :小萝莉街上乱跑,误把我认错成...

小小编辑 ⋅ 今天 ⋅ 9

Java 开发者不容错过的 12 种高效工具

Java 开发者常常都会想办法如何更快地编写 Java 代码,让编程变得更加轻松。目前,市面上涌现出越来越多的高效编程工具。所以,以下总结了一系列工具列表,其中包含了大多数开发人员已经使用...

jason_kiss ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部