文档章节

HDFS如何实现本地文件上传?

强子哥哥
 强子哥哥
发布于 2014/12/15 17:52
字数 2120
阅读 389
收藏 8
点赞 0
评论 0

上传时,涉及到很多IO类,但是最关键的类就是:DFSOutputStream这个类。【0.1.0】

下面就开始分析这个类的代码。

-------------------------------------------类结构分析

 

-------------------------------------------具体函数分析

public synchronized void write(byte b[], int off, int len)

        throws IOException {

            if (closed) { //校验是否关闭了,关闭了自然不应该再写入数据了

                throw new IOException("Stream closed");

            }

            while (len > 0) { //这里的len就是指源缓冲区剩下的未写完的数据长度,单位byte

              int remaining = BUFFER_SIZE - pos; //目的缓冲区里可以写的字节数

              int toWrite = Math.min(remaining, len); //跟需要写的字节数比较,取较小值作为真正要写入的字节数

              System.arraycopy(b, off, outBufpos, toWrite); //开始复制来作为写入到目的缓冲区操作

              pos += toWrite; //更新目的缓冲区位置指针

              off += toWrite; //更新源缓冲区位置指针

              len -= toWrite; //更新源缓冲区剩下的内容长度

              filePos += toWrite; //计算整个文件的总的已经写入的长度(包括缓冲区里的内容)

              if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||

                  (pos == BUFFER_SIZE)) {

                flush(); //这里是2个条件引起flush,一个是总长度(已写+缓存)超过一个块大小,

                                              //第2个就是目的缓冲区已经满了,都么空间写入了,自然需要flush了。

              }

            }

        }

 //友情提醒,这里的前半段写入是能写多少写多少,写完了再判断!

为啥有2个判断条件?想必很多人对缓冲区满了很好理解,因为都没剩余空间了

而对bytesWrittenToBlock + pos >= BLOCK_SIZE可能不是很清楚

这是因为一个Block写满了就要另起炉灶,重新开一个Block.

flush()函数暂时不解释,后面再解释!

 

---

 

 

 public synchronized void write(int b) throws IOException {

            if (closed) {//仍然是校验是否关闭

                throw new IOException("Stream closed");

            }

            if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||

                (pos >= BUFFER_SIZE)) {

                flush();

            }//仍然是2个条件的校验

            outBuf[pos++] = (byte) b;

            filePos++;//2句的意义在于真正的写入到目的缓冲区里

                         不过为啥不把这2段调一下顺序更好理解?果然思维独特!

        }

 

---

 public synchronized void flush() throws IOException {

            if (closed) {

                throw new IOException("Stream closed");

            }//检验是否关闭,老规矩

            if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {

                flushData(BLOCK_SIZE - bytesWrittenToBlock);

            }//如果需要新起1Block的话,就把剩下的不足字节数先写上

            if (bytesWrittenToBlock == BLOCK_SIZE) {

                endBlock();//然后关闭当前块,新起一块

            }

            flushData(pos);//对当前块继续写剩下的

        } 

 

---

 

继续看别的函数

在看别的函数之前,首先希望读者先建立一个0.1.0中文件的存储机制。

在读取本地文件上传到HDFS中,文件流是这样的。

本地文件--->本地内存缓冲区Buffer--->本地文件--->上传到远程HDFS系统。

本地内存缓冲区Buffer--->本地文件就是flushData做的事情,请再复习下flush函数,然后再接下来分析flushData.

PS:看代码比写代码累,看代码是了解别人的思维,写代码是把自己的思维实现起来。。。 

private synchronized void flushData(int maxPos) throws IOException {

            int workingPos = Math.min(pos, maxPos);//计算要写入的字节数,真是多此一举。

            

            if (workingPos > 0) {//如果确实需要写的话

                //

                // To the local block backup, write just the bytes

                //

                backupStream.write(outBuf, 0, workingPos);//写入到本地文件

                //注意,请认真阅读backupStream的初始化过程,是一个本地文件。

                //也就是说计划把内存缓冲区里的内容写到本地文件中,写完一个block再发送给HDFS.

                //聪明的读者应该想到最后一个block的大小是<=blockSize的。

                // Track position

                //

                bytesWrittenToBlock += workingPos;//更新写入到block块的字节数,

                //尤其要强调,当一个块结束后,这个变量就会重置为0,你懂的。

                System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);

                //字节前挪移到偏移量为0的位置,方便后面IO操作,你懂得,不解释。

                pos -= workingPos;//相关变量都需要更新

            }

        }

 

---------------

接下来到了比较核心的函数endBlock(); 意思是关闭当前块,新起一块,下面来看看具体的代码!

 

private synchronized void endBlock() throws IOException {

            //

            // Done with local copy

            //

            backupStream.close();//关闭本地文件系统的临时文件

            //

            // Send it to datanode//准备发送给datanode了。

            //

            boolean mustRecover = true;//定义一个哨兵变量

            while (mustRecover) {//需要读取当前文件时

                nextBlockOutputStream();

           因为这个函数到后面才分析,所以提把背景知识补充好,这个函数

           主要是初始化了一对IO流句柄,这个流是当前shell和远程datanode

           之间的TCP连接,这对IO流句柄就是 blockStream + blockReplyStream

           分别对应着输出流和输入流,输出流用来输出文件头和文件内容,输入流是

           用来读取响应。 

                InputStream in = new FileInputStream(backupFile);//既然第一行关闭了写,

                现在就可以开始读了

                try {

                    byte buf[] = new byte[BUFFER_SIZE];//还是局部的IO缓冲区

                    int bytesRead = in.read(buf);//从本地文件中读取内容

                    while (bytesRead > 0) {//大于0

                        blockStream.writeLong((long) bytesRead);//写入字节数

                        blockStream.write(buf, 0, bytesRead);//写入缓冲区的内容

                        bytesRead = in.read(buf);//继续从本地文件中读取

                    }

                    internalClose();//NameNodeDataNode的交互,表示关闭

                    mustRecover = false;//表示任务结束

                } catch (IOException ie) {

                    handleSocketException(ie);

                } finally {

                  in.close();//关闭当前文件的输入流

                }

            }

            //

            // Delete local backup, start new one

            //下面4行是从新建立起本地文件系统的文件缓冲系统,不解释

            backupFile.delete();

            backupFile = newBackupFile();

            backupStream = new FileOutputStream(backupFile);

            bytesWrittenToBlock = 0;

        }

在阅读以上代码之后,我个人认为如果用C语言来写这段逻辑的话,我会直接调用sendfile来实现文件传输。

当然JAVA的API滞后性以及OS当时或许都不提供这种方式吧,反正现在的内核都提供了。

 

---------------------------------------

 那么接下来分析的是函数:nextBlockOutputStream()

 

private synchronized void nextBlockOutputStream() throws IOException {

            boolean retry = false;//不解释

            long start = System.currentTimeMillis();//当前开始时间

            do {

                retry = false;//重置为false 

                

                long localstart = System.currentTimeMillis();//当前开始时间

                boolean blockComplete = false;//标注块是否OK

                LocatedBlock lb = null;    //初始化为null          

                while (! blockComplete) {//如果未结束

                    if (firstTime) {//如果是第一次开启一个文件

                        lb = namenode.create(src.toString(), clientName.toString(), localNameoverwrite);//创建一个文件 

                    } else {

                        lb = namenode.addBlock(src.toString(), localName);

                    }//增加一个block

                    if (lb == null) {//如果找不到

                        try {

                            Thread.sleep(400);//就沉睡400毫秒

                            if (System.currentTimeMillis() - localstart > 5000) {

                                LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");

                            }

                        } catch (InterruptedException ie) {

                        }

                    } else {

                        blockComplete = true;//设置blockCompletetrue.解释为找到了一个block

                    }

                }

                block = lb.getBlock();//lb中获取block的信息

                DatanodeInfo nodes[] = lb.getLocations();//lb中获取block要存储的DataNode数组

                //

                // Connect to first DataNode in the list.  Abort if this fails.

                //请注意上面这句的意思:连接第一个数据节点,

                //为啥?数据传输采用计算机组成原理的菊花链模式

                InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());//解析

                try {

                    s = new Socket();

                    s.connect(target, READ_TIMEOUT);//连接第一个DataNode

                    s.setSoTimeout(READ_TIMEOUT);//设置读取时间

                } catch (IOException ie) {//异常这里就不分析了

                    // Connection failed.  Let's wait a little bit and retry

                    try {

                        if (System.currentTimeMillis() - start > 5000) {

                            LOG.info("Waiting to find target node: " + target);

                        }

                        Thread.sleep(6000);

                    } catch (InterruptedException iex) {

                    }

                    if (firstTime) {

                        namenode.abandonFileInProgress(src.toString());

                    } else {

                        namenode.abandonBlock(blocksrc.toString());

                    }

                    retry = true;

                    continue;

                }

                //此时已经成功连接到了远程DataNode节点,bingo!

                // Xmit header info to datanode

                //

                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

//获取输出流句柄

                out.write(OP_WRITE_BLOCK);//输出行为标识

                out.writeBoolean(false);//false?

                block.write(out);//写入block信息,注意:是把从namenode获取到的block写给DataNode

                out.writeInt(nodes.length);//这一样和下面这一行是为了写入所有存储及备份的DataNode

                for (int i = 0; i < nodes.length; i++) {

                    nodes[i].write(out);//不解释

                }

                out.write(CHUNKED_ENCODING);//CHUNKED_ENCODING

                bytesWrittenToBlock = 0;//重置为0

                blockStream = out;//把句柄赋值给类的局部变量供后续使用

                blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));//同理,不解释

            } while (retry);

            firstTime = false;//firstTime在至少有一个块信息返回后就为false

 

=================================================== 

接下来要分析的函数是

 

private synchronized void internalClose() throws IOException {

            blockStream.writeLong(0);//表明长度结束了

            blockStream.flush();//把缓冲内容全部输出。

            long complete = blockReplyStream.readLong();//读取响应

            if (complete != WRITE_COMPLETE) {//如果不是结束

                LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);

                throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);

            }

                    

            LocatedBlock lb = new LocatedBlock();//创建一个新对象

            lb.readFields(blockReplyStream);//根据响应流来赋值

            namenode.reportWrittenBlock(lb);//namenode报告写入成功

            s.close();//关闭此流

            s = null;

        }

 

================

最后就是close函数

 

public synchronized void close() throws IOException {

            if (closed) {

                throw new IOException("Stream closed");

            }//校验是否关闭了

            flush();//尽可能的输出内容

            if (filePos == 0 || bytesWrittenToBlock != 0) {

              try {

                endBlock();//结束一个块

              } catch (IOException e) {

                namenode.abandonFileInProgress(src.toString());//抛弃此file

                throw e;

              }

            }

            backupStream.close();//关闭流

            backupFile.delete();//删除文件

            if (s != null) {

                s.close();//不解释

                s = null;

            }

            super.close();

            long localstart = System.currentTimeMillis();

            boolean fileComplete = false;

            while (! fileComplete) {//循环报告文件写完了

                fileComplete = namenode.complete(src.toString(), clientName.toString());

                if (!fileComplete) {

                    try {

                        Thread.sleep(400);

                        if (System.currentTimeMillis() - localstart > 5000) {

                            LOG.info("Could not complete file, retrying...");

                        }

                    } catch (InterruptedException ie) {

                    }

                }

            }

            closed = true;

        }

 

 

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 856
博文 551
码字总数 647493
作品 8
南京
架构师
HDFS的Shell访问和Java API访问

Shell访问HDFS常用命令 1.shell 操作单个 HDFS 集群 下面列举出几个常用场景下的命令。 1、创建文件夹 HDFS 上的文件目录结构类似 Linux,根目录使用 "/" 表示。下面的命令将在 /middle 目录...

等待救赎 ⋅ 2015/10/19 ⋅ 0

hadoop的体系结构 hadoop hdfs 命令

hadoop的体系结构 NameNode - 主节点主服务器 SecondaryNameNode– 是辅助nameNode DataNode -数据保存用的 TaskTracker – 接收任务 JobTracker - 分数据 -100M Datanode1,DataNode2,DataNo...

八戒_o ⋅ 2015/12/03 ⋅ 0

HDFS的运行原理,如何实现HDFS的高可用

1 HDFS的运行机制 HDFS集群中的节点分为两种角色,一种角色负责管理整个集群的元数据,是名称节点(name node);另一种角色负责存储文件数据块和管理文件数据块,是数据节点(datanode)。 1....

qq_31598113 ⋅ 2017/04/05 ⋅ 0

Hadoop之MapReduce的Java实现

今天将为大家演示一下,Hadoop中MR用Java是如何编码实现的。 1.环境准备 1.1 需要把下载的hadoop包解压到windows目录下,注意不要有空格目录或者中文字符 image.png 1.2 配置环境变量 配置系...

landy8530 ⋅ 2017/11/25 ⋅ 0

上传本地文件到hdfs

前提是安装配置好hadoop环境 1,命令上传 hdfs dfs -put /home/minstsvmlight.txt /home/minstsvmlight.txt 本地文件 hdfs上的路径 上传后的文件名并不一定和原先的一样。可以重新命名。 2,...

岁月留痕 ⋅ 2015/12/08 ⋅ 0

Hadoop中的一些基本操作

先粗略说一下“hadoop fs”和“hadoop dfs”的区别:fs是各比较抽象的层面,在分布式环境中,fs就是dfs,但在本地环境中,fs是local file system,这个时候dfs不可用。 1、列出HDFS文件: ha...

KevinWen ⋅ 2014/04/29 ⋅ 0

Java实现HDFS文件操作工具类

1、创建HDFS配置信息静态代码块 static Configuration conf = new Configuration(); //具体配置信息根据自己的安装环境参数来 //HDFS通信地址 // conf.set("dfs.nameservices", "nameservice...

佛系程序猿灬 ⋅ 04/26 ⋅ 0

DADOOP集群HDFS工作机制

hdfs的工作机制 概述 1. HDFS集群分为两大角色:NameNode、DataNode (Secondary Namenode) 2. NameNode负责管理整个文件系统的元数据 3. DataNode 负责管理用户的文件数据块 4. 文件会按照固...

weixin_40747272 ⋅ 04/15 ⋅ 0

六:熟悉HDFS基本常用命令(一)

Hadoop命令学习-官网链接: http://hadoop.apache.org/docs/r2.6.4/hadoop-project-dist/hadoop-common/FileSystemShell.html ----------------------------------------------------------......

牧羊人Berg ⋅ 2016/04/30 ⋅ 0

360推出XLearning:可支持多种机器学习,深度学习框架调度系统!

  XLearning是一款支持多种机器学习,深度学习框架调度系统。基于Hadoop Yarn完成了对TensorFlow,MXNet,Caffe,Theano,PyTorch,Keras,XGBoost等常用框架的集成,同时具备良好的扩展性...

人工智能技术社区 ⋅ 2017/12/06 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

用ZBLOG2.3博客写读书笔记网站能创造今日头条的辉煌吗?

最近两年,著名的自媒体网站今日头条可以说是火得一塌糊涂,虽然从目前来看也遇到了一点瓶颈,毕竟发展到了一定的规模,继续增长就更加难了,但如今的今日头条规模和流量已经非常大了。 我们...

原创小博客 ⋅ 28分钟前 ⋅ 0

MyBatis四大核心概念

本文讲解 MyBatis 四大核心概念(SqlSessionFactoryBuilder、SqlSessionFactory、SqlSession、Mapper)。 MyBatis 作为互联网数据库映射工具界的“上古神器”,训有四大“神兽”,谓之:Sql...

waylau ⋅ 48分钟前 ⋅ 0

以太坊java开发包web3j简介

web3j(org.web3j)是Java版本的以太坊JSON RPC接口协议封装实现,如果需要将你的Java应用或安卓应用接入以太坊,或者希望用java开发一个钱包应用,那么用web3j就对了。 web3j的功能相当完整...

汇智网教程 ⋅ 今天 ⋅ 0

2个线程交替打印100以内的数字

重点提示: 线程的本质上只是一个壳子,真正的逻辑其实在“竞态条件”中。 举个例子,比如本题中的打印,那么在竞态条件中,我只需要一个方法即可; 假如我的需求是2个线程,一个+1,一个-1,...

Germmy ⋅ 今天 ⋅ 0

Springboot2 之 Spring Data Redis 实现消息队列——发布/订阅模式

一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式,这里利用redis消息“发布/订阅”来简单实现订阅者模式。 实现之前先过过 redis 发布订阅的一些基础概念和操...

Simonton ⋅ 今天 ⋅ 0

error:Could not find gradle

一.更新Android Studio后打开Project,报如下错误: Error: Could not find com.android.tools.build:gradle:2.2.1. Searched in the following locations: file:/D:/software/android/andro......

Yao--靠自己 ⋅ 昨天 ⋅ 0

Spring boot 项目打包及引入本地jar包

Spring Boot 项目打包以及引入本地Jar包 [TOC] 上篇文章提到 Maven 项目添加本地jar包的三种方式 ,本篇文章记录下在实际项目中的应用。 spring boot 打包方式 我们知道,传统应用可以将程序...

Os_yxguang ⋅ 昨天 ⋅ 0

常见数据结构(二)-树(二叉树,红黑树,B树)

本文介绍数据结构中几种常见的树:二分查找树,2-3树,红黑树,B树 写在前面 本文所有图片均截图自coursera上普林斯顿的课程《Algorithms, Part I》中的Slides 相关命题的证明可参考《算法(第...

浮躁的码农 ⋅ 昨天 ⋅ 0

android -------- 混淆打包报错 (warning - InnerClass ...)

最近做Android混淆打包遇到一些问题,Android Sdutio 3.1 版本打包的 错误如下: Android studio warning - InnerClass annotations are missing corresponding EnclosingMember annotation......

切切歆语 ⋅ 昨天 ⋅ 0

eclipse酷炫大法之设置主题、皮肤

eclipse酷炫大法 目前两款不错的eclipse 1.系统设置 Window->Preferences->General->Appearance 2.Eclipse Marketplace下载【推荐】 Help->Eclipse Marketplace->搜索‘theme’进行安装 比如......

anlve ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部