文档章节

HDFS中的DataNode菊花链式接收数据

强子1985
 强子1985
发布于 2014/12/24 16:38
字数 973
阅读 48
收藏 0

public void run() {//数据节点运行

            try {

                DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));

                try {//获取输入流

                    byte op = (byte) in.read();//读取操作类型

                    if (op == OP_WRITE_BLOCK) {//如果是写一个块

                        /

                        // Read in the header

                        //

                        DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

                        try {//获取输出流

                            boolean shouldReportBlock = in.readBoolean();//读取一个boolean

                            Block b = new Block();

                            b.readFields(in);//创建一个新的block

                            int numTargets = in.readInt();//读取目标机器的个数

                            if (numTargets <= 0) {

                                throw new IOException("Mislabelled incoming datastream.");

                            }

                            DatanodeInfo targets[] = new DatanodeInfo[numTargets];//用来存储目标datanode节点信息

                            for (int i = 0; i < targets.length; i++) {//开始初始化it.

                                DatanodeInfo tmp = new DatanodeInfo();

                                tmp.readFields(in);

                                targets[i] = tmp;

                            }

                            byte encodingType = (byte) in.read();//获取encodingType

                            long len = in.readLong();//获取这个块的长度

                            //

                            // Make sure curTarget is equal to this machine

                            //

                            DatanodeInfo curTarget = targets[0];//获取第一个data节点

                            //

                            // Track all the places we've successfully written the block

                            //

                            Vector mirrors = new Vector();//???

                            //

                            // Open local disk out

                            //

                            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));

//本地文件输出流

                            InetSocketAddress mirrorTarget = null;

                            try {

                                //

                                // Open network conn to backup machine, if 

                                // appropriate

                                //

                                DataInputStream in2 = null;

                                DataOutputStream out2 = null;

                                if (targets.length > 1) {

                                    // Connect to backup machine

                                    mirrorTarget = createSocketAddr(targets[1].getName().toString());//连接镜像节点

                                    try {

                                        Socket s2 = new Socket();

                                        s2.connect(mirrorTarget, READ_TIMEOUT);//连接镜像节点

                                        s2.setSoTimeout(READ_TIMEOUT);//设置超时时间

                                        out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));

                                        in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));

                                        //获取与备份节点的2IO

                                        // Write connection header

                                        out2.write(OP_WRITE_BLOCK);//写操作类型

                                        out2.writeBoolean(shouldReportBlock);//boolean

                                        b.write(out2);//block

                                        out2.writeInt(targets.length - 1);//写剩下的几个datanode信息

                                        for (int i = 1; i < targets.length; i++) {

                                            targets[i].write(out2);

                                        }

                                        out2.write(encodingType);

                                        out2.writeLong(len);//数据长度

                                    } catch (IOException ie) {

                                        if (out2 != null) {

                                            try {

                                                out2.close();

                                                in2.close();

                                            } catch (IOException out2close) {

                                            } finally {

                                                out2 = null;

                                                in2 = null;

                                            }

                                        }

                                    }

                                }

                                //

                                // Process incoming data, copy to disk and

                                // maybe to network.

                                //万事俱备只欠数据

                                try {

                                    boolean anotherChunk = len != 0;//true

                                    byte buf[] = new byte[BUFFER_SIZE];//设置缓冲区

                                    while (anotherChunk) {

                                        while (len > 0) {//读取一些数据

                                            int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));

                                            if (bytesRead < 0) {

                                              throw new EOFException("EOF reading from "+s.toString());

                                            }

                                            if (bytesRead > 0) {

                                                try {//写入到本地磁盘文件

                                                    out.write(buf, 0, bytesRead);

                                                } catch (IOException iex) {

                                                    shutdown();

                                                    throw iex;

                                                }

                                                if (out2 != null) {

                                                    try {//写入到网络以便让备份节点获取

                                                        out2.write(buf, 0, bytesRead);

                                                    } catch (IOException out2e) {

                                                        //

                                                        // If stream-copy fails, continue 

                                                        // writing to disk.  We shouldn't 

                                                        // interrupt client write.

                                                        //

                                                        try {

                                                            out2.close();

                                                            in2.close();

                                                        } catch (IOException out2close) {

                                                        } finally {

                                                            out2 = null;

                                                            in2 = null;

                                                        }

                                                    }

                                                }

                                                len -= bytesRead;//调整要写的字节数

                                            }

                                        }

                                        if (encodingType == RUNLENGTH_ENCODING) {

                                            anotherChunk = false;

                                        } else if (encodingType == CHUNKED_ENCODING) {

                                            len = in.readLong();//准备读取下一块block

                                            if (out2 != null) {

                                                out2.writeLong(len);

                                            }

                                            if (len == 0) {

                                                anotherChunk = false;

                                            }

                                        }

                                    }

                                    if (out2 == null) {

                                        LOG.info("Received block " + b + " from " + s.getInetAddress());

                                    } else {

                                        out2.flush();//输出缓冲区的内容

                                        long complete = in2.readLong();//得到备份节点的响应

                                        if (complete != WRITE_COMPLETE) {

                                            LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);

                                        }

                                        LocatedBlock newLB = new LocatedBlock();

                                        newLB.readFields(in2);//得到block以及成功写到的节点信息

                                        DatanodeInfo mirrorsSoFar[] = newLB.getLocations();

                                        for (int k = 0; k < mirrorsSoFar.length; k++) {

                                            mirrors.add(mirrorsSoFar[k]);//更新到本地的mirrors

                                        }

                                        LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);

                                    }

                                } finally {

                                    if (out2 != null) {//清理线程

                                        out2.close();

                                        in2.close();

                                    }

                                }

                            } finally {//清理现场

                                try {

                                    out.close();

                                } catch (IOException iex) {

                                    shutdown();

                                    throw iex;

                                }

                            }

                            data.finalizeBlock(b);//写入到dirTree结构中

                            // 

                            // Tell the namenode that we've received this block 

                            // in full, if we've been asked to.  This is done

                            // during NameNode-directed block transfers, but not

                            // client writes.

                            //

                            if (shouldReportBlock) {

                                synchronized (receivedBlockList) {

                                    receivedBlockList.add(b);//更新本地的receivedBlockList

                                    receivedBlockList.notifyAll();

                                }

                            }

                            //

                            // Tell client job is done, and reply with

                            // the new LocatedBlock.

                            //

                            reply.writeLong(WRITE_COMPLETE);//写结束

                            mirrors.add(curTarget);//把自己加上来

                            LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));

                            newLB.write(reply);//返回给客户端

                        } finally {

                            reply.close();

                        }

                        //剩下的为读数据,忽略。

                    } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {

                        //

                        // Read in the header

                        //

                        Block b = new Block();

                        b.readFields(in);

                        long toSkip = 0;

                        if (op == OP_READSKIP_BLOCK) {

                            toSkip = in.readLong();

                        }

                        //

                        // Open reply stream

                        //

                        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

                        try {

                            //

                            // Write filelen of -1 if error

                            //

                            if (! data.isValidBlock(b)) {

                                out.writeLong(-1);

                            } else {

                                //

                                // Get blockdata from disk

                                //

                                long len = data.getLength(b);

                                DataInputStream in2 = new DataInputStream(data.getBlockData(b));

                                out.writeLong(len);

                                if (op == OP_READSKIP_BLOCK) {

                                    if (toSkip > len) {

                                        toSkip = len;

                                    }

                                    long amtSkipped = 0;

                                    try {

                                        amtSkipped = in2.skip(toSkip);

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                    out.writeLong(amtSkipped);

                                }

                                byte buf[] = new byte[BUFFER_SIZE];

                                try {

                                    int bytesRead = 0;

                                    try {

                                        bytesRead = in2.read(buf);

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                    while (bytesRead >= 0) {

                                        out.write(buf, 0, bytesRead);

                                        len -= bytesRead;

                                        try {

                                            bytesRead = in2.read(buf);

                                        } catch (IOException iex) {

                                            shutdown();

                                            throw iex;

                                        }

                                    }

                                } catch (SocketException se) {

                                    // This might be because the reader

                                    // closed the stream early

                                } finally {

                                    try {

                                        in2.close();

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                }

                            }

                            LOG.info("Served block " + b + " to " + s.getInetAddress());

                        } finally {

                            out.close();

                        }

                    } else {

                        while (op >= 0) {

                            System.out.println("Faulty op: " + op);

                            op = (byte) in.read();

                        }

                        throw new IOException("Unknown opcode for incoming data stream");

                    }

                } finally {

                    in.close();

                }

            } catch (IOException ie) {

              LOG.log(Level.WARNING"DataXCeiver", ie);

            } finally {

                try {

                    s.close();

                } catch (IOException ie2) {

                }

            }

        }

    }

© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 871
博文 1050
码字总数 745382
作品 8
南京
架构师
私信 提问
DADOOP集群HDFS工作机制

hdfs的工作机制 概述 1. HDFS集群分为两大角色:NameNode、DataNode (Secondary Namenode) 2. NameNode负责管理整个文件系统的元数据 3. DataNode 负责管理用户的文件数据块 4. 文件会按照固...

weixin_40747272
04/15
0
0
Hadoop分布式文件系统设计要点与架构

Hadoop分布式文件系统设计要点与架构 Hadoop简介:一个分布式系统基础架构,由Apache基金会开发。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力高速运算和存...

李伟铭k
07/09
0
0
hadoop(2.5,2.6) HDFS偶发性心跳异常以及大量DataXceiver线程被Blocked故障处理分享

一、概要 公司近期Storm清洗程序那边反应HDFS会出现偶发性的异常导致数据写不进HDFS,另外一些Spark作业在大规模往HDFS灌数据时客户端会出现各种“all datanode bad..”以及服务端出现各种t...

zengzhaozheng
2015/08/13
0
0
HDFS 原理、架构与特性介绍

本文主要讲述 HDFS原理-架构、副本机制、HDFS负载均衡、机架感知、健壮性、文件删除恢复机制 1:当前HDFS架构详尽分析 HDFS架构 •NameNode •DataNode •Sencondary NameNode 数据存储细节 ...

大数据之路
2013/08/11
0
1
HDFS系统架构

1:当前HDFS架构详尽分析 HDFS架构 •NameNode •DataNode •Sencondary NameNode 数据存储细节 NameNode 目录结构 Namenode 的目录结构: ${ dfs.name.dir}/current /VERSION /edits /fsim...

o0无忧亦无怖
2015/10/08
21
0

没有更多内容

加载失败,请刷新页面

加载更多

纹理与表面细节添加方法---凹凸映射

中国龙-扬科
8分钟前
0
0
20181115上课截图

小丑鱼00
16分钟前
0
0
初识css自定义属性

这算是一篇两篇文章译文的糅合体,旨在帮助理解css自定义属性。 今天,CSS预处理器是Web开发的标准。 预处理器的一个主要优点是它们使您能够使用变量, 这有助于您避免复制和粘贴代码,并简化...

嫣然丫丫丫
25分钟前
0
0
JAVA基础--session共享的前生今世

session共享的前生今世 Session及cookie基本概念及生命周期 session   当浏览器发起一个新的HTTP请求时,WEB服务端会主动创建一个session.并分配一个sessionID作为服务端识别客户端的一个标...

spinachgit
34分钟前
0
0
Deepin Linux 下把 UC 缓存视频变为 MP4 文件

本文是利用 FFMPEG 的功能实现的。 生成 file.txt文件 因为缓存文件都是数字,且文件夹内还有其他文件,包括 index.* 的文件。 $ ls -1v --hide=file.txt --hide=index* > file.txt 解释 ls...

不避风云
35分钟前
0
2

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部