文档章节

从零开始最短路径学习Hadoop之04----Hadoop的I/O

brian_2017
 brian_2017
发布于 2017/01/17 08:58
字数 1874
阅读 18
收藏 1
《Hadoop权威指南》第2版本对Hadoop的I/O讲述内容甚多。对初学者来说,暂时不用使用太多技术。熟悉了主线,更多的细节查询手册即可得知。

1. 序列化
    1.1 为什么序列化?
          将一个内存中的对象,转化成字节流,这样可以很方便地通过网络进行传输,或者在磁盘上永久存储。
          从字节流转化成内存中的对象,称反序列化。
    1.2 序列化在分布式系统的两个地方经常出现:进程间通信,永久存储。
    1.3 Hadoop多节点的进程间通信是通过远程过程调用rpc实现的。rpc协议将消息序列化成为二进制流,然后发送到远程节点。远程节点将二进制流发序列化为原始消息。
    1.4 Hadoop的序列化格式Writable接口,它是Hadoop的核心。
    1.5 Writable接口定义两个方法,一个将状态写到DataOutput二进制流,一个从DataInput二进制流读取状态。
    1.6 IntWritable是封装java int的类,使用方式如下:
          IntWritable intw = new IntWritable();
          intw.set(163);
         也是这样:IntWritable  intw = new IntWritable(163);
    1.7 一个演示序列化的和反序列化的例子DispIntWri.java
package com.cere;

import java.io.DataOutputStream;
import java.io.DataInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.util.StringUtils;

public class DispIntWrit{
    public static byte[] serialize(Writable w)throws IOException{
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataout = new DataOutputStream(out);
        w.write(dataout);
        dataout.close();
        return out.toByteArray();
    }

    public static byte[] deserialize(Writable w, byte[] bytes) throws IOException{
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        DataInputStream datain = new DataInputStream(in);
        w.readFields(datain);
        datain.close();
        return bytes;
    }

    public static void main(String[] args) throws Exception{
        IntWritable intw = new IntWritable(163);
        byte[] bytes = serialize(intw);
        String bytes_str = StringUtils.byteToHexString(bytes);
        System.out.println(bytes_str);
        System.out.println(bytes.length);
        System.out.println("---------------");
        IntWritable intw2 = new IntWritable(-1);
        deserialize(intw2, bytes);
        System.out.println(intw2);
    }
}

        1.7.1 编译: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar  -d ./classes/ src/*.java
        1.7.2 打包: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p1$ jar -cvf dit.jar -C ./classes/ .
        1.7.3 执行: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar dit.jar com.cere.DispIntWrit 

    1.8 WritableComparable接口继承了Writable和java.lang.Comparable接口。
           IntWritable实现了WritableComparable接口。
           对MapReduce来说,类型的比较是非常重要的,因为中间有个基于key的排序阶段。
           RawComparator接口继承了java jdk中的 Comparator接口,可以直接比较数据流中的记录,不需要把数据流反序列化为对象,这样可以避免了新建对象的额外开销。
           WritableComparator是一个类,实现了RawComparator接口。
    1.9 Hadoop里实现很多Writable类:BooleanWritable, ByteWritable, IntWritable, VintWritable, FloatWritable, LongWritable, VlongWritable, DoubleWritable等等。Text是utf-8序列的Writable类。
    1.10 实现定制的Writable类型。
    1.11 为速度实现一个RawComparator类型。
    1.12 定制Comparator类型。
    1.13 序列化框架:只要有一种机制对每个类进行类型与二进制表示的来回转换,就可以使用任何类型。
    1.14 序列化IDL:接口定义语言Interface Description Language,不依赖具体语言的方式进行声明。
    1.15  Apache Avro是一个独立于编程语言的数据序列化系统,解决Hadoop中Writable类型缺乏语言可移植性的问题。

2. 基于文件的数据结构SequenceFile
    日志文件,每一条日志记录是一行文本。如果想记录二进制类型,需要将文本转化成二进制。可以把日志文件转化成二进制的SeqenceFile存储,键key是LongWritable类型表示时间戳,值value是Writable类型,表示日志记录的数量。
    2.1 读写SeqenceFile的例子, WRSeqFile.java
package com.cere;

import java.lang.Exception;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ReflectionUtils;

public class WRSeqFile{
    private static final String[] DATA = {
        "one",
        "Three",
        "Five",
        "Seven",
        "Nine"
    };

    public static void writeSeqFile(String[] args) throws IOException{
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();

        SequenceFile.Writer writer = null;
        try{
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for(int i = 0; i < 100; i++){
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        }finally{
            System.out.println("---------------------------------");
            System.out.println("write ok.");
            IOUtils.closeStream(writer);
        }
    }

    public static void readSeqFile(String[] args)throws IOException{
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        SequenceFile.Reader reader = null;
        try{
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while(reader.next(key, value)){
                String syncSeen = reader.syncSeen()?"*":"";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition();
            }
        }finally{
            System.out.println("---------------------------------");
            System.out.println("read ok.");
            IOUtils.closeStream(reader);
        }
    }

    public static void main(String[] args) throws Exception{
        writeSeqFile(args);
        readSeqFile(args);
    }
}

    2.2 编译: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java 
    2.3 打包: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p2$ jar -cvf wrsf.jar -C ./classes/ .
    2.4 执行:
            brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar wrsf.jar com.cere.WRSeqFile a.txt
            brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -text a.txt
    2.5 同步点:当数据读取的实例出错后能够再一次与记录边界同步的数据流中的一个位置。它是由SequenceFile.Writer记录的,也就是在顺序文件写入过程中插入一个特殊项以便每隔开几个记录便有一个同步表示,同步点位于记录的边界之处。
    2.6 在顺序文件中搜索给定位置有两种方法,一种是调用seek方法,读取给定位置,如果给定位置不是记录边界,调用next方法时会发生错误;另一种是通过同步点找到记录边界。
    2.7 通过命令行接口显示SequenceFile对象: " hadoop fs -text a.txt"
    2.8 顺序文件格式:前三个字节是SEQ,第4个字节是版本号,还有其他一些信息。

3. 基于文件的数据结构MapFile
    MapFile是已经排序的SequenceFile,它已加入用于搜索键的索引。可以将MapFile视为java.util.Map的持久化形式。它的大小可能超过保存在内存中的一个map的大小。
    注意对比SequenceFile和MapFile的Writer和Reader。
    3.1 读写MapFile的的例子WRMapFile.java
package com.cere;

import java.lang.Exception;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ReflectionUtils;

public class WRMapFile{
    private static final String[] DATA = {
        "one",
        "Three",
        "Five",
        "Seven",
        "Nine"
    };

    public static void writeMapFile(String[] args) throws IOException{
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();

        MapFile.Writer writer = null;
        try{
            writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());
            for(int i = 0; i < 1024; i++){
                key.set(i+1);
                value.set(DATA[i % DATA.length]);
                writer.append(key, value);
            }
        }finally{
            System.out.println("---------------------------------");
            System.out.println("write ok.");
            IOUtils.closeStream(writer);
        }
    }

    public static void readMapFile(String[] args)throws IOException{
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        MapFile.Reader reader = null;
        try{
            reader = new MapFile.Reader(fs, path.toString(), conf);
            IntWritable key = new IntWritable();
            Text value = new Text();
            while(reader.next(key, value)){
                System.out.printf("%s\t%s\n", key, value);
            }
        }finally{
            System.out.println("---------------------------------");
            System.out.println("read ok.");
            IOUtils.closeStream(reader);
        }
    }

    public static void main(String[] args) throws Exception{
        writeMapFile(args);
        readMapFile(args);
    }
}

    3.2 编译: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java 
    3.3 打包: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p3$ jar -cvf wrmf.jar -C ./classes/ .
    3.4 执行: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-4/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar wrmf.jar com.cere.WRMapFile b
    3.5 写入时,文件名b是一个目录,里面包含data和index两个文件,都是SequenceFile,data文件包含所有记录,index文件包含一部分键和data中键到该键的偏移量的映射。一般是每隔128个键才有一个包含在index文件中。
    3.6 读取时,可以调用get()方法可以随机访问文件中的数据。一次查找需要一次磁盘寻址和一次最多128个条目的扫描。getClost()跟get()方法类似。
    3.7 可以对MapFile进行重建索引。

© 著作权归作者所有

brian_2017
粉丝 3
博文 61
码字总数 145216
作品 0
私信 提问
Linux命令-用户密码和组密码管理

用户密码和组密码分别保存在/etc/shadow和/etc/gshadow里,下面我们对这两个文件进行分析。 首先,cat /etc/shadow: shadow里保存的内容都是分段的,每段以”:“分隔,每一段的内容如下: ...

小辉hui
2014/04/30
0
0
hadoop0.20.2伪分布式环境搭建

虽然现在hadoop版本已经到了主流2点多的时代,但是对于学习大数据而言,我还是选择从以前老的版本0.20.2学起。 下面就是伪分布式的环境搭建过程。 hadoop下载地址: http://archive.apache....

断臂人
2018/06/21
0
0
[Hadoop] 完全分布式集群安装过程详解

1. 用Vmware Workstation创建4个虚拟机,每个虚拟机都装上Centos(版本:CentOS-6.3-x86_64),示意图如下: 2. 在所有结点上修改/etc/hosts,使彼此之间都能够用机器名解析IP 192.168.231....

长平狐
2013/06/03
157
0
[Hadoop] 完全分布式集群安装过程详解

1. 用Vmware Workstation创建4个虚拟机,每个虚拟机都装上Centos(版本:CentOS-6.3-x86_64),示意图如下: 2. 在所有结点上修改/etc/hosts,使彼此之间都能够用机器名解析IP 192.168.231....

长平狐
2013/06/03
216
0
想做数据工程师?从零开始系统规划大数据学习之路

大数据的领域非常广泛,往往使想要开始学习大数据及相关技术的人望而生畏。大数据技术的种类众多,这同样使得初学者难以选择从何处下手。   这正是我想要撰写本文的原因。本文将为你开始学...

董黎明
2018/07/15
14
0

没有更多内容

加载失败,请刷新页面

加载更多

哪些情况下适合使用云服务器?

我们一直在说云服务器价格适中,具备弹性扩展机制,适合部署中小规模的网站或应用。那么云服务器到底适用于哪些情况呢?如果您需要经常原始计算能力,那么使用独立服务器就能满足需求,因为他...

云漫网络Ruan
今天
10
0
Java 中的 String 有没有长度限制

转载: https://juejin.im/post/5d53653f5188257315539f9a String是Java中很重要的一个数据类型,除了基本数据类型以外,String是被使用的最广泛的了,但是,关于String,其实还是有很多东西...

低至一折起
今天
23
0
OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
11
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
9
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部