文档章节

HDFS的reportWrittenBlock函数解析

强子哥哥
 强子哥哥
发布于 2014/12/19 18:08
字数 832
阅读 56
收藏 5
点赞 0
评论 0

可先看下网友的解析:http://fire-balrog.iteye.com/blog/812281

以下为我的解析:

当一个Block经由所有的DataNode写完后,就需要告诉namenode可以执行reportWrittenBlock函数了。

下面就来解析下这个函数的意思!

====================================

/**

     * The client can report in a set written blocks that it wrote.

     * These blocks are reported via the client instead of the datanode

     * to prevent weird heartbeat race conditions.

     */

    public void reportWrittenBlock(LocatedBlock lb) throws IOException {

        Block b = lb.getBlock();//获取完成的这个Block信息

        DatanodeInfo targets[] = lb.getLocations();//获取节点信息

        for (int i = 0; i < targets.length; i++) {

            namesystem.blockReceived(b, targets[i].getName());//对于每个DataNode来说,都要调用一次此函数

        }

}

C1:2014-12-19 18:26:00    C2:2014-12-19 18:59:00            C3:2014-12-19 19:03:00

=========================

那么,接下来就是理解 namesystem.blockReceived(b, targets[i].getName());了。

 

       /**

     * The given node is reporting that it received a certain block.

     */

    public synchronized void blockReceived(Block block, UTF8 name) {

        DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);//获取对应的datanode

        if (node == null) {//为空可不行

            throw new IllegalArgumentException("Unexpected exception.  Got blockReceived message from node " + name + ", but there is no info for " + name);

        }

        //

        // Modify the blocks->datanode map

        // 

        addStoredBlock(block, node);//下面两行是来执行blocknode的一个映射。

        //

        // Supplement node's blockreport

        //

        node.addBlock(block);//同上

    }

C1:2014-12-19 19:11:00       C2:2014-12-19 19:11:00      C3:2014-12-19 19:12:00

===============那么接下来还有2个函数需要攻破,分别是addStoredBlock和node.addBlock(block);

后面一个函数非常简单,不细讲,所以就剩下最后一个函数了!

addStoredBlock(block, node);的执行过程如下:

 

synchronized void addStoredBlock(Block block, DatanodeInfo node) {

        TreeSet containingNodes = (TreeSetblocksMap.get(block);//获取当前block已经存在的datanode信息

        if (containingNodes == null) {//这里保证肯定存在datanode集合,不保证一定有节点在内

            containingNodes = new TreeSet();

            blocksMap.put(block, containingNodes);

        }

        if (! containingNodes.contains(node)) {//根据需要决定是否加入此datanode信息

            containingNodes.add(node);

        } else {

            LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node);

        }

//接下来的逻辑是确定是否需要重新备份

        synchronized (neededReplications) {//锁定neededReplications

            if (dir.isValidBlock(block)) {//不懂这一句

                if (containingNodes.size() >= this.desiredReplication) {//如果已经超过最大备份个数

                    neededReplications.remove(block);//删除此block

                    pendingReplications.remove(block);//删除此block

                } else if (containingNodes.size() < this.desiredReplication) {

                    if (! neededReplications.contains(block)) {

                        neededReplications.add(block);//否则表示需要重新备份,这代码写的真够差的。。。

                    }

                }

                //

                // Find how many of the containing nodes are "extra", if any.

                // If there are any extras, call chooseExcessReplicates() to

                // mark them in the excessReplicateMap.

                //

//也有可能一个block存储的datanode节点数太多了,同样要删除这些block

                Vector nonExcess = new Vector();//构造一个空的Vector

                for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {

                    DatanodeInfo cur = (DatanodeInfo) it.next();//对于当前节点来说

                    TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());//取到当前节点的多余块信息

                    if (excessBlocks == null || ! excessBlocks.contains(block)) {//如果之前没有标志在这个节点的多余块信息里

                        nonExcess.add(cur);//则表明当前节点存储了这个block

                    }

                }

                if (nonExcess.size() > this.maxReplication) {//如果超过了最大备份数

                    chooseExcessReplicates(nonExcess, block, this.maxReplication);//选择若干来消除块    

                }

            }

        }

}

void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) {

        while (nonExcess.size() - maxReps > 0) {//如果还有需要

            int chosenNode = r.nextInt(nonExcess.size());//随机选择一个节点

            DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);

            nonExcess.removeElementAt(chosenNode);//获取这个节点

            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());

            if (excessBlocks == null) {

                excessBlocks = new TreeSet();

                excessReplicateMap.put(cur.getName(), excessBlocks);

            }

            excessBlocks.add(b);//加入此blockexcessReplicateMap

            //

            // The 'excessblocks' tracks blocks until we get confirmation

            // that the datanode has deleted them; the only way we remove them

            // is when we get a "removeBlock" message.  

            //

            // The 'invalidate' list is used to inform the datanode the block 

            // should be deleted.  Items are removed from the invalidate list

            // upon giving instructions to the namenode.

            //

            Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName());

            if (invalidateSet == null) {

                invalidateSet = new Vector();

                recentInvalidateSets.put(cur.getName(), invalidateSet);

            }

            invalidateSet.add(b);//同样的,更新recentInvalidateSets没啥好解释的

        }

    }

 

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 856
博文 551
码字总数 647493
作品 8
南京
架构师
循序渐进,了解Hive是什么!

一直想抽个时间整理下最近的所学,断断续续接触hive也有半个多月了,大体上了解了很多Hive相关的知识。那么,一般对陌生事物的认知都会经历下面几个阶段: 为什么会出现?解决了什么问题? ...

青夜之衫 ⋅ 2017/12/05 ⋅ 0

HDFS如何实现本地文件上传?

上传时,涉及到很多IO类,但是最关键的类就是:DFSOutputStream这个类。【0.1.0】 下面就开始分析这个类的代码。 -------------------------------------------类结构分析 ----------------...

强子哥哥 ⋅ 2014/12/15 ⋅ 0

Impala安装json解析udf插件

背景 Impala跟Hive一样,是常用的数据仓库组件之一。熟悉Hive的同学肯定知道,Hive官方提供了getjsonobject函数用于处理json字符串,但是Impala官方并没有提供类似的方法,好在是有第三方实现...

wooyoo ⋅ 2017/04/18 ⋅ 0

MapReduce和Yarn的运行机制详解

今天先简要介绍一下Hadoop中另外两个关键的组成部分:MapReduce和Yarn的运行机制。 首先普及一下概念: HDFS:负责海量数据的存储 MapReduce:负责海量数据的分析和计算 Yarn:负责资源管理调...

LeeWanzhi的博客 ⋅ 2017/12/20 ⋅ 0

学习笔记TF065: TensorFlowOnSpark

Hadoop生态大数据系统分为Yam、 HDFS、MapReduce计算框架。TensorFlow分布式相当于MapReduce计算框架,Kubernetes相当于Yam调度系统。TensorFlowOnSpark,利用远程直接内存访问(Remote Direc...

利炳根 ⋅ 2017/11/13 ⋅ 0

Hadoop作业引用第三方jar文件

在eclipse中写mapreduce程序, 引用第三方jar文件, 可以利用eclipse Hadoop插件直接run on hadoop提交, 很方便. 不过插件版本要和eclipse匹配, 不然总是local执行, 在50070是没有job产生的. 如...

Zero零_度 ⋅ 2015/10/29 ⋅ 0

Hadoop 版本 生态圈 MapReduce模型

一 Hadoop版本 和 生态圈 1. Hadoop版本 (1) Apache Hadoop版本介绍 Apache的开源项目开发流程 : -- 主干分支 : 新功能都是在 主干分支(trunk)上开发; -- 特性独有分支 : 很多新特性稳定性很...

日拱一卒 ⋅ 2014/05/17 ⋅ 0

一脸懵逼学习MapReduce的原理和编程(Map局部处理,Reduce汇总)和MapReduce几种运行方式

1:MapReduce的概述:   (1):MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.   (2):MapReduce由两个阶段组成:Map和Reduce,用户只需要...

别叫小伙 ⋅ 2017/09/18 ⋅ 0

hadoop的概念

hadoop的概念 网上会经常遇到各种hadoop的概念,Hive,HBase,Hdfs都各是什么呢? 首先从hdfs说起,hdfs是分布式文件系统,它把集群当作单机一样做文件操作,文件可能存在于多个机器上,具体...

王二狗子11 ⋅ 01/07 ⋅ 0

初涉MapReduce程序

一 MapRecuce_WordCount程序测试 上次的hadoop中的hdfs文件系统中,我们把hadoop根目录下面的conf下的所有.xml文件上传到了hdfs文件系统中,下面我们就通过mapreduce程序来对单词进行一下统计 ...

black_lxf_720 ⋅ 2016/03/08 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

内存障碍: 软件黑客的硬件视图

此文为笔者近日有幸看到的一则关于计算机底层内存障碍的学术论文,并翻译(机译)而来[自认为翻译的还行],若读者想要英文原版的论文话,给我留言,我发给你。 内存障碍: 软件黑客的硬件视图...

Romane ⋅ 31分钟前 ⋅ 0

SpringCloud 微服务 (七) 服务通信 Feign

壹 继续第(六)篇RestTemplate篇 做到现在,本机上已经有注册中心: eureka, 服务:client、order、product 继续在order中实现通信向product服务,使用Feign方式 下面记录学习和遇到的问题 贰 or...

___大侠 ⋅ 48分钟前 ⋅ 0

001. 深入JVM学习—Java运行流程

1. Java运行流程图 2. Java运行时数据区 3. Java虚拟机栈 栈内存是线程私有的,其生命周期和线程相同; 虚拟机栈描述的是Java方法执行的内存模型:执行一个方法时会产生一个栈帧随后将其保存...

影狼 ⋅ 今天 ⋅ 0

gitee、github上issue标签方案

目录 [TOC] issue生命周期 st=>start: 开始e=>end: 结束op0=>operation: 新建issueop1=>operation: 评审issueop2=>operation: 任务负责人执行任务cond1=>condition: 是否通过?op3=>o......

lovewinner ⋅ 今天 ⋅ 0

浅谈mysql的索引设计原则以及常见索引的区别

索引定义:是一个单独的,存储在磁盘上的数据库结构,其包含着对数据表里所有记录的引用指针. 数据库索引的设计原则: 为了使索引的使用效率更高,在创建索引时,必须考虑在哪些字段上创建索...

屌丝男神 ⋅ 今天 ⋅ 0

String,StringBuilder,StringBuffer三者的区别

这三个类之间的区别主要是在两个方面,即运行速度和线程安全这两方面。 首先说运行速度,或者说是, 1.执行速度 在这方面运行速度快慢为:StringBuilder(线程不安全,可变) > StringBuffer...

时刻在奔跑 ⋅ 今天 ⋅ 0

java以太坊开发 - web3j使用钱包进行转账

首先载入钱包,然后利用账户凭证操作受控交易Transfer进行转账: Web3j web3 = Web3j.build(new HttpService()); // defaults to http://localhost:8545/Credentials credentials = Wallet......

以太坊教程 ⋅ 今天 ⋅ 0

Oracle全文检索配置与实践

Oracle全文检索配置与实践

微小宝 ⋅ 今天 ⋅ 0

mysql的分区和分表

1,什么是mysql分表,分区 什么是分表,从表面意思上看呢,就是把一张表分成N多个小表,具体请看mysql分表的3种方法 什么是分区,分区呢就是把一张表的数据分成N多个区块,这些区块可以在同一...

梦梦阁 ⋅ 今天 ⋅ 0

exception.ZuulException: Forwarding error

错误日志 com.netflix.zuul.exception.ZuulException: Forwarding error Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: xxx timed-out and no fallback available. Ca......

jack_peng ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部