文档章节

Hadoop Outline - Part 2 (I/O - Writable)

woodo
 woodo
发布于 2014/08/28 11:42
字数 1790
阅读 50
收藏 0

Hadoop I/O


Data Integrity

Hdfs: % hadoop fs -cat hdfs://namenode/data/a.txt

LocalFS: % hadoop fs -cat file:///tmp/a.txt


generate crc check sum file

%hadoop fs -copyToLocal -crc /data/a.txt file:///data/a.txt

check sum file: .a.txt.crc is a hidden file.


Ref: CRC-32,循环冗余校验算法,error-detecting.

io.bytes.per.checksum is deprecated, it's dfs.bytes-per-checksum, default is 512, Must not be larger than dfs.stream-buffer-size,which is the size of buffer to stream files. The size of this buffer should probably be a multiple of hardware page size (4096 on Intel x86), and it determines how much data is buffered during read and write operations.


Data Compression

常用算法

读书时,hadoop支持四种压缩算法,如果调解空间和效率的话,-1 ~ -9,代表从最优速度到最优空间. 压缩算法支持在org.apache.hadoop.io.compress.*.

  1. deflate (.deflate), 就是常用的gzip, package ..DefaultCodec

  2. Gzip (.gz),在deflate格式加了文件头和尾. 压缩速度(适中),解压速度(适中),压缩效率(适中),package ..GzipCodec, both of java and native

  3. bzip2 (.bz2), 压缩速度(最差),< 解压速度(最差),压缩效率 (最好),特点是支持可切分(splitable),对map-red非常友好。,package ..BZip2Codec,java only

  4. LZO (.lzo), 压缩速度(最快),解压速度(最快),压缩效率(最差),,package com.hadoop.compressiojn.lzo.lzopCodec, native only


如果禁用原生库,使用hadoop.native.lib.

如果使用原生库,可能对象创建的成本较高,所以可以使用CodecPool,重复使用这些对象。


对于一个非常大的数据文件,存储如下方案:

  1. 使用支持切分的bzip2

  2. 手动切分,并使压缩后的part接近于block size.

  3. 使用Sequence File, 它支持压缩和切分

  4. 使用Avro数据文件,它也支持压缩和切分,而且增加了很多编程语言的可读写性。


如果Map-Red的output自动压缩:

conf.setBoolean ("mared.output.compress",true);
conf.setClass("mapred.output.compression.codec",GzipCodec.class,CompressionCodec.class);


如果Map-Red的中间结果的自动压缩:

//or conf.setCompressMapOutput(true);
conf.setBoolean ("mared.compress.map.output",true);

//or conf.setMapOutputComressorClass(GzipCodec.class)
conf.setClass("mapred.map.output.compression.codec",GzipCodec.class,CompressionCodec.class);


序列化(Serialization/Deserialization)

Writable and WritableComparable

// core class for hadoop
public interface Writable{
       void write(DataOutput out) throw IOException;
       void readFields(DataInput in) throw IOException;
}

public interface Comparable<T>{
       int compareTo(T o);
}

//core class for map-reduce shuffle
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

// Sample
public class MyWritableComparable implements WritableComparable {
       // Some data
       private int counter;
       private long timestamp;
       
       public void write(DataOutput out) throws IOException {
         out.writeInt(counter);
         out.writeLong(timestamp);
       }
       
       public void readFields(DataInput in) throws IOException {
         counter = in.readInt();
         timestamp = in.readLong();
       }
       
       public int compareTo(MyWritableComparable o) {
         int thisValue = this.value;
         int thatValue = o.value;
         return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
       }

       public int hashCode() {
         final int prime = 31;
         int result = 1;
         result = prime * result + counter;
         result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
         return result
       }
}

//optimize for stream comparasion
public interface RawComparator<T> extends Comparator<T>{
      // s1 start position, l1, length of bytes
      public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2);
}

public class WritableComparator implements RawComparator{
}


Comparator RawComparator WritableComparator

WritableComparator 提供了原始compator的compare反序列化对象的实现,性能较差。不过它作为RawComparator实例的工厂:

RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);


// 注册一个经过优化的比较算子。Register an optimized comparator for a WritableComparable implementation.

static void define(Class c, WritableComparator comparator);

          

// 获得一个WritableComparable的比较算子. Get a comparator for a WritableComparable implementation.

static WritableComparator get(Class<? extends WritableComparable> c);

public MyWritableComparator extends WritableComparator{
    static{
        define(MyWritableComparable.class, new MyWritableComparator());
    }
    public MyWritableComparator {
        super(MyWritableComparable.class);
    }

    @Override
    public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2){
    }
}

 

: 要使static initializer被调用,除非有该类的实例被创建,或某静态方法或成员被访问。或者直接强制,代码如:

Class.forName("package.yourclass"); 它会强制初始化静态initializer.


Java Primitive Data Type wrapped by Writable

Extends from WritableComparable

  • BooleanWritable, 1

  • ByteWritable, 1,

  • BytesWritable,

  • IntWritable,4

  • VIntWritable,1~5

  • FloatWritable,4,

  • LongWritable,8,

  • VLongWritable,1~9

  • DoubleWritable,8

  • NullWritable,Immutable singletone.

  • Text,4~

  • MD5Hash,

  • ObjectWritable,

  • GenericWritable


Extends from Writable only

  • ArrayWritable

  • TwoDArrayWritable

  • AbstractMapWritable

  •      MapWritable

  •      SortedMapWritable


[Text]

值得一提的是Text的序列化方式是Zero-compressed encoding,这个看过一些资料,其实是一种编码方式,意图是省略掉高位0所占用的空间,对于小数,它能节省空间,对于大数会额外占用空间。相比压缩,它能比较快速。其实类似于VIntWritable, VLongWritable的编码方式。


- 如何选择变长和定长数值呢?

1. 定长适合分布非常均匀的数值(如hash),变长适合分布非常不均匀的数值。

2. 变长可以节省空间,而且可以在VIntWritable 和VLongWritable之间转换。


- Text和String的区别

1。String是char序列,Text是UTF-8的byte序列.

UTF-8类不能对字符串大于32767的进行utf-8编码。


(Indexing)索引:对于ASCII来说, Text和String是一样的, 对于Unicode就不同了。String类的长度是其所含char编码单元的长度,然而Text是UTF-8的字节码的长度。CodePointAt表示一个真正的Unicode字符,它可以是2char,4bytes的unicode。

Iteration(迭代): 将Text转换ByteBuffer,然后反复调用bytesToCodePoint()静态方法,可以取到整型的Unicode.

Mutable(易变性): 可以set,类似writable 和StringBuffer,getLength()返回有效字串长度,getbytes().length,返回空间大小。


[BytesWritable]

这是二进制数组的封装,类似于windows下的BSTR,都是前面一个整型表示字节长度,后面是字节的二进制流。

它也是mutable,getLength() != getBytes().length


[NullWritable]

NullWritable是Writable的一个特殊类型。它的序列化长度为0,其实只是一个占位符,既不读入,也不写出。只是存在于程序体中。

Immutable,是一个singleton。


[ObjectWritable] 

ObjectWritable是Java的Array, String, 以及Primitive类型的通用封装 (注:不包含Integer)。它的序列化则使用java的类型序列化,写入类型信息等,比较占用空间。

通过两个特殊的构造:

public ObjectWritable(Object instance);

public ObjectWritable(Class declaredClass,Object instance);

举例子:

ObjectWritable objectw = new ObjectWritable(int.class,5);


[GenericWritable]

首先这是一个抽象类,需要被具象化才能使用。

观察下面这个实列,它以一种Union方式,显示的代理一个Writable实例,解决了Reduce函数的参数声明问题。

public class MyGenericWritable extends GenericWritable {

    private static Class<? extends Writable>[] CLASSES = null;

    static {
        CLASSES = (Class<? extends Writable>[]) new Class[] {
            IntWritable.class,
            Text.class
             //add as many different Writable class as you want
        };
    }


    @Override
    protected Class<? extends Writable>[] getTypes() {
        return CLASSES;
    }

    @Override
    public String toString() {
        return "MyGenericWritable [getTypes()=" + Arrays.toString(getTypes()) + "]";
    }

    // override hashcode();
}

public class Reduce extends Reducer<Text, MyGenericWritable, Text, Text> {
    public void reduce(Text key, Iterable<MyGenericWritable> values, Context context) throws IOException, InterruptedException {
}


[ArrayWritable /TwoDArrayWritable]

ArrayWritable aw = new ArrayWriable(Text.class);


[MapWritable / SortedMapWritable]

实现了java.util.Map<Writable,Writable> 和SortedMap...

它的serialize, 使用先写map<classname,id>,然后后边每个类的类型,以id来替代,节省空间。这些都在父类AbstractMapWritable中实现。

集合小结:

1. 如果是单类型的列表,使用ArrayWritable就足够了

2。如果是把不同类型的Writable存储在一个列表中:

-- 可以使用GenerickWritable,把元素封装在一个ArrayWritable,这个貌似只能同一类型。

    public class MyGenericWritable extends GenericWritable {

    private static Class<? extends Writable>[] CLASSES = null;

    static {
        CLASSES = (Class<? extends Writable>[]) new Class[] {
            ArrayWritable.class,
             //add as many different Writable class as you want
        };
    }


    @Override
    protected Class<? extends Writable>[] getTypes() {
        return CLASSES;
    }

-- 可以使用写一个仿照MapWritable的ListWritable

    //注意实现hashcode,equals,toString, comparTo (if possible)

    //hashcode尤其重要,HashPartitioner通常用hashcode来选择reduce分区,所以为你的类写一个比较好的hashcode非常必要。

    public class ListWritable extends ArrayList<Writable> implements Writable {

    }

/**
 * @author cloudera
 *
 */
public class ListWritable extends ArrayList<Writable> implements Writable {
	private List<Writable> list = new ArrayList<Writable>();
	
	public void set(Writable writable){
		list.add(writable);
	}
	
	@Override
	public void readFields(DataInput in) throws IOException {
		int nsize = in.readInt();
		Configuration conf = new Configuration();
		Text className = new Text();
		while(nsize-->0){
	
			Class theClass = null;
			try {
				className.readFields(in);
				theClass = Class.forName(className.toString());
			} catch (ClassNotFoundException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

			Writable w = (Writable)ReflectionUtils.newInstance(theClass,conf);
			w.readFields(in);
			
			add(w);
			
		}
	}

	@Override
	public void write(DataOutput out) throws IOException {
		Writable w = null;
		out.writeInt(size());
		for(int i = 0;i<size();i++){
			w = get(i);
			new Text(w.getClass().getName()).write(out);
			w.write(out);
		}
	}

}


© 著作权归作者所有

woodo
粉丝 5
博文 57
码字总数 32118
作品 0
朝阳
高级程序员
私信 提问
haoop序列化 Writable接口与WriteCompareable接口

public interface WritableComparable<T> extends Writable, Comparable<T> {} Hadoop并没有使用JAVA的序列化,而是引入了自己实的序列化系统,package org.apache.hadoop.io这个包中定义了大......

tuzibuluo
2012/02/08
252
0
Hadoop序列化与Writable接口(二)

上一篇文章Hadoop序列化与Writable接口(一)介绍了Hadoop序列化,Hadoop Writable接口以及如何定制自己的Writable类,在本文中我们继续Hadoop Writable类的介绍,这一次我们关注的是Writabl...

pczhangtl
2013/11/21
1K
0
hadoop自定义key,value

Hadoop的自定制数据类型 一般有两个办法,一种较为简单的是针对值,另外一种更为完整的是对于键和值都适应的方法: 1、实现Writable接口: / DataInput and DataOutput 类是java.io的类 / pu...

张欢19933
2016/04/07
170
0
Hadoop中的序列化和基于文件的存储结构(9)

序列化概念: 序列化(Serialization):是指把结构化对象转化为字节流。 反序列化(Deserialization):是序列化的逆过程,即把字节流转回结构化对象。 Java序列化:(java.io.Serializable) Ha...

肖鋭
2014/03/02
206
0
hadoop深入研究:(十)——序列化与Writable接口

转载请写明来源地址:http://blog.csdn.net/lastsweetop/article/details/9193907 所有源码在github上,https://github.com/lastsweetop/styhadoop 简介 序列化和反序列化就是结构化对象和字...

lastsweetop
2013/07/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

测试定时

23/58

FalconChen
昨天
43
0
新海软件邮政市场监管综合信息平台

二、系统功能 该平台包括邮政普遍服务管理、快递管理两大系统,涵盖了地图定位、普服信息、快递信息、GIS管理、网格管理、视频监控、数据分析(BI)、系统设置等八大模块,全面反映了区域邮政...

neocean
昨天
177
0
【微记忆】用户隐私政策与条款

微记忆尊重并保护所有注册用户的个人隐私权。为了给您提供更准确、更贴心的服务,微记忆会按照本隐私权政策的规定储存并使用您的个人信息。微记忆承诺将以高度严格的审慎义务对待这些信息。除...

微记忆
昨天
69
0
两周自制脚本语言-第7天 添加函数功能

第7天 添加函数功能 基本的函数定义与调用执行、引入闭包使Stone语言可以将变量赋值为函数,或将函数作为参数传递给其他函数 有些函数将有返回值的归为函数,没有返回值的归为子程序 7.1 扩充...

果汁分你一半
昨天
105
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部