文档章节

hbase客户端源码分析调用

Zero零_度
 Zero零_度
发布于 2017/05/28 10:23
字数 1044
阅读 34
收藏 1

—client 的调用流程

delete 数据的流程.(table.delete(deleteColumn);)

(源码基于Hbase-1.1.5版本)

HTable table = new HTable(conf, Bytes.toBytes(tableName));
  • 1
  • 1

HTable 对象创建时调用如下方法创建对远程的链接对象管理器

ConnectionManager.getConnectionInternal(conf)
 ConnectionFactory.createConnection(conf, managed, pool, user)

默认为 HConnectionImplementation 类 
通过 ZooKeeperRegistry.getClusterId 拿到集群的id 
在该方法中调用 
HConnectionImplementation.getKeepAliveZooKeeperWatcher 
拿到zk的链接 
然后调用如下方法拿到clousterId

this.clusterId = ZKClusterId.readClusterIdZNode(zkw);

接下来创建rpcclient

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);

实现类为 RpcClientImpl 
这样一个hbase对象的创建就完成了。

table.delete(deleteColumn);开始调用。 
创建一个 RegionServerCallable 对象

RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,tableName, delete.getRow()) 

包含了要删除的对应的表名和那一行。 
然后通过调用一下多次尝试的对象调用

rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, this.operationTimeout);

在caller中,会调用到callable.prepare,进行初始化。 
在初始化的时候,要拿到对应的regionserver信息

try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
  this.location = regionLocator.getRegionLocation(row, reload);
}

然后通过读取的是那一个主键,去进行定

locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID)

然后拼接要删除的行的对象

byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);

通过创建

 Scan s = new Scan();
  s.setReversed(true);
  s.setStartRow(metaKey);
  s.setSmall(true);
  s.setCaching(1);

对象到发送到 meta元数据中进行查询

rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);

然后发起对 hbase:meta 元数据表的查询

smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
 getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
 getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);

然后发起了查询了

values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);

然后创建对hbase:meta表的查询请求

RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
currentScannerCallable.getRow()); 
    在getRegionLocations 方法中
   rl = cConnection.locateRegion(tableName, row, useCache, true, replicaId);

发起元数据远程连接的调用

@Override
public RegionLocations locateRegion(final TableName tableName,
  final byte [] row, boolean useCache, boolean retry, int replicaId)
throws IOException {
  if (this.closed) throw new IOException(toString() + " closed");
  if (tableName== null || tableName.getName().length == 0) {
    throw new IllegalArgumentException(
        "table name cannot be null or zero length");
  }
  if (tableName.equals(TableName.META_TABLE_NAME)) {
    return locateMeta(tableName, useCache, replicaId);
  } else {
    // Region not in the cache - have to go to the meta RS
    return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
  }
}

通过判断是否是元数据表,进行不同的定位,如果是元数据,就去zk中进行读取,否则,

在元数据中

locations = this.registry.getMetaRegionLocation();

拿到meta元数据的位置信息,这个meta元数据的位置信息是存放在zk中的 
通过下面的方法读取

List<ServerName> servers = new MetaTableLocator().blockUntilAvailable(zkw, hci.rpcTimeout,
      hci.getConfiguration());

拿到后,会缓存在client端 
然后就创建到hmaster的连接了

setStub(super.getConnection().getClient(dest));(dest=myubuntu,16020,1468468352472)

创建直接到这个目标端口的rpc 客户端连接

 BlockingRpcChannel channel =
  this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
  stub = ClientService.newBlockingStub(channel);

创建了 BlockingRpcChannelImplementation对象 
真正发起了请求了(这个时候的tableName ==hbase:meta)还是元数据表

 @Override
public Result[] call(int timeout) throws IOException {
  if (this.closed) return null;
  if (Thread.interrupted()) {
throw new InterruptedIOException();
  }
  ScanRequest request = RequestConverter.buildScanRequest(getLocation()
  .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
  ScanResponse response = null;
  controller = controllerFactory.newController();
  try {
controller.setPriority(getTableName());
controller.setCallTimeout(timeout);
response = getStub().scan(controller, request);
Result[] results = ResponseConverter.getResults(controller.cellScanner(),
response);
if (response.hasMoreResultsInRegion()) {
  setHasMoreResultsContext(true);
  setServerHasMoreResults(response.getMoreResultsInRegion());
} else {
  setHasMoreResultsContext(false);
}
// We need to update result metrics since we are overriding call()
updateResultsMetrics(results);
return results;
  } catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
  }
}

request 对象内容为(blog2为我要删除的表。rowkey1为我要删除的主键)

region {
  type: REGION_NAME
  value: "hbase:meta,,1"
}
scan {
  start_row: "blog2,rowkey1,99999999999999"
  max_versions: 1
  cache_blocks: true
  small: true
  reversed: true
  caching: 1
}
number_of_rows: 1
close_scanner: true
client_handles_partials: true
client_handles_heartbeats: true

然后在 RpcClientImpl 的Connection对象的创建过程中 
设置了servicename =ClientService

ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
  builder.setServiceName(remoteId.getServiceName());
  UserInformation userInfoPB = getUserInfo(ticket);
  if (userInfoPB != null) {
builder.setUserInfo(userInfoPB);
  }
  if (this.codec != null) {
builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
  }
  if (this.compressor != null) {
builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
  }
  builder.setVersionInfo(ProtobufUtil.getVersionInfo());
  this.header = builder.build();

这样就设置了头部信息了 
然后就根据刚才创建的connection信息,进行了发送查询

 final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
pcrc.getCallTimeout());
final Connection connection = getConnection(ticket, call, addr);
connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());

然后现在就连接到远程 setupIOstreams();

 NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);

在这里直接发送数据过去了 
IPCUtil.write(this.out, header, call.param, cellBlock);

当把结果拿到后

setStub(getConnection().getClient(this.location.getServerName()));
(location = region=blog2,,1468401068588.f8572806e5866a27ccf7464ec899bb18., hostname=myubuntu,16020,1468468352472, seqNum=25)

就可以连接到具体的那个业务表的regionserver了 
拿到地址后,最后回调回对原始方法的调用

public void delete(final Delete delete)
  throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
tableName, delete.getRow()) {
  @Override
  public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);

try {
  MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), delete);
  MutateResponse response = getStub().mutate(controller, request);
  return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
  throw ProtobufUtil.getRemoteException(se);
}
  }
};
rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
  }

tableName = blog2

最终返回到用户了

本文转载自:http://blog.csdn.net/gaoshui87/article/details/51920049

Zero零_度
粉丝 69
博文 1258
码字总数 257684
作品 0
程序员
私信 提问
HBase 写优化之 BulkLoad 实现数据快速入库

1、为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题? 我们先看下 HBase 的写流程: 通常 MapReduce 在写HBase时使用的是 TableOutputFormat 方式,在reduce中直接生成...

大数据之路
2013/12/25
0
1
HBase应用实践专场-HBase问题排查思路

HBCK - HBCK检查什么? (1)HBase Region一致性 集群中所有region都被assign,而且deploy到唯一一台RegionServer上 该region的状态在内存中、hbase:meta表中以及zookeeper这三个地方需要保持一...

HBase技术社区
2018/09/03
0
0
Hbase原理以及基本运行方式和优化

HBase是一个构建在HDFS上的分布式列存储系统; HBase是基于Google BigTable模型开发的,典型的key/value系统; HBase是Apache Hadoop生态系统中的重要一员,主要用于海量非结构化数据存储; ...

脸大的都是胖纸
2015/07/06
0
0
HBase原理之HBase MetaStore&Compaction剖析

1.概述 客户端读写数据是先从HBase Clienr获取RegionServer的元数据信息,比如Region地址信息。在执行数据写操作时,HBase会先写MetaStore,为什么会写到MetaStore。本篇文章将为读者剖析HBa...

HBase技术社区
2018/09/23
0
0
大数据 - 如何给你的Hbase写入性能提速

背景 之前我们的线上业务一直使用的是Hbase的单条put操作,为了提高程序的写入性能我们还针对业务进行了修改,将日志批量化,也就是hbase的put多条操作,后面发现hbase的客户端是支持本地批量...

大猪佩琪2019
03/24
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Node.js 多进程处理CPU密集任务

Node.js 单线程与多进程 大家都知道 Node.js 性能很高,是以异步事件驱动、非阻塞 I/O 而被广泛使用。但缺点也很明显,由于 Node.js 是单线程程序,如果长时间运算,会导致 CPU 不能及时释放...

Svend
40分钟前
3
0
Django笔记-3-模型-20190526

简介 django为各种数据库提供了很好的支持,django对这些数据库提供了统一的调用API;可以根据不同的也无需求选择不同的数据库; 配置数据库 在setting.py文件中配置数据库 DATABASES = { ...

Frank1126lin
56分钟前
3
0
OSChina 周日乱弹 —— 程序员做噩梦

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @-冰冰棒- :#今日歌曲推荐# 手嶌葵《Kiss The Girl》 《Kiss The Girl》- 手嶌葵 手机党少年们想听歌,请使劲儿戳(这里) @Sharon啊 :今天...

小小编辑
今天
190
11
Another app is currently holding the yum lock; waiting for it to exit...

Another app is currently holding the yum lock; waiting for it to exit... The other application is: PackageKit Memory : 153 M RSS (266 MB VSZ) Started: Thu Jul 12 00:03......

圣洁之子
今天
2
0
FastDateFormat 研究

FastDateFormat 对缓存的利用,其实就是用ConcurrentHashMap 做了一个map类型的缓存 public F getInstance(final String pattern, TimeZone timeZone, Locale locale) { Validate......

暗中观察
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部