文档章节

HDFS中的DataNode菊花链式接收数据

强子大叔的码田
 强子大叔的码田
发布于 2014/12/24 16:38
字数 973
阅读 54
收藏 0

钉钉、微博极速扩容黑科技,点击观看阿里云弹性计算年度发布会!>>>

public void run() {//数据节点运行

            try {

                DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));

                try {//获取输入流

                    byte op = (byte) in.read();//读取操作类型

                    if (op == OP_WRITE_BLOCK) {//如果是写一个块

                        /

                        // Read in the header

                        //

                        DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

                        try {//获取输出流

                            boolean shouldReportBlock = in.readBoolean();//读取一个boolean

                            Block b = new Block();

                            b.readFields(in);//创建一个新的block

                            int numTargets = in.readInt();//读取目标机器的个数

                            if (numTargets <= 0) {

                                throw new IOException("Mislabelled incoming datastream.");

                            }

                            DatanodeInfo targets[] = new DatanodeInfo[numTargets];//用来存储目标datanode节点信息

                            for (int i = 0; i < targets.length; i++) {//开始初始化it.

                                DatanodeInfo tmp = new DatanodeInfo();

                                tmp.readFields(in);

                                targets[i] = tmp;

                            }

                            byte encodingType = (byte) in.read();//获取encodingType

                            long len = in.readLong();//获取这个块的长度

                            //

                            // Make sure curTarget is equal to this machine

                            //

                            DatanodeInfo curTarget = targets[0];//获取第一个data节点

                            //

                            // Track all the places we've successfully written the block

                            //

                            Vector mirrors = new Vector();//???

                            //

                            // Open local disk out

                            //

                            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));

//本地文件输出流

                            InetSocketAddress mirrorTarget = null;

                            try {

                                //

                                // Open network conn to backup machine, if 

                                // appropriate

                                //

                                DataInputStream in2 = null;

                                DataOutputStream out2 = null;

                                if (targets.length > 1) {

                                    // Connect to backup machine

                                    mirrorTarget = createSocketAddr(targets[1].getName().toString());//连接镜像节点

                                    try {

                                        Socket s2 = new Socket();

                                        s2.connect(mirrorTarget, READ_TIMEOUT);//连接镜像节点

                                        s2.setSoTimeout(READ_TIMEOUT);//设置超时时间

                                        out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));

                                        in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));

                                        //获取与备份节点的2IO

                                        // Write connection header

                                        out2.write(OP_WRITE_BLOCK);//写操作类型

                                        out2.writeBoolean(shouldReportBlock);//boolean

                                        b.write(out2);//block

                                        out2.writeInt(targets.length - 1);//写剩下的几个datanode信息

                                        for (int i = 1; i < targets.length; i++) {

                                            targets[i].write(out2);

                                        }

                                        out2.write(encodingType);

                                        out2.writeLong(len);//数据长度

                                    } catch (IOException ie) {

                                        if (out2 != null) {

                                            try {

                                                out2.close();

                                                in2.close();

                                            } catch (IOException out2close) {

                                            } finally {

                                                out2 = null;

                                                in2 = null;

                                            }

                                        }

                                    }

                                }

                                //

                                // Process incoming data, copy to disk and

                                // maybe to network.

                                //万事俱备只欠数据

                                try {

                                    boolean anotherChunk = len != 0;//true

                                    byte buf[] = new byte[BUFFER_SIZE];//设置缓冲区

                                    while (anotherChunk) {

                                        while (len > 0) {//读取一些数据

                                            int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));

                                            if (bytesRead < 0) {

                                              throw new EOFException("EOF reading from "+s.toString());

                                            }

                                            if (bytesRead > 0) {

                                                try {//写入到本地磁盘文件

                                                    out.write(buf, 0, bytesRead);

                                                } catch (IOException iex) {

                                                    shutdown();

                                                    throw iex;

                                                }

                                                if (out2 != null) {

                                                    try {//写入到网络以便让备份节点获取

                                                        out2.write(buf, 0, bytesRead);

                                                    } catch (IOException out2e) {

                                                        //

                                                        // If stream-copy fails, continue 

                                                        // writing to disk.  We shouldn't 

                                                        // interrupt client write.

                                                        //

                                                        try {

                                                            out2.close();

                                                            in2.close();

                                                        } catch (IOException out2close) {

                                                        } finally {

                                                            out2 = null;

                                                            in2 = null;

                                                        }

                                                    }

                                                }

                                                len -= bytesRead;//调整要写的字节数

                                            }

                                        }

                                        if (encodingType == RUNLENGTH_ENCODING) {

                                            anotherChunk = false;

                                        } else if (encodingType == CHUNKED_ENCODING) {

                                            len = in.readLong();//准备读取下一块block

                                            if (out2 != null) {

                                                out2.writeLong(len);

                                            }

                                            if (len == 0) {

                                                anotherChunk = false;

                                            }

                                        }

                                    }

                                    if (out2 == null) {

                                        LOG.info("Received block " + b + " from " + s.getInetAddress());

                                    } else {

                                        out2.flush();//输出缓冲区的内容

                                        long complete = in2.readLong();//得到备份节点的响应

                                        if (complete != WRITE_COMPLETE) {

                                            LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);

                                        }

                                        LocatedBlock newLB = new LocatedBlock();

                                        newLB.readFields(in2);//得到block以及成功写到的节点信息

                                        DatanodeInfo mirrorsSoFar[] = newLB.getLocations();

                                        for (int k = 0; k < mirrorsSoFar.length; k++) {

                                            mirrors.add(mirrorsSoFar[k]);//更新到本地的mirrors

                                        }

                                        LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);

                                    }

                                } finally {

                                    if (out2 != null) {//清理线程

                                        out2.close();

                                        in2.close();

                                    }

                                }

                            } finally {//清理现场

                                try {

                                    out.close();

                                } catch (IOException iex) {

                                    shutdown();

                                    throw iex;

                                }

                            }

                            data.finalizeBlock(b);//写入到dirTree结构中

                            // 

                            // Tell the namenode that we've received this block 

                            // in full, if we've been asked to.  This is done

                            // during NameNode-directed block transfers, but not

                            // client writes.

                            //

                            if (shouldReportBlock) {

                                synchronized (receivedBlockList) {

                                    receivedBlockList.add(b);//更新本地的receivedBlockList

                                    receivedBlockList.notifyAll();

                                }

                            }

                            //

                            // Tell client job is done, and reply with

                            // the new LocatedBlock.

                            //

                            reply.writeLong(WRITE_COMPLETE);//写结束

                            mirrors.add(curTarget);//把自己加上来

                            LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));

                            newLB.write(reply);//返回给客户端

                        } finally {

                            reply.close();

                        }

                        //剩下的为读数据,忽略。

                    } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {

                        //

                        // Read in the header

                        //

                        Block b = new Block();

                        b.readFields(in);

                        long toSkip = 0;

                        if (op == OP_READSKIP_BLOCK) {

                            toSkip = in.readLong();

                        }

                        //

                        // Open reply stream

                        //

                        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

                        try {

                            //

                            // Write filelen of -1 if error

                            //

                            if (! data.isValidBlock(b)) {

                                out.writeLong(-1);

                            } else {

                                //

                                // Get blockdata from disk

                                //

                                long len = data.getLength(b);

                                DataInputStream in2 = new DataInputStream(data.getBlockData(b));

                                out.writeLong(len);

                                if (op == OP_READSKIP_BLOCK) {

                                    if (toSkip > len) {

                                        toSkip = len;

                                    }

                                    long amtSkipped = 0;

                                    try {

                                        amtSkipped = in2.skip(toSkip);

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                    out.writeLong(amtSkipped);

                                }

                                byte buf[] = new byte[BUFFER_SIZE];

                                try {

                                    int bytesRead = 0;

                                    try {

                                        bytesRead = in2.read(buf);

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                    while (bytesRead >= 0) {

                                        out.write(buf, 0, bytesRead);

                                        len -= bytesRead;

                                        try {

                                            bytesRead = in2.read(buf);

                                        } catch (IOException iex) {

                                            shutdown();

                                            throw iex;

                                        }

                                    }

                                } catch (SocketException se) {

                                    // This might be because the reader

                                    // closed the stream early

                                } finally {

                                    try {

                                        in2.close();

                                    } catch (IOException iex) {

                                        shutdown();

                                        throw iex;

                                    }

                                }

                            }

                            LOG.info("Served block " + b + " to " + s.getInetAddress());

                        } finally {

                            out.close();

                        }

                    } else {

                        while (op >= 0) {

                            System.out.println("Faulty op: " + op);

                            op = (byte) in.read();

                        }

                        throw new IOException("Unknown opcode for incoming data stream");

                    }

                } finally {

                    in.close();

                }

            } catch (IOException ie) {

              LOG.log(Level.WARNING"DataXCeiver", ie);

            } finally {

                try {

                    s.close();

                } catch (IOException ie2) {

                }

            }

        }

    }

强子大叔的码田

强子大叔的码田

粉丝 924
博文 1630
码字总数 1283214
作品 9
南京
架构师
私信 提问
加载中
请先登录后再评论。
HDFS浅析

HDFS简介 HDFS(Hadoop Distributed File System),是一个分布式的文件系统,用来分布式存放海量数据,该文件系统可以存在于廉价机器上,具有高容错性。 HDFS工作流程 以上传文件为例 架构图...

特拉仔
2018/05/15
71
0
大数据面试知识点分析(一)

为了保证效率和质量,每篇文章发布6个知识点,由简单及难,我们从HDFS开始: 1)如何杀死一个job hadoop job –list./hadoop job -kill job20121211162811166 2)删除hdfs上的/tmp/xxx目录 ...

Rople
2018/03/15
61
0
_HDFS初探

不抄袭 ,直接点击http://www.cnblogs.com/xia520pi/archive/2012/05/28/2520813.html 场景简析: 1.hdfs在 适量文件&非实时场景 下表现最好 2.典型hdfs为单Master模式,能承载的文件数约=内...

深蓝苹果
2014/01/28
72
0
大数据学习之四——HDFS

1.Hadoop与HDFS的关系 Hadoop实现了一个分布式文件系统,即Hadoop Distributed File System,简称HDFS。对外部客户机而言,HDFS就像一个传统的分级文件系统,所以,很多时候,我们也叫它DFS...

osc_1jqxkpyl
2018/01/24
2
0
hadoop读写文件流程

1.HDFS-写文件 1.客户端将文件写入本地磁盘的临时文件中 2.当临时文件大小达到一个block大小时,HDFS client通知NameNode,申请写入文件 3.NameNode在HDFS的文件系统中创建一个文件,并把该b...

棉花糖_霜
2018/01/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

插入,在PostgreSQL中重复更新吗? - Insert, on duplicate update in PostgreSQL?

问题: Several months ago I learned from an answer on Stack Overflow how to perform multiple updates at once in MySQL using the following syntax: 几个月前,我从关于堆栈溢出的答案......

技术盛宴
32分钟前
20
0
互联网的寒冬下各大一线互联网公司还在用SpringBoot这是为什么?

引言 现在各大技术社区 Spring Boot 的文章越来越多,Spring Boot 相关的图文、视频教程越来越多,使用 Spring Boot 的互联网公司也越来越多; Java 程序员现在出去面试, Spring Boot 已经成...

北柠Java
35分钟前
8
0
vue+elementui实现简易的列筛选功能实现。

一、简易效果图: 二、需求背景 大家都知道,后管类系统当中,有时一个列表可能有很多列需要展示,如下图所示,但是用户在使用系统的时候,往往会需要针对其中某几列进行数据提取,在展示列比...

一生懸命吧
37分钟前
60
0
批处理问题记录——数字实验bat

记录学习批处理时的问题 批处理为输入一个数字,如果大于等于一百,直接输出输入数字,如果小于一百会重复+1,直到100后输出。 问题是,如果不输入数字,直接空格的话,批处理会出错。 寻求一...

愤怒的乌老大
44分钟前
6
0
算法题汇总

计算两个字符串中的最大的相同字符串

佳幂小煜
54分钟前
27
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部