文档章节

HDFS中的DataNode菊花链式接收数据

强子1985
 强子1985
发布于 2014/12/24 16:38
字数 973
阅读 47
收藏 0

public void run() {//数据节点运行

            try {

                DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));

                try {//获取输入流

                    byte op = (byte) in.read();//读取操作类型

                    if (op == OP_WRITE_BLOCK) {//如果是写一个块

                        /

                        // Read in the header

                        //

                        DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

                        try {//获取输出流

                            boolean shouldReportBlock = in.readBoolean();//读取一个boolean

                            Block b = new Block();

                            b.readFields(in);//创建一个新的block

                            int numTargets = in.readInt();//读取目标机器的个数

                            if (numTargets <= 0) {

                                throw new IOException("Mislabelled incoming datastream.");

                            }

                            DatanodeInfo targets[] = new DatanodeInfo[numTargets];//用来存储目标datanode节点信息

                            for (int i = 0; i < targets.length; i++) {//开始初始化it.

                                DatanodeInfo tmp = new DatanodeInfo();

                                tmp.readFields(in);

                                targets[i] = tmp;

                            }

                            byte encodingType = (byte) in.read();//获取encodingType

                            long len = in.readLong();//获取这个块的长度

                            //

                            // Make sure curTarget is equal to this machine

                            //

                            DatanodeInfo curTarget = targets[0];//获取第一个data节点

                            //

                            // Track all the places we've successfully written the block

                            //

                            Vector mirrors = new Vector();//???

                            //

                            // Open local disk out

                            //

                            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));

//本地文件输出流

                            InetSocketAddress mirrorTarget = null;

                            try {

                                //

                                // Open network conn to backup machine, if 

                                // appropriate

                                //

                                DataInputStream in2 = null;

                                DataOutputStream out2 = null;

                                if (targets.length > 1) {

                                    // Connect to backup machine

                                    mirrorTarget = createSocketAddr(targets[1].getName().toString());//连接镜像节点

                                    try {

                                        Socket s2 = new Socket();

                                        s2.connect(mirrorTarget, READ_TIMEOUT);//连接镜像节点

                                        s2.setSoTimeout(READ_TIMEOUT);//设置超时时间

                                        out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));

                                        in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));

                                        //获取与备份节点的2IO

                                        // Write connection header

                                        out2.write(OP_WRITE_BLOCK);//写操作类型

                                        out2.writeBoolean(shouldReportBlock);//boolean

                                        b.write(out2);//block

                                        out2.writeInt(targets.length - 1);//写剩下的几个datanode信息

                                        for (int i = 1; i < targets.length; i++) {

                                            targets[i].write(out2);

                                        }

                                        out2.write(encodingType);

                                        out2.writeLong(len);//数据长度

                                    } catch (IOException ie) {

                                        if (out2 != null) {

                                            try {

                                                out2.close();

                                                in2.close();

                                            } catch (IOException out2close) {

                                            } finally {

                                                out2 = null;

                                                in2 = null;

                                            }

                                        }

                                    }

                                }

                                //

                                // Process incoming data, copy to disk and

                                // maybe to network.

                                //万事俱备只欠数据

                                try {

                                    boolean anotherChunk = len != 0;//true

                                    byte buf[] = new byte[BUFFER_SIZE];//设置缓冲区

                                    while (anotherChunk) {

                                        while (len > 0) {//读取一些数据

                                            int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));

                                            if (bytesRead < 0) {

                                              throw new EOFException("EOF reading from "+s.toString());

                                            }

                                            if (bytesRead > 0) {

                                                try {//写入到本地磁盘文件

                                                    out.write(buf, 0, bytesRead);

                                                } catch (IOException iex) {

                                                    shutdown();

                                                    throw iex;

                                                }

                                                if (out2 != null) {

                                                    try {//写入到网络以便让备份节点获取

                                                        out2.write(buf, 0, bytesRead);

                                                    } catch (IOException out2e) {

                                                        //

                                                        // If stream-copy fails, continue 

                                                        // writing to disk.  We shouldn't 

                                                        // interrupt client write.

                                                        //

                                                        try {

                                                            out2.close();

                                                            in2.close();

                                                        } catch (IOException out2close) {

                                                        } finally {

                                                            out2 = null;

                                                            in2 = null;

                                                        }

                                                    }

                                                }

                                                len -= bytesRead;//调整要写的字节数

                                            }

                                        }

                                        if (encodingType == RUNLENGTH_ENCODING) {

                                            anotherChunk = false;

                                        } else if (encodingType == CHUNKED_ENCODING) {

                                            len = in.readLong();//准备读取下一块block

                                            if (out2 != null) {

                                                out2.writeLong(len);

                                            }

                                            if (len == 0) {

                                                anotherChunk = false;

                                            }

                                        }

                                    }

                                    if (out2 == null) {

                                        LOG.info("Received block " + b + " from " + s.getInetAddress());

                                    } else {

                                        out2.flush();//输出缓冲区的内容

                                        long complete = in2.readLong();//得到备份节点的响应

                                        if (complete != WRITE_COMPLETE) {

                                            LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);

                                        }

                                        LocatedBlock newLB = new LocatedBlock();

                                        newLB.readFields(in2);//得到block以及成功写到的节点信息

                                        DatanodeInfo mirrorsSoFar[] = newLB.getLocations();

                                        for (int k = 0; k < mirrorsSoFar.length; k++) {

                                            mirrors.add(mirrorsSoFar[k]);//更新到本地的mirrors

                                        }

                                        LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);

                                    }

                                } finally {

                                    if (out2 != null) {//清理线程

                                        out2.close();

                                        in2.close();

                                    }

                                }

                            } finally {//清理现场

                                try {

                                    out.close();

                                } catch (IOException iex) {

                                    shutdown();

                                    throw iex;

                                }

                            }

                            data.finalizeBlock(b);//写入到dirTree结构中

                            // 

                            // Tell the namenode that we've received this block 

                            // in full, if we've been asked to.  This is done

                            // during NameNode-directed block transfers, but not

                            // client writes.

                            //

                            if (shouldReportBlock) {

                                synchronized (receivedBlockList) {

                                    receivedBlockList.add(b);//更新本地的receivedBlockList

                                    receivedBlockList.notifyAll();

                                }

                            }

                            //

                            // Tell client job is done, and reply with

                            // the new LocatedBlock.

                            //

                            reply.writeLong(WRITE_COMPLETE);//写结束

                            mirrors.add(curTarget);//把自己加上来

                            LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));

                            newLB.write(reply);//返回给客户端

                        } finally {

                            reply.close();

                        }

                        //剩下的为读数据,忽略。

                    } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {

                        //

                        // Read in the header

                        //

                        Block b = new Block();

                        b.readFields(in);

                        long toSkip = 0;

                        if (op == OP_READSKIP_BLOCK) {

                            toSkip = in.readLong();

                        }

                        //

                        // Open reply stream

                        //

                        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

                        try {

                            //

                            // Write filelen of -1 if error

                            //

                            if (! data.isValidBlock(b)) {

                                out.writeLong(-1);

                            } else {

                                //

                                // Get blockdata from disk

                                //

                                long len = data.getLength(b);

                                DataInputStream in2 = new DataInputStream(data.getBlockData(b));

                                out.writeLong(len);

                                if (op == OP_READSKIP_BLOCK) {

                                    if (toSkip > len) {

                                        toSkip = len;

                                    }

                                    long amtSkipped = 0;

                                    try {

                                        amtSkipped = in2.skip(toSkip);

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                    out.writeLong(amtSkipped);

                                }

                                byte buf[] = new byte[BUFFER_SIZE];

                                try {

                                    int bytesRead = 0;

                                    try {

                                        bytesRead = in2.read(buf);

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                    while (bytesRead >= 0) {

                                        out.write(buf, 0, bytesRead);

                                        len -= bytesRead;

                                        try {

                                            bytesRead = in2.read(buf);

                                        } catch (IOException iex) {

                                            shutdown();

                                            throw iex;

                                        }

                                    }

                                } catch (SocketException se) {

                                    // This might be because the reader

                                    // closed the stream early

                                } finally {

                                    try {

                                        in2.close();

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                }

                            }

                            LOG.info("Served block " + b + " to " + s.getInetAddress());

                        } finally {

                            out.close();

                        }

                    } else {

                        while (op >= 0) {

                            System.out.println("Faulty op: " + op);

                            op = (byte) in.read();

                        }

                        throw new IOException("Unknown opcode for incoming data stream");

                    }

                } finally {

                    in.close();

                }

            } catch (IOException ie) {

              LOG.log(Level.WARNING"DataXCeiver", ie);

            } finally {

                try {

                    s.close();

                } catch (IOException ie2) {

                }

            }

        }

    }

© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 862
博文 935
码字总数 645914
作品 8
南京
架构师
DADOOP集群HDFS工作机制

hdfs的工作机制 概述 1. HDFS集群分为两大角色:NameNode、DataNode (Secondary Namenode) 2. NameNode负责管理整个文件系统的元数据 3. DataNode 负责管理用户的文件数据块 4. 文件会按照固...

weixin_40747272
04/15
0
0
Hadoop分布式文件系统设计要点与架构

Hadoop分布式文件系统设计要点与架构 Hadoop简介:一个分布式系统基础架构,由Apache基金会开发。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力高速运算和存...

李伟铭k
07/09
0
0
HDFS 原理、架构与特性介绍

本文主要讲述 HDFS原理-架构、副本机制、HDFS负载均衡、机架感知、健壮性、文件删除恢复机制 1:当前HDFS架构详尽分析 HDFS架构 •NameNode •DataNode •Sencondary NameNode 数据存储细节 ...

大数据之路
2013/08/11
0
1
深刻理解HDFS工作机制

 深入理解一个技术的工作机制是灵活运用和快速解决问题的根本方法,也是唯一途径。对于HDFS来说除了要明白它的应用场景和用法以及通用分布式架构之外更重要的是理解关键步骤的原理和实现细节...

张涛泽
2017/04/19
0
0
HDFS系统架构

1:当前HDFS架构详尽分析 HDFS架构 •NameNode •DataNode •Sencondary NameNode 数据存储细节 NameNode 目录结构 Namenode 的目录结构: ${ dfs.name.dir}/current /VERSION /edits /fsim...

o0无忧亦无怖
2015/10/08
21
0

没有更多内容

加载失败,请刷新页面

加载更多

20180920 rzsz传输文件、用户和用户组相关配置文件与管理

利用rz、sz实现Linux与Windows互传文件 [root@centos01 ~]# yum install -y lrzsz # 安装工具sz test.txt # 弹出对话框,传递到选择的路径下rz # 回车后,会从对话框中选择对应的文件传递...

野雪球
今天
1
0
OSChina 周四乱弹 —— 毒蛇当辣条

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @ 达尔文:分享花澤香菜/前野智昭/小野大輔/井上喜久子的单曲《ミッション! 健?康?第?イチ》 《ミッション! 健?康?第?イチ》- 花澤香菜/前野智...

小小编辑
今天
7
3
java -jar运行内存设置

java -Xms64m #JVM启动时的初始堆大小 -Xmx128m #最大堆大小 -Xmn64m #年轻代的大小,其余的空间是老年代 -XX:MaxMetaspaceSize=128m # -XX:CompressedClassSpaceSize=6...

李玉长
今天
3
0
Spring | 手把手教你SSM最优雅的整合方式

HEY 本节主要内容为:基于Spring从0到1搭建一个web工程,适合初学者,Java初级开发者。欢迎与我交流。 MODULE 新建一个Maven工程。 不论你是什么工具,选这个就可以了,然后next,直至finis...

冯文议
今天
2
0
RxJS的另外四种实现方式(四)——性能最高的库(续)

接上一篇RxJS的另外四种实现方式(三)——性能最高的库 上一篇文章我展示了这个最高性能库的实现方法。下面我介绍一下这个性能提升的秘密。 首先,为了弄清楚Most库究竟为何如此快,我必须借...

一个灰
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部