文档章节

HDFS中的DataNode菊花链式接收数据

强子哥哥
 强子哥哥
发布于 2014/12/24 16:38
字数 973
阅读 47
收藏 0
点赞 0
评论 0

public void run() {//数据节点运行

            try {

                DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));

                try {//获取输入流

                    byte op = (byte) in.read();//读取操作类型

                    if (op == OP_WRITE_BLOCK) {//如果是写一个块

                        /

                        // Read in the header

                        //

                        DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

                        try {//获取输出流

                            boolean shouldReportBlock = in.readBoolean();//读取一个boolean

                            Block b = new Block();

                            b.readFields(in);//创建一个新的block

                            int numTargets = in.readInt();//读取目标机器的个数

                            if (numTargets <= 0) {

                                throw new IOException("Mislabelled incoming datastream.");

                            }

                            DatanodeInfo targets[] = new DatanodeInfo[numTargets];//用来存储目标datanode节点信息

                            for (int i = 0; i < targets.length; i++) {//开始初始化it.

                                DatanodeInfo tmp = new DatanodeInfo();

                                tmp.readFields(in);

                                targets[i] = tmp;

                            }

                            byte encodingType = (byte) in.read();//获取encodingType

                            long len = in.readLong();//获取这个块的长度

                            //

                            // Make sure curTarget is equal to this machine

                            //

                            DatanodeInfo curTarget = targets[0];//获取第一个data节点

                            //

                            // Track all the places we've successfully written the block

                            //

                            Vector mirrors = new Vector();//???

                            //

                            // Open local disk out

                            //

                            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));

//本地文件输出流

                            InetSocketAddress mirrorTarget = null;

                            try {

                                //

                                // Open network conn to backup machine, if 

                                // appropriate

                                //

                                DataInputStream in2 = null;

                                DataOutputStream out2 = null;

                                if (targets.length > 1) {

                                    // Connect to backup machine

                                    mirrorTarget = createSocketAddr(targets[1].getName().toString());//连接镜像节点

                                    try {

                                        Socket s2 = new Socket();

                                        s2.connect(mirrorTarget, READ_TIMEOUT);//连接镜像节点

                                        s2.setSoTimeout(READ_TIMEOUT);//设置超时时间

                                        out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));

                                        in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));

                                        //获取与备份节点的2IO

                                        // Write connection header

                                        out2.write(OP_WRITE_BLOCK);//写操作类型

                                        out2.writeBoolean(shouldReportBlock);//boolean

                                        b.write(out2);//block

                                        out2.writeInt(targets.length - 1);//写剩下的几个datanode信息

                                        for (int i = 1; i < targets.length; i++) {

                                            targets[i].write(out2);

                                        }

                                        out2.write(encodingType);

                                        out2.writeLong(len);//数据长度

                                    } catch (IOException ie) {

                                        if (out2 != null) {

                                            try {

                                                out2.close();

                                                in2.close();

                                            } catch (IOException out2close) {

                                            } finally {

                                                out2 = null;

                                                in2 = null;

                                            }

                                        }

                                    }

                                }

                                //

                                // Process incoming data, copy to disk and

                                // maybe to network.

                                //万事俱备只欠数据

                                try {

                                    boolean anotherChunk = len != 0;//true

                                    byte buf[] = new byte[BUFFER_SIZE];//设置缓冲区

                                    while (anotherChunk) {

                                        while (len > 0) {//读取一些数据

                                            int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));

                                            if (bytesRead < 0) {

                                              throw new EOFException("EOF reading from "+s.toString());

                                            }

                                            if (bytesRead > 0) {

                                                try {//写入到本地磁盘文件

                                                    out.write(buf, 0, bytesRead);

                                                } catch (IOException iex) {

                                                    shutdown();

                                                    throw iex;

                                                }

                                                if (out2 != null) {

                                                    try {//写入到网络以便让备份节点获取

                                                        out2.write(buf, 0, bytesRead);

                                                    } catch (IOException out2e) {

                                                        //

                                                        // If stream-copy fails, continue 

                                                        // writing to disk.  We shouldn't 

                                                        // interrupt client write.

                                                        //

                                                        try {

                                                            out2.close();

                                                            in2.close();

                                                        } catch (IOException out2close) {

                                                        } finally {

                                                            out2 = null;

                                                            in2 = null;

                                                        }

                                                    }

                                                }

                                                len -= bytesRead;//调整要写的字节数

                                            }

                                        }

                                        if (encodingType == RUNLENGTH_ENCODING) {

                                            anotherChunk = false;

                                        } else if (encodingType == CHUNKED_ENCODING) {

                                            len = in.readLong();//准备读取下一块block

                                            if (out2 != null) {

                                                out2.writeLong(len);

                                            }

                                            if (len == 0) {

                                                anotherChunk = false;

                                            }

                                        }

                                    }

                                    if (out2 == null) {

                                        LOG.info("Received block " + b + " from " + s.getInetAddress());

                                    } else {

                                        out2.flush();//输出缓冲区的内容

                                        long complete = in2.readLong();//得到备份节点的响应

                                        if (complete != WRITE_COMPLETE) {

                                            LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);

                                        }

                                        LocatedBlock newLB = new LocatedBlock();

                                        newLB.readFields(in2);//得到block以及成功写到的节点信息

                                        DatanodeInfo mirrorsSoFar[] = newLB.getLocations();

                                        for (int k = 0; k < mirrorsSoFar.length; k++) {

                                            mirrors.add(mirrorsSoFar[k]);//更新到本地的mirrors

                                        }

                                        LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);

                                    }

                                } finally {

                                    if (out2 != null) {//清理线程

                                        out2.close();

                                        in2.close();

                                    }

                                }

                            } finally {//清理现场

                                try {

                                    out.close();

                                } catch (IOException iex) {

                                    shutdown();

                                    throw iex;

                                }

                            }

                            data.finalizeBlock(b);//写入到dirTree结构中

                            // 

                            // Tell the namenode that we've received this block 

                            // in full, if we've been asked to.  This is done

                            // during NameNode-directed block transfers, but not

                            // client writes.

                            //

                            if (shouldReportBlock) {

                                synchronized (receivedBlockList) {

                                    receivedBlockList.add(b);//更新本地的receivedBlockList

                                    receivedBlockList.notifyAll();

                                }

                            }

                            //

                            // Tell client job is done, and reply with

                            // the new LocatedBlock.

                            //

                            reply.writeLong(WRITE_COMPLETE);//写结束

                            mirrors.add(curTarget);//把自己加上来

                            LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));

                            newLB.write(reply);//返回给客户端

                        } finally {

                            reply.close();

                        }

                        //剩下的为读数据,忽略。

                    } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {

                        //

                        // Read in the header

                        //

                        Block b = new Block();

                        b.readFields(in);

                        long toSkip = 0;

                        if (op == OP_READSKIP_BLOCK) {

                            toSkip = in.readLong();

                        }

                        //

                        // Open reply stream

                        //

                        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

                        try {

                            //

                            // Write filelen of -1 if error

                            //

                            if (! data.isValidBlock(b)) {

                                out.writeLong(-1);

                            } else {

                                //

                                // Get blockdata from disk

                                //

                                long len = data.getLength(b);

                                DataInputStream in2 = new DataInputStream(data.getBlockData(b));

                                out.writeLong(len);

                                if (op == OP_READSKIP_BLOCK) {

                                    if (toSkip > len) {

                                        toSkip = len;

                                    }

                                    long amtSkipped = 0;

                                    try {

                                        amtSkipped = in2.skip(toSkip);

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                    out.writeLong(amtSkipped);

                                }

                                byte buf[] = new byte[BUFFER_SIZE];

                                try {

                                    int bytesRead = 0;

                                    try {

                                        bytesRead = in2.read(buf);

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                    while (bytesRead >= 0) {

                                        out.write(buf, 0, bytesRead);

                                        len -= bytesRead;

                                        try {

                                            bytesRead = in2.read(buf);

                                        } catch (IOException iex) {

                                            shutdown();

                                            throw iex;

                                        }

                                    }

                                } catch (SocketException se) {

                                    // This might be because the reader

                                    // closed the stream early

                                } finally {

                                    try {

                                        in2.close();

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                }

                            }

                            LOG.info("Served block " + b + " to " + s.getInetAddress());

                        } finally {

                            out.close();

                        }

                    } else {

                        while (op >= 0) {

                            System.out.println("Faulty op: " + op);

                            op = (byte) in.read();

                        }

                        throw new IOException("Unknown opcode for incoming data stream");

                    }

                } finally {

                    in.close();

                }

            } catch (IOException ie) {

              LOG.log(Level.WARNING"DataXCeiver", ie);

            } finally {

                try {

                    s.close();

                } catch (IOException ie2) {

                }

            }

        }

    }

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 856
博文 551
码字总数 647493
作品 8
南京
架构师
DADOOP集群HDFS工作机制

hdfs的工作机制 概述 1. HDFS集群分为两大角色:NameNode、DataNode (Secondary Namenode) 2. NameNode负责管理整个文件系统的元数据 3. DataNode 负责管理用户的文件数据块 4. 文件会按照固...

weixin_40747272 ⋅ 04/15 ⋅ 0

HDFS的架构和设计要点

虽然本文已经比较旧远了,但是在很多方面还是有一定学习的价值,中文版译者为killme。 一、前提和设计目标 硬件错误是常态,而非异常情况,HDFS可能是有成百上千的server组成,任何一个组件都...

xrzs ⋅ 2012/10/11 ⋅ 0

hadoop(2.5,2.6) HDFS偶发性心跳异常以及大量DataXceiver线程被Blocked故障处理分享

一、概要 公司近期Storm清洗程序那边反应HDFS会出现偶发性的异常导致数据写不进HDFS,另外一些Spark作业在大规模往HDFS灌数据时客户端会出现各种“all datanode bad..”以及服务端出现各种t...

zengzhaozheng ⋅ 2015/08/13 ⋅ 0

HDFS 原理、架构与特性介绍

本文主要讲述 HDFS原理-架构、副本机制、HDFS负载均衡、机架感知、健壮性、文件删除恢复机制 1:当前HDFS架构详尽分析 HDFS架构 •NameNode •DataNode •Sencondary NameNode 数据存储细节 ...

xrzs ⋅ 2013/08/11 ⋅ 1

HDFS系统架构

1:当前HDFS架构详尽分析 HDFS架构 •NameNode •DataNode •Sencondary NameNode 数据存储细节 NameNode 目录结构 Namenode 的目录结构: ${ dfs.name.dir}/current /VERSION /edits /fsim...

o0无忧亦无怖 ⋅ 2015/10/08 ⋅ 0

深刻理解HDFS工作机制

 深入理解一个技术的工作机制是灵活运用和快速解决问题的根本方法,也是唯一途径。对于HDFS来说除了要明白它的应用场景和用法以及通用分布式架构之外更重要的是理解关键步骤的原理和实现细节...

张涛泽 ⋅ 2017/04/19 ⋅ 0

HDFS入门笔记------架构以及应用介绍

引言—HDFS的重要性: Hadoop的定义:适合大数据的分布式存储与计算的一个平台,其中大数据的分布式存储就是由HDFS来完成的,因此掌握好HDFS的相关概念与应用非常重要! 本篇博客将从以下几个...

a2011480169 ⋅ 2016/06/14 ⋅ 0

HDFS的工作流程分析

作者: Bryanzhou HDFS的工作机制概述 HDFS集群分为两大角色:NameNode、DataNode NameNode负责管理整个文件系统的元数据 DataNode 负责管理用户的文件数据块 文件会按照固定的大小(blocksi...

xiaogong1688 ⋅ 2017/08/02 ⋅ 0

hadoop学习一:HDFS

HDFS和KFS都是GFS的开源实现,而HDFS是Hadoop的子项目,用Java实现,为Hadoop上层应用提供高吞吐量的可扩展的大文件存储服务。KFS是web级一个用如存储log data、Map/Reduce数据的分布式文件系...

SibylY ⋅ 2013/09/13 ⋅ 1

【Hadoop】数据存储----HDFS

一次写入,多次查询,写人关闭时,不支持修改,同一时间只有写或读一个操作,不支持并发写入情况,适合大数据 使用前提: 1.hdfs设计思路和实现目标 : 1.硬件错误检测和快速恢复;2.注重批量...

mengdonghui123456 ⋅ 2016/11/13 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Mahout推荐算法API详解

前言 用Mahout来构建推荐系统,是一件既简单又困难的事情。简单是因为Mahout完整地封装了“协同过滤”算法,并实现了并行化,提供非常简单的API接口;困难是因为我们不了解算法细节,很难去根...

xiaomin0322 ⋅ 18分钟前 ⋅ 0

WampServer默认web服务器根目录位置

安装WampServer之后的web服务器根目录默认位置在WampServer安装目录下的www:

临江仙卜算子 ⋅ 19分钟前 ⋅ 0

Redux的一些手法记录

Redux Redux的基本概念见另一篇文。 这里记录一下Redux在项目中的实际操作的手法。 actions 首先定义action.js,actions的type,可以另起一个action-type.js文件。 action-type.js用来存...

LinearLaw ⋅ 20分钟前 ⋅ 0

android 手势检测(左右滑动、上下滑动)

GestureDetector类可以让我们快速的处理手势事件,如点击,滑动等。 使用GestureDetector分三步: 1. 定义GestureDetector类 2. 初始化手势类,同时设置手势监听 3. 将touch事件交给gesture...

王先森oO ⋅ 34分钟前 ⋅ 0

java 方法的执行时间监控 设置超时(Future 接口)

java 方法的执行时间监控 设置超时(Future 接口) import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor......

青峰Jun19er ⋅ 39分钟前 ⋅ 0

一名开源小白的Apache成长自述

今天收到了来自Apache Vote我成为Serviceomb项目Committer的邮件,代表自己的贡献得到了充分的肯定;除了感谢团队的给力支持,我更希望将自己的成长经历——如何践行Apache Way的心得介绍给大...

微服务框架 ⋅ 41分钟前 ⋅ 0

vim介绍、颜色显示和移动光标、一般模式下复制、剪切和粘贴

1.vim 是 vi 的升级版 vim 是带有颜色显示的 mini安装的系统,一般都不带有vim [root@aminglinux-128 ~]# yum install -y vim-enhanced已加载插件:fastestmirror, langpacksLoading mir...

oschina130111 ⋅ 41分钟前 ⋅ 0

Deepin 操作系统四面楚歌

作为国内做的最好的 Linux 发行版,源自 Debian sid 的 Deepin 目前正面临重重困境,新版本不断延期,开发人员离职,bug 长期得不到修复,和 Debian/Ubuntu 的兼容性问题也面临越来越严重的挑...

六库科技 ⋅ 42分钟前 ⋅ 0

MyBatis之动态sql

我们需要知道的是,使用mybatis重点是对sql的灵活解析和处理。在原先的UserMappser.xml中,我们这样查询表中满足条件的记录 : 123 <select id="findUserList" parameterType="userQuery...

瑟青豆 ⋅ 42分钟前 ⋅ 0

这届俄罗斯世界杯的冷门那么多怎么办?

最纯粹的世界杯,最神奇的大冷门。 德国0比1被墨西哥摩擦了。 日本历史性的赢了哥伦比亚。 C罗也挑平了西班牙。 梅西被冰岛狮吼吼愣神了。 就连11次进世界杯4强的巴西也被瑞士逼平了。 天台已...

开源中国众包平台 ⋅ 43分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部