文档章节

HBase Scan流程分析

超人学院
 超人学院
发布于 2014/11/21 17:00
字数 3277
阅读 580
收藏 24
点赞 1
评论 0

HBase的读流程目前看来比较复杂,主要由于:

  • HBase的表数据分为多个层次,HRegion->HStore->[HFile,HFile,...,MemStore]

  • RegionServer的LSM-Like存储引擎,不断flush产生新的HFile,同时产生新的MemStore用于后续数据写入,并且为了防止由于HFile过多而导致Scan时需要扫描的文件过多而导致的性能下降,后台线程会适时的进行Compaction,Compaction的过程会产生新的HFile,并且会删除Compact完成的HFile

  • 具体实现中的各种优化,比如lazy seek优化,导致代码比较复杂

读流程中充斥着各种Scanner,如下图:

                                 +--------------+
                                 |              |
                     +-----------+ RegionScanner+----------+
                     |           +------+-------+          |
                     |                  |                  |
                     |                  |                  |
               +-----v+-------+  +------v-------+   +------v+------+
               |              |  |              |   |              |
               | StoreScanner |  | StoreScanner |   | StoreScanner |
               |              |  |              |   |              |
               +--------------+  +--+---+-----+-+   +--------------+
                                    |   |     |
            +-----------------------+   |     +----------+
            |                           |                |
            |                           |                |
    +-------v---------+   +-------------v----+ +---------v------+
    |                 |   |                  | |                |
    |StoreFileScanner |   |StoreFileScanner  | | MemStoreScanner|
    |                 |   |                  | |                |
    +-------+---------+   +--------+---------+ +-------+--------+
            |                      |                   |
            |                      |                   |
            |                      |                   |
            |                      |                   |
    +-------v---------+   +--------v---------+ +-------v--------+
    |                 |   |                  | |                |
    |  HFileScanner   |   |  HFileScanner    | | HFileScanner   |
    |                 |   |                  | |                |
    +-----------------+   +------------------+ +----------------+

在HBase中,一张表可以有多个Column Family,在一次Scan的流程中,每个Column Family(后续叫Store)的数据读取由一个StoreScanner对象负责。每个Store的数据由一个内存中的MemStore和磁盘上的HFile文件组成,相对应的,StoreScanner对象雇佣一个MemStoreScanner和N个StoreFileScanner来进行实际的数据读取。

从逻辑上看,读取一行的数据需要

  1. 按照顺序读取出每个Store

  2. 对于每个Store,合并Store下面的相关的HFile和内存中的MemStore

实现上,这两步都是通过堆完成。RegionScanner的读取通过下面的多个StoreScanner组成的堆
完成,使用RegionScanner的成员变量KeyValueHeap storeHeap表示

组成StoreScanner的多个Scanner在RegionScannerImpl构造函数中获得:

for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
          scan.getFamilyMap().entrySet()) {
        Store store = stores.get(entry.getKey());        // 实际是StoreScanner类型
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
}

store.getScanner(scan, entry.getValue(), this.readPt)内部就是new 一个StoreScanner,逻辑都在StoreScanner的构造函数中

构造函数内部其实就是找到相关的HFile和MemStore,然后建堆,注意,这个堆是StoreScanner级别的,一个StoreScanner一个堆,堆中的元素就是底下包含的HFile和MemStore对应的StoreFileScanner和MemStoreScanner
得到相关的HFile和MemStore逻辑在StoreScanner::getScannersNoCompaction()中,内部会根据请求指定的TimeRange,KeyRange过滤掉不需要的HFile,同时也会利用bloom filter过滤掉不需要的HFIle.接着,调用

seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        isParallelSeekEnabled);

对这些StoreFileScanner和MemStoreScanner分别进行seek,seekKey是matcher.getStartKey(),
如下构造

 return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP,        Type.DeleteFamily);

Seek语义

seek是针对KeyValue的,seek的语义是seek到指定KeyValue,如果指定KeyValue不存在,则seek到指定KeyValue的下一
个。举例来说,假设名为X的column family里有两列a和b,文件中有两行rowkey分别为aaa和
bbb,如下表所示.

Column Family X
rowkey column a column b
aaa 1 abc
bbb 2 def

HBase客户端设置scan请求的start key为aaa,那么matcher.getStartKey()会被初始化为(rowkey, family, qualifier,timestamp,type)=(aaa,X,null,LATEST_TIMESTAMP,Type.DeleteFamily),根据KeyValue的比较原则,这个KeyValue比aaa行的第一个列a更
小(因为没有qualifier),所以对这个StoreFileScanner seek时,会seek到aaa这行的第一列a

实际上

seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        isParallelSeekEnabled);

有可能不会对StoreFileScanner进行实际的seek,而是进行lazy seek,seek的工作放到不得不做的时候。后续会专门说lazy seek

上面得到了请求scan涉及到的所有的column family对应的StoreScanner,随后调用如下函数进行建堆:

     protected void initializeKVHeap(List<KeyValueScanner> scanners,
        List<KeyValueScanner> joinedScanners, HRegion region)        throws IOException {      this.storeHeap = new KeyValueHeap(scanners, region.comparator);      if (!joinedScanners.isEmpty()) {        this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
      }
    }

KeyValueScanner是一个接口,表示一个可以向外迭代出KeyValue
的Scanner,StoreFileScanner,MemStoreScanner和StoreScanner都实现了该接口。这里的comparator类型为KVScannerComparator,用于比较两个KeyValueScanner,实际上内部使用了KVComparator,它是用来比较两个KeyValue的。从后面可以看出,实际上,这个由KeyValueScanner组成的堆,堆顶KeyValueScanner满足的特征是: 它的堆顶(KeyValue)最小

堆用类KeyValueHeap表示,看KeyValueHeap构造函数做了什么

    KeyValueHeap(List<? extends KeyValueScanner> scanners,
      KVScannerComparator comparator) throws IOException {    this.comparator = comparator;        if (!scanners.isEmpty()) {          // 根据传入的KeyValueScanner构造出一个优先级队列(内部实现就是堆)
          this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),              this.comparator);          for (KeyValueScanner scanner : scanners) {            if (scanner.peek() != null) {              this.heap.add(scanner);
            } else {
              scanner.close();
            }
          }        //以上将元素加入堆中
        // 从堆顶pop出一个KeyValueScanner放入成员变量current,那么这个堆的堆顶
        // 就是current这个KeyValueScanner的堆顶,KeyValueHeap的peek()取堆顶
        // 操作直接返回current.peek()
          this.current = pollRealKV();
        }
    }

在看pollRealKV()怎么做的之前需要先看看HBase 0.94引入的Lazy Seek

Lazy Seek优化

在这个优化之前,读取一个column family(Store),需要seek其下的所有HFile和MemStore到指定的查询KeyValue(seek的语义为如果KeyValue存在则seek到对应位置,如果不存在,则seek到这个KeyValue的后一个KeyValue,假设Store下有3个HFile和一个MemStore,按照时序递增记为[HFile1, HFile2, HFile3, MemStore],在lazy seek优化之前,需要对所有的HFile和MemStore进行seek,对HFile文件的seek比较慢,往往需要将HFile相应的block加载到内存,然后定位。在有了lazy seek优化之后,如果需要的KeyValue在HFile3中就存在,那么HFIle1和HFile2都不需要进行seek,大大提高速度。大体来说,思路是请求seek某个KeyValue时实际上没有对StoreFileScanner进行真正的seek,而是对于每个StoreFileScanner,设置它的peek为(rowkey,family,qualifier,lastTimestampInStoreFile)

KeyValueHeap有两个重要的接口,peek()和next(),他们都是返回堆顶,区别在于next()会将堆顶出堆,然后重新调整堆,对外来说就是迭代器向前移动,而peek()不会将堆顶出堆,堆顶不变。实现中,
peek()操作非常简单,只需要调用堆的成员变量current的peek()方法操作即可.拿StoreScanner堆举例,current要么是StoreFileScanner类型要么是MemStore,那么到底current是如何选择出来的以及Lazy Seek是如何实现的?

下面举个例子说明。

前提:

HBase开启了Lazy Seek优化(实际上默认开启)

假设:

Store下有三个HFile和MemStore,按照时间顺序记作[HFile1,HFile2,HFile3,MemStore],seek KeyValue为(rowkey,family,qualifier,timestamp),记作seekKV.
并且它只在HFile3中存在,不在其他HFile和MemStore中存在

Lazy Seek过程

seekScanner()的逻辑,如果是lazy seek,则对于每个Scanner都调
用requestSeek(seekKV)方法,方法内部首先进行rowcol类型的bloom filter过滤

  1. 如果结果判定seekKV在StoreFile中肯定不存在,则直接设置StoreFileScanner的peek(实际上StoreFileScanner不是一个
    堆只是为了统一代码)为 kv.createLastOnRowCol(),并且将realSeekDone设置true,表示实际的seek完成.

    public KeyValue createLastOnRowCol() {
        return new KeyValue(
            bytes, getRowOffset(), getRowLength(),
            bytes, getFamilyOffset(), getFamilyLength(),
            bytes, getQualifierOffset(), getQualifierLength(),
            HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
      }

    可以看出ts设置为最小,说明这个KeyValue排在所有的同rowkey同column family同qualifier的KeyValue最后。显然,当上层StoreScanner取堆顶时,
    如果其它StoreFileScanner/MemStoreScanner中存在同rowkey同column family同qualifier的真实的KeyValue则会优先弹出。

  2. 如果seekKV在StoreFile中,那么会执行如下逻辑:

     realSeekDone = false; long maxTimestampInFile = reader.getMaxTimestamp(); long seekTimestamp = kv.getTimestamp(); if (seekTimestamp > maxTimestampInFile) {     // Create a fake key that is not greater than the real next key.
         // (Lower timestamps correspond to higher KVs.)
         // To understand this better, consider that we are asked to seek
         // to
         // a higher timestamp than the max timestamp in this file. We
         // know that
         // the next point when we have to consider this file again is
         // when we
         // pass the max timestamp of this file (with the same
         // row/column).
         cur = kv.createFirstOnRowColTS(maxTimestampInFile);
      } else {
         enforceSeek();
      }

    显然,当kv的ts比HFile中最大的ts都更大时,那么这个HFile中显然不存在seekKV,但是可能存在
    相同rowkey,family,qualifier的不同ts的KeyValue,那么这里设置堆顶时要注意,不能把堆顶设置为比当前HFile文件中的可能真实存在的相同rowkey,family,qualifier的KeyValue大,如下:

    public KeyValue createFirstOnRowColTS(long ts) {
        return new KeyValue(
            bytes, getRowOffset(), getRowLength(),
            bytes, getFamilyOffset(), getFamilyLength(),
            bytes, getQualifierOffset(), getQualifierLength(),
            ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
      }

    Type的比较中,Type.Maximum最小,这样产生的KeyValue保证了不会大于当前HFile文件中的可能存在的相同rowkey,family,qualifier的KeyValue,同时将seekKV保存到StoreFileScanner成员变量delayedSeekKV中,以便后续真正seek的时候获取.
    考虑一下如果seekKV的ts比当前HFile中的maxTimestamp更小怎么办?可以设置一个ts为latest_timestamp
    的KeyValue么?如果设置了,它会比其它HFile中存在实际的KeyValue先弹出,这样顺序就乱了,所以这种情况下,只能进行实际的seek,enforceSeek()函数中进行实际的seek后,将realSeekDone设置为
    true.

取StoreScanner堆顶逻辑

因为HFile3的latestTimestampInStoreFile最大,所以会首先取到HFile3对应的StoreFileScanner的pee
k(KeyValue的比较原则是timestamp大的KeyValue更小),
这个时候会检查这个KeyValueScanner是否进行了实际的seek(对于StoreFileScanner来说,通过布尔变量realSeekDone进行标记,对于MemStoreScanner来说,始终返回true),在这里,没有进行real seek
,接着进行实际的seek操作,seek到HFile3中存在的seekKV,接着拿着seekKV去和HFile2的peek进行比较,显然seekKV比HFile2的peek小(由于timestamp > lastTimestampInStoreFile2),故
StoreScanner的peek操作返回seekKV。

实现中,KeyValueHeap有两个重要的接口,peek()和next(),他们都是返回堆顶,区别在于next()会将堆顶出堆,然后重新调整堆,对外来说就是迭代器向前移动,而peek()不会将堆顶出堆,堆顶不变。实现中,
peek()操作非常简单,只需要调用堆的成员变量current的peek()方法操作即可.拿StoreScanner堆举例,current要么是StoreFileScanner类型要么是MemStore,而current的选择则是pollRealKV()
完成的,这个函数之所以内部有while循环就是因为考虑了Lazy Seek优化,实际上,pollRealKV()代码的逻辑就是例子中"取StoreScanner堆顶逻辑"。pollRealKV()的返回值会赋给current

  protected KeyValueScanner pollRealKV() throws IOException {
    KeyValueScanner kvScanner = heap.poll();    if (kvScanner == null) {      return null;
    }    while (kvScanner != null && !kvScanner.realSeekDone()) {      if (kvScanner.peek() != null) {
        kvScanner.enforceSeek();
        KeyValue curKV = kvScanner.peek();        if (curKV != null) {
          KeyValueScanner nextEarliestScanner = heap.peek();          if (nextEarliestScanner == null) {            // The heap is empty. Return the only possible scanner.
            return kvScanner;
          }          // Compare the current scanner to the next scanner. We try to avoid
          // putting the current one back into the heap if possible.
          KeyValue nextKV = nextEarliestScanner.peek();          if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {            // We already have the scanner with the earliest KV, so return it.
            return kvScanner;
          }          // Otherwise, put the scanner back into the heap and let it compete
          // against all other scanners (both those that have done a "real
          // seek" and a "lazy seek").
          heap.add(kvScanner);
        } else {          // Close the scanner because we did a real seek and found out there
          // are no more KVs.
          kvScanner.close();
        }
      } else {        // Close the scanner because it has already run out of KVs even before
        // we had to do a real seek on it.
        kvScanner.close();
      }
      kvScanner = heap.poll();
    }    return kvScanner;
  }

Store下HFile集合发生变化如何处理

内存中的Memstore被flush到文件系统或者compaction完成都会改变Store的HFile文件集合。
在每次做完一批mutate操作后,会通过HRegion::isFlushSize(newSize)检查是否需要对当前HRegion内的memstore进行flush
其实就是判断HRegion内的所有的memstore大小和是否大于hbase.hregion.memstore.flush.size,默认128MB,如果需要flush,会将请求放入后台flush线程(MemStoreFlusher)的队列中,由后台flush线程处理,调用路径HRegion::flushcache()->internalFlushcache(...)->StoreFlushContext.flushCache(...)->StoreFlushContext.commit(...)=>HStore::updateStorefiles(),这块逻辑在HBase Snapshot原理和实现中有讲到,这里不赘述。只说一下最后一步的updateStorefiles()操作,该函数主要工作是拿住HStore级别的写锁,然后将新产生的HFile文件插入到StoreEngine中,解写锁,然后释放snapshot,最后调用
notifyChangedReadersObservers(),如下:

 this.lock.writeLock().lock(); try {   this.storeEngine.getStoreFileManager().insertNewFiles(sfs);   this.memstore.clearSnapshot(set);
 } finally {   // We need the lock, as long as we are updating the storeFiles
   // or changing the memstore. Let us release it before calling
   // notifyChangeReadersObservers. See HBASE-4485 for a possible
   // deadlock scenario that could have happened if continue to hold
   // the lock.
   this.lock.writeLock().unlock();
 } // Tell listeners of the change in readers.
 notifyChangedReadersObservers();

重点在于notifyChangedReadersObservers(),看看代码:

  private void notifyChangedReadersObservers() throws IOException {    for (ChangedReadersObserver o: this.changedReaderObservers) {
      o.updateReaders();
    }
  }

实际上,每个observer类型都是StoreScanner,每次新开一个StoreScanner都会注册在Store内部的这个observer集合中,当Store下面的HFile集合变化时,通知这些注册上来的StoreScanner即可。
具体的通知方式就是首先拿住StoreScanner的锁,将这个时候的堆顶保存在成员变量lastTop中,
然后将StoreScanner内部的堆置为null(this.heap=null)最后解锁,而StoreScanner那边next/seek/reseek时,都会首先通过函数checkReseek()函数来检查是否this.heap为null,为null
,为null说明当前Store下的HFile集合改变了,那么调用resetScannerStack(lastTop),将当前
Store下的所有StoreFileScanner/MemStoreScanner都seek到lastTop,然后重新建StoreScanner对应的堆。checkReseek()代码如下:

  protected boolean checkReseek() throws IOException {    if (this.heap == null && this.lastTop != null) {
      resetScannerStack(this.lastTop);      if (this.heap.peek() == null
          || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
        LOG.debug("Storescanner.peek() is changed where before = " + this.lastTop.toString()
            + ",and after = " + this.heap.peek());        this.lastTop = null;        return true;
      }      this.lastTop = null; // gone!
    }    // else dont need to reseek
    return false;
  }
 更多分享请关注:Q群:  340822685

© 著作权归作者所有

共有 人打赏支持
超人学院
粉丝 106
博文 335
码字总数 388917
作品 0
昌平
CTO(技术副总裁)
云HBase小组成功抢救某公司自建HBase集群,挽救30+T数据

摘要: 使用过开源HBase的人都知道,运维HBase是多么复杂的事情,集群大的时候,读写压力大,配置稍微不合理一点,就可能会出现集群状态不一致的情况,糟糕一点的直接导致入库、查询某个业务...

阿里云云栖社区
04/18
0
0
HBase shell的基本用法

hbase提供了一个shell的终端给用户交互。使用命令hbaseshell进入命令界面。通过执行 help可以看到命令的帮助信息。 以网上的一个学生成绩表的例子来演示hbase的用法。 name grad course mat...

超人学院
2015/05/27
0
0
Hbase的应用场景、原理及架构分析

Hbase概述 hbase是一个构建在HDFS上的分布式列存储系统。HBase是Apache Hadoop生态系统中的重要 一员,主要用于海量结构化数据存储。从逻辑上讲,HBase将数据按照表、行和列进行存储。 如图所...

xiangxizhishi
2017/07/22
0
0
云HBaseSQL及分析-Phoenix&Spark

在2018年1月的数据库直播大讲堂峰会HBase专场,来自阿里云的研发工程师瑾谦和沐远分享了云HBaseSQL以及分析Phoenix&Spark。本文介绍了详细了Phoinix和Spark的架构,适用性以及优缺点,并在最...

HBase技术社区
06/05
0
0
Hbase java api

本文中使用的是最原始的java api, 没有使用spring-data-hbase,只是使用spring管理Hbase的配置 查询操作分为如下几个步骤: 获取Hbase配置,这里的配置主要指hbase的地址。如果是Zookeeper...

哭哭吓唬你
2015/11/02
0
0
Spark常用函数(源码阅读六)

  源码层面整理下我们常用的操作RDD数据处理与分析的函数,从而能更好的应用于工作中。 连接Hbase,读取hbase的过程,首先代码如下: def tableInitByTime(sc : SparkContext,tableName : ...

雪童子
2016/11/24
0
0
hbaseMapper提交报找不到类NoClassDefFoundError

最近写一个hbase的mapper生成hfile文件,但是在执行jar包时,总是报Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/Scan错误,注释掉这个方法...

爱运动的小乌龟
07/09
0
0
HBase 常用Shell命令

1.进入hbase shell console $HBASE_HOME/bin/hbase shell 如果有kerberos认证,需要事先使用相应的keytab进行一下认证(使用kinit命令),认证成功之后再使用hbase shell进入可以使用whoami命...

zlfwmm
06/26
0
0
Hbase+Phoenix 安装及基本操作

1.安装Hbase: 解压:tar-zxvf hbase-1.2.1-bin.tar.gz 重命名:mvhbase-1.2.1 hbase 2.配置环境变量 exportJAVAHOME=/usr/local/software/jdk1.8.066 exportCLASSPATH=.:$JAVAHOME/lib/dt.j......

sinadrew
04/12
0
0
HBase中CloumnFamily的设计规则

为什么要这样子做呢? HBase本身的设计目标是支持稀疏表,而稀疏表通常会有很多列,但是每一行有值的列又比较少。 如果不使用Column Family的概念,那么有两种设计方案: 1.把所有列的数据放...

hanzhankang
2014/02/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

【面试题】盲人坐飞机

有100位乘客乘坐飞机,其中有一位是盲人,每位乘客都按自己的座位号就坐。由于盲人看不见自己的座位号,所以他可能会坐错位置,而自己的座位被占的乘客会随便找个座位就坐。问所有乘客都坐对...

garkey
48分钟前
0
0
谈谈神秘的ES6——(二)ES6的变量

谈谈神秘的ES6——(二)ES6的变量 我们在《零基础入门JavaScript》的时候就说过,在ES5里,变量是有弊端的,我们先来回顾一下。 首先,在ES5中,我们所有的变量都是通过关键字var来定义的。...

JandenMa
今天
1
0
arts-week1

Algorithm 594. Longest Harmonious Subsequence - LeetCode 274. H-Index - LeetCode 219. Contains Duplicate II - LeetCode 217. Contains Duplicate - LeetCode 438. Find All Anagrams ......

yysue
今天
0
0
NNS拍卖合约

前言 关于NNS的介绍,这里就不多做描述,相关的信息可以查看NNS的白皮书http://doc.neons.name/zh_CN/latest/nns_background.html。 首先nns中使用的竞价货币是sgas,关于sgas介绍可以戳htt...

红烧飞鱼
今天
1
0
Java IO类库之管道流PipeInputStream与PipeOutputStream

一、java管道流介绍 在java多线程通信中管道通信是一种重要的通信方式,在java中我们通过配套使用管道输出流PipedOutputStream和管道输入流PipedInputStream完成线程间通信。多线程管道通信的...

老韭菜
今天
0
0
用Python绘制红楼梦词云图,竟然发现了这个!

Python在数据分析中越来越受欢迎,已经达到了统计学家对R的喜爱程度,Python的拥护者们当然不会落后于R,开发了一个个好玩的数据分析工具,下面我们来看看如何使用Python,来读红楼梦,绘制小...

猫咪编程
今天
1
0
Java中 发出请求获取别人的数据(阿里云 查询IP归属地)

1.效果 调用阿里云的接口 去定位IP地址 2. 代码 /** * 1. Java中远程调用方法 * http://localhost:8080/mavenssm20180519/invokingUrl.action * @Title: invokingUrl * @Description: * @ret......

Lucky_Me
今天
1
0
protobuf学习笔记

相关文档 Protocol buffers(protobuf)入门简介及性能分析 Protobuf学习 - 入门

OSC_fly
昨天
0
0
Mybaties入门介绍

Mybaties和Hibernate是我们在Java开发中应用的比较多的两个ORM框架。当然,目前Mybaties正在慢慢取代Hibernate,这是因为相比较Hibernate而言Mybaties性能更好,响应更快,更加灵活。我们在开...

王子城
昨天
2
0
编程学习笔记之python深入之装饰器案例及说明文档[图]

编程学习笔记之python深入之装饰器案例及说明文档[图] 装饰器即在不对一个函数体进行任何修改,以及不改变整体的原本意思的情况下,增加函数功能的新函数,因为这个新函数对旧函数进行了装饰...

原创小博客
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部