文档章节

HDFS的reportWrittenBlock函数解析

强子大叔的码田
 强子大叔的码田
发布于 2014/12/19 18:08
字数 832
阅读 202
收藏 5

码上生花,ECharts 作品展示赛正式启动!>>>

可先看下网友的解析:http://fire-balrog.iteye.com/blog/812281

以下为我的解析:

当一个Block经由所有的DataNode写完后,就需要告诉namenode可以执行reportWrittenBlock函数了。

下面就来解析下这个函数的意思!

====================================

/**

     * The client can report in a set written blocks that it wrote.

     * These blocks are reported via the client instead of the datanode

     * to prevent weird heartbeat race conditions.

     */

    public void reportWrittenBlock(LocatedBlock lb) throws IOException {

        Block b = lb.getBlock();//获取完成的这个Block信息

        DatanodeInfo targets[] = lb.getLocations();//获取节点信息

        for (int i = 0; i < targets.length; i++) {

            namesystem.blockReceived(b, targets[i].getName());//对于每个DataNode来说,都要调用一次此函数

        }

}

C1:2014-12-19 18:26:00    C2:2014-12-19 18:59:00            C3:2014-12-19 19:03:00

=========================

那么,接下来就是理解 namesystem.blockReceived(b, targets[i].getName());了。

 

       /**

     * The given node is reporting that it received a certain block.

     */

    public synchronized void blockReceived(Block block, UTF8 name) {

        DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);//获取对应的datanode

        if (node == null) {//为空可不行

            throw new IllegalArgumentException("Unexpected exception.  Got blockReceived message from node " + name + ", but there is no info for " + name);

        }

        //

        // Modify the blocks->datanode map

        // 

        addStoredBlock(block, node);//下面两行是来执行blocknode的一个映射。

        //

        // Supplement node's blockreport

        //

        node.addBlock(block);//同上

    }

C1:2014-12-19 19:11:00       C2:2014-12-19 19:11:00      C3:2014-12-19 19:12:00

===============那么接下来还有2个函数需要攻破,分别是addStoredBlock和node.addBlock(block);

后面一个函数非常简单,不细讲,所以就剩下最后一个函数了!

addStoredBlock(block, node);的执行过程如下:

 

synchronized void addStoredBlock(Block block, DatanodeInfo node) {

        TreeSet containingNodes = (TreeSetblocksMap.get(block);//获取当前block已经存在的datanode信息

        if (containingNodes == null) {//这里保证肯定存在datanode集合,不保证一定有节点在内

            containingNodes = new TreeSet();

            blocksMap.put(block, containingNodes);

        }

        if (! containingNodes.contains(node)) {//根据需要决定是否加入此datanode信息

            containingNodes.add(node);

        } else {

            LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node);

        }

//接下来的逻辑是确定是否需要重新备份

        synchronized (neededReplications) {//锁定neededReplications

            if (dir.isValidBlock(block)) {//不懂这一句

                if (containingNodes.size() >= this.desiredReplication) {//如果已经超过最大备份个数

                    neededReplications.remove(block);//删除此block

                    pendingReplications.remove(block);//删除此block

                } else if (containingNodes.size() < this.desiredReplication) {

                    if (! neededReplications.contains(block)) {

                        neededReplications.add(block);//否则表示需要重新备份,这代码写的真够差的。。。

                    }

                }

                //

                // Find how many of the containing nodes are "extra", if any.

                // If there are any extras, call chooseExcessReplicates() to

                // mark them in the excessReplicateMap.

                //

//也有可能一个block存储的datanode节点数太多了,同样要删除这些block

                Vector nonExcess = new Vector();//构造一个空的Vector

                for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {

                    DatanodeInfo cur = (DatanodeInfo) it.next();//对于当前节点来说

                    TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());//取到当前节点的多余块信息

                    if (excessBlocks == null || ! excessBlocks.contains(block)) {//如果之前没有标志在这个节点的多余块信息里

                        nonExcess.add(cur);//则表明当前节点存储了这个block

                    }

                }

                if (nonExcess.size() > this.maxReplication) {//如果超过了最大备份数

                    chooseExcessReplicates(nonExcess, block, this.maxReplication);//选择若干来消除块    

                }

            }

        }

}

void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) {

        while (nonExcess.size() - maxReps > 0) {//如果还有需要

            int chosenNode = r.nextInt(nonExcess.size());//随机选择一个节点

            DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);

            nonExcess.removeElementAt(chosenNode);//获取这个节点

            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());

            if (excessBlocks == null) {

                excessBlocks = new TreeSet();

                excessReplicateMap.put(cur.getName(), excessBlocks);

            }

            excessBlocks.add(b);//加入此blockexcessReplicateMap

            //

            // The 'excessblocks' tracks blocks until we get confirmation

            // that the datanode has deleted them; the only way we remove them

            // is when we get a "removeBlock" message.  

            //

            // The 'invalidate' list is used to inform the datanode the block 

            // should be deleted.  Items are removed from the invalidate list

            // upon giving instructions to the namenode.

            //

            Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName());

            if (invalidateSet == null) {

                invalidateSet = new Vector();

                recentInvalidateSets.put(cur.getName(), invalidateSet);

            }

            invalidateSet.add(b);//同样的,更新recentInvalidateSets没啥好解释的

        }

    }

 

© 著作权归作者所有

强子大叔的码田

强子大叔的码田

粉丝 924
博文 1630
码字总数 1283214
作品 9
南京
架构师
私信 提问
加载中
请先登录后再评论。
循序渐进,了解Hive是什么!

一直想抽个时间整理下最近的所学,断断续续接触hive也有半个多月了,大体上了解了很多Hive相关的知识。那么,一般对陌生事物的认知都会经历下面几个阶段: 为什么会出现?解决了什么问题? ...

青夜之衫
2017/12/05
0
0
用mapreduce 处理气象数据集

编写程序求每日最高最低气温,区间最高最低气温 气象数据集下载地址为:ftp://ftp.ncdc.noaa.gov/pub/data/noaa 2.按学号后三位下载不同年份月份的数据(例如201506110136号同学,就下载201...

osc_yfquc6et
2018/05/09
4
0
C++通过http协议操作hdfs

一、http协议存在hdfs组件 通过http协议操作hdfs有两个组件,httpfs和webhdfs,我一开始还以为这两个是同一个东西,其实不是。webhdfs是namenode、datanode自带的,httpfs是完全独立的一个组...

byxdaz
2019/11/23
0
0
用Python编写WordCount程序任务

用Python编写WordCount程序并提交任务 程序 WordCount 输入 一个包含大量单词的文本文件 输出 文件中每个单词及其出现次数(频数),并按照单词字母顺序排序,每个单词和其频数占一行,单词和...

osc_dfi5j6xi
2018/05/11
2
0
MapReduce和Yarn的运行机制详解

今天先简要介绍一下Hadoop中另外两个关键的组成部分:MapReduce和Yarn的运行机制。 首先普及一下概念: HDFS:负责海量数据的存储 MapReduce:负责海量数据的分析和计算 Yarn:负责资源管理调...

LeeWanzhi的博客
2017/12/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OpenvSwitch系列之四 ovs-ofctl命令使用

Open vSwitch系列之一 Open vSwitch诞生 Open vSwitch系列之二 安装指定版本ovs Open vSwitch系列之三 ovs-vsctl命令使用 Open vSwitch系列之四 ovs-ofctl命令使用 Open vSwitch系列之五 网桥...

osc_y9y4gqxx
10分钟前
6
0
27岁了,程序员写给自己的一封信

前言 相信很多人小时候的梦想都是当科学家,现在想想当初太傻,发现这些根本不可选。就跟考清华还是北大一样,当初纠结的要死,结果发现自己只是普通人,过着普通的生活,上着一眼望到头的班...

osc_teeurf8z
11分钟前
8
0
mysql 存储过程 查询结果集循环处理游标使用

注意每个版本的mysq的存储过程,触发器写法都会有些许区别,注意查看官方版本,不然你网上copy的语句可能执行无效,或者不成功 官方英文说明文档 https://dev.mysql.com/doc/refman/5.7/en/...

Love彼岸花开
12分钟前
3
0
Dump微信PC端的界面Duilib文件

零、有啥用 网上大多数的微信逆向思路,是CE搜索数据得到地址,OD下访问断点,然后在堆栈里面大海捞针的找Call,效率太低了。 其实微信的界面是用Duilib做的,Duilib的界面布局写在XML文件。...

osc_paqz1zc7
12分钟前
17
0
VNC电脑客户端,VNC电脑客户端下载!

IIS7服务器管理工具能够作为VNC的客户端,进行VNC的命令操作,可在客户端,下载,安装VNC软件! 同时,它也可以作为FTP的客户端,进行FTP的命令操作!它能够批量连接Windows和Linux系统下的服...

Raymond13
12分钟前
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部