文档章节

Hadoop中DataNode的启动过程详解

彭苏云
 彭苏云
发布于 2015/01/29 10:08
字数 723
阅读 176
收藏 0

本文的目的在于详细记录2.4.1版本下hadoop的DataNode的启动过程,作此记录,也为以后回过头看DataNode留下方便。本文的思路是结合DataNode的代码,来分析他的启动过程。

说明,限于篇幅,对文章中引用的代码都只留了关键部分。

DataNode同NameNode都一样,是一个java进程,所以从main方法开始,看代码:

public class DataNode extends Configured 
    implements InterDatanodeProtocol, ClientDatanodeProtocol,
    DataNodeMXBean {
  public static void main(String args[]) {
    secureMain(args, null);
  }

  public static void secureMain(String args[], SecureResources resources) {
  	DataNode datanode = createDataNode(args, null, resources);
  }

  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      dn.runDatanodeDaemon();
    }
    return dn;
  }

  public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    return makeInstance(dataLocations, conf, resources);
  }

  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    return new DataNode(conf, locations, resources);
  }

  DataNode(final Configuration conf,
           final List<StorageLocation> dataDirs,
           final SecureResources resources) throws IOException {
     startDataNode(conf, dataDirs, resources);
  }

  public void runDatanodeDaemon() throws IOException {
    blockPoolManager.startAll();

    // start dataXceiveServer
    dataXceiverServer.start();
    if (localDataXceiverServer != null) {
      localDataXceiverServer.start();
    }
    ipcServer.start();
    startPlugins(conf);
  }
}



上面的代码跟踪不难,但是最终需要注意两个方法:startDataNode和runDatanodeDaemon方法,前面一个用于初始化DataNode,后面一个启动DataNode的后台线程,这些线程是会伴随DataNode进程一直跑着的。接着,让我们重点研究下方法startDataNode,看代码:


void startDataNode(Configuration conf, 
                     List<StorageLocation> dataDirs,
                    // DatanodeProtocol namenode,
                     SecureResources resources
                     ) throws IOException {
    storage = new DataStorage();
    
    // global DN settings
    registerMXBean();
    initDataXceiver(conf);
    startInfoServer(conf);
    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();

    initIpcServer(conf);
    blockPoolManager = new BlockPoolManager(this);
    blockPoolManager.refreshNamenodes(conf);
  }


registerMXBean这个方法可以忽略,用来注册MBean信息;initDataXceiver这个方法应该来说还是比较重要,实例化的dataXceiverServer用来接受客户端或者其他datanode的数据接收或者发送请求;startInfoServer方法用来启动datanode的web服务器;pauseMonitor用来监控jvm是否有停顿;initIpcServer方法比较重要,用来启动datanode上的rpc服务,主要包括两个服务:ClientDatanodeProtocolPB和InterDatanodeProtocolPB。

然后属于DataNode的重点来了,blockPoolManager对象的实例化,注意一点,2.4.1 这个版本的hadoop已经支持了hadoop Federation的特性,而blockPooolManager就是支撑这个特性来的。现在让我们来看看他里面的东西。还是先上代码吧。

class BlockPoolManager {
  BlockPoolManager(DataNode dn) {
    this.dn = dn;
  }

  void refreshNamenodes(Configuration conf)
      throws IOException {
    synchronized (refreshNamenodesLock) {
      doRefreshNamenodes(newAddressMap);
    }
  }

  private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
 synchronized (this) {
		startAll();
	}
  }

  synchronized void startAll() throws IOException {
	for (BPOfferService bpos : offerServices) {
          bpos.start();
     }
  }
}

class BPOfferService {
  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }
}

class BPServiceActor implements Runnable {
  void start() {
    if ((bpThread != null) && (bpThread.isAlive())) {
      //Thread is started already
      return;
    }
    bpThread = new Thread(this, formatThreadName());
    bpThread.setDaemon(true); // needed for JUnit testing
    bpThread.start();
  }

  public void run() {
while (true) {
	connectToNNAndHandshake();
     	break;
}

    while (shouldRun()) {
      offerService();   
    }
  }

  private void offerService() throws Exception {
while (shouldRun()) {
	HeartbeatResponse resp = sendHeartBeat();

	List<DatanodeCommand> cmds = blockReport();
}
  }
}


顺着代码往下走,整个思路都会比较清晰了,BPServiceActor这个类做了具体的事情,包括datanode跟namenode的握手,发送心跳和报告块信息,执行namenode发回来的命名。

详细的过程就不啰嗦了。

到这里DataNode的启动过程就搞了一个段落。

 

© 著作权归作者所有

共有 人打赏支持
彭苏云
粉丝 41
博文 204
码字总数 54255
作品 0
广州
高级程序员
分布式计算框架Hadoop原理

  本文来自于csdn,这篇文章讲解了分布式计算框架的核心内容、架构图详解,运用流程等   hadoop是Apache软件基金会所开发的并行计算框架与分布式文件系统。最核心的模块包括Hadoop Comm...

深度学习
01/06
0
0
分布式计算框架Hadoop原理

  编辑推荐:   本文来自于csdn,这篇文章讲解了分布式计算框架的核心内容、架构图详解,运用流程等   Hadoop是Apache软件基金会所开发的并行计算框架与分布式文件系统。最核心的模块包...

大数据头条
01/05
0
0
Hadoop环境搭建及相关组件的工作流程介绍

1前言 本篇博客主要是记录Hadoop环境配置包括单机伪分布环境搭建,分布式环境搭建和Hadoop相关组件的工作流程介绍,包括HDFS读写流程,YARN的资源调度流程,MapReduce工作流程。 建议先理解各...

u014732537
05/24
0
0
hadoop架构详解一

hadoop架构组成 hadoop有两部分组成:分布式文件系统HDFS,统一的资源管理器YARN hdfs架构 Client:切分文件;访问或通过命令行管理HDFS;与NameNode交互,获取文件位置信息;与DataNode交互...

张欢19933
2016/03/29
224
0
[Hadoop] 完全分布式集群安装过程详解

1. 用Vmware Workstation创建4个虚拟机,每个虚拟机都装上Centos(版本:CentOS-6.3-x86_64),示意图如下: 2. 在所有结点上修改/etc/hosts,使彼此之间都能够用机器名解析IP 192.168.231....

长平狐
2013/06/03
122
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

人生苦短:Python里的17个“超赞操作

人生苦短,我选Python”。那么,你真的掌握了Python吗? 1. 交换变量 有时候,当我们要交换两个变量的值时,一种常规的方法是创建一个临时变量,然后用它来进行交换。比如: # 输入 a = 5 b ...

糖宝lsh
47分钟前
4
0
咕泡-spring中常用设计模式概述

设计模式就是经验之谈,供后人借鉴,解决一些具有代表性的问题 设计模式来源于生活,反过来帮助我们更好生活 设计模式提升代码的可读性、可扩展性、维护成本、复杂业务问题 千万不要死记硬背...

职业搬砖20年
今天
2
0
day59-20180817-流利阅读笔记-待学习

假·照骗,真·社交焦虑 雪梨 2018-08-17 1.今日导读 发朋友圈之前,不少人为了展现更美好的生活状态会对照片加以“微调”,或是加个滤镜显得逼格更高,或是磨个皮瘦个脸拉个大长腿。现在,国...

aibinxiao
今天
23
0
OSChina 周五乱弹 —— 姑娘在这个节日里表白你接受么?

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @Sharon啊:完全被这个小姐姐圈粉了,学两首她的歌去哈哈 分享王贰浪的单曲《往后余生(翻自 马良)》 《往后余生(翻自 马良)》- 王贰浪 手...

小小编辑
今天
1K
16
为什么HashMap要自己实现writeObject和readObject方法?

为什么HashMap要自己实现writeObject和readObject方法? 如果你有仔细阅读过HashMap的源码,那么你一定注意过一个问题:HashMap中有两个私有方法。 private void writeObject(java.io.Objec...

DemonsI
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部