hadoop 序列化框架

原创
2017/05/03 09:20
阅读数 71

hadoop 序列化框架

[toc]

序列化,反序列化

序列化: 按照一定格式把一个对象编码成一个字节流,可以存储在硬盘,可以在网络中传递,可以拷贝,克隆 等, 反序列化: 把存入字节流的对象,解析成一个对象。

java 序列化

序列化接口: Serializable 输入输出: ObjectInputStream 和 ObjectOutputStream 的 readObject() 和writeObject() 序列化内容:对象类,类签名,非静态成员变量值,所有父类对象,其他引用的对象等

hadoop序列化

Writable接口

InterfaceAudience.Public
InterfaceStability.Stable
public interface Writable {
  /** 
   * 输出对象到数据流中
   */
  void write(DataOutput out) throws IOException;

  /** 
   * Deserialize the fields of this object from <code>in</code>.  
   * 从流中读取对象,为了效率,尽可能复用现有对象
   */
  void readFields(DataInput in) throws IOException;
}

介绍几个重要的接口:

WritableComparable : 有比较能力的序列化接口,同时继承了writable 和 comparable 接口, ByteWritable,IntWritable,DoubleWritable 等java 基本类型对应的Writable 都继承了这个接口 RawComparator : 允许从流中读取未被反序列化的对象进行比较。 WritableComparator : RawComparator 的通用实现类

例子 ObjectWirtable 类

主要成员变量

  //需要序列化,反序列化的类名
  private Class declaredClass;
  //被封装的对象的实例
  private Object instance;
  private Configuration conf;

序列化方法

      
      
    @Override
    public void write(DataOutput out) throws IOException {
        writeObject(out, instance, declaredClass, conf);
    }
    public static void writeObject(DataOutput out, Object instance,
                                 Class declaredClass, 
                                 Configuration conf) throws IOException {
        writeObject(out, instance, declaredClass, conf, false);
    }

    public static void writeObject(DataOutput out, Object instance,
        Class declaredClass, Configuration conf, boolean allowCompactArrays) 
    throws IOException {
    //判断实例是不是为null
    if (instance == null) {                       // null
      instance = new NullInstance(declaredClass, conf);
      declaredClass = Writable.class;
    }
    //判断是不是基本类型的数组
    // Special case: must come before writing out the declaredClass.
    // If this is an eligible array of primitives,
    // wrap it in an ArrayPrimitiveWritable$Internal wrapper class.
    if (allowCompactArrays && declaredClass.isArray()
        && instance.getClass().getName().equals(declaredClass.getName())
        && instance.getClass().getComponentType().isPrimitive()) {
      instance = new ArrayPrimitiveWritable.Internal(instance);
      declaredClass = ArrayPrimitiveWritable.Internal.class;
    }

    UTF8.writeString(out, declaredClass.getName()); // always write declared
    
    if (declaredClass.isArray()) {     // non-primitive or non-compact array
      int length = Array.getLength(instance);
      out.writeInt(length);
      for (int i = 0; i < length; i++) {
        writeObject(out, Array.get(instance, i),
            declaredClass.getComponentType(), conf, allowCompactArrays);
      }
      
    } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
      ((ArrayPrimitiveWritable.Internal) instance).write(out);
      
    } else if (declaredClass == String.class) {   // String
      UTF8.writeString(out, (String)instance);
     //判断是否是基本类型
    } else if (declaredClass.isPrimitive()) {     // primitive type

      if (declaredClass == Boolean.TYPE) {        // boolean
        out.writeBoolean(((Boolean)instance).booleanValue());
      } else if (declaredClass == Character.TYPE) { // char
        out.writeChar(((Character)instance).charValue());
      } else if (declaredClass == Byte.TYPE) {    // byte
        out.writeByte(((Byte)instance).byteValue());
      } else if (declaredClass == Short.TYPE) {   // short
        out.writeShort(((Short)instance).shortValue());
      } else if (declaredClass == Integer.TYPE) { // int
        out.writeInt(((Integer)instance).intValue());
      } else if (declaredClass == Long.TYPE) {    // long
        out.writeLong(((Long)instance).longValue());
      } else if (declaredClass == Float.TYPE) {   // float
        out.writeFloat(((Float)instance).floatValue());
      } else if (declaredClass == Double.TYPE) {  // double
        out.writeDouble(((Double)instance).doubleValue());
      } else if (declaredClass == Void.TYPE) {    // void
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }
    } else if (declaredClass.isEnum()) {         // enum
      UTF8.writeString(out, ((Enum)instance).name());
    } else if (Writable.class.isAssignableFrom(declaredClass)) { // 其他实现了writable接口的类型
      UTF8.writeString(out, instance.getClass().getName());
      ((Writable)instance).write(out);

    } else if (Message.class.isAssignableFrom(declaredClass)) {
      ((Message)instance).writeDelimitedTo(
          DataOutputOutputStream.constructOutputStream(out));
    } else {
      throw new IOException("Can't write: "+instance+" as "+declaredClass);
    }
  }
  

上边介绍的writable 接口的序列化,主要应用在mapreduce 过程中输入输出,但是hadoop还支持了其他序列化方法,包括hadoop Avro, Apache Thrift 和Google Protocol Bufferd等但是这些主要应用在远程rpc通信。对应数据存储例如:map的输出,reduce输出等就主要用到writable接口实现的类。

hadoop简单的序列化框架

序列化类图

接口 Serialzation

方法:

    //判断序列化实现是否支持该类对象
    boolean accept(Class<?> c);
    //获取用于序列化的对象Serializer的实现
    Serializer<T> getSerializer(Class<T> c);
    //获取用于反序列化的对象Deserializer实现
    Deserializer<T> getDeserializer(Class<T> c);

接口 Serializer

    //打开流,为序列化准备
  void open(OutputStream out) throws IOException;
    //开始将对象序列化到流中
  void serialize(T t) throws IOException;
    //关闭流,结束序列化,清理 
  void close() throws IOException;

接口 Deserializer

与序列化过程类似

java序列化支持

主要实现了Serialzation 接口,并且有两个静态内部类JavaSerializationDeserializer 和 JavaSerializationSerializer 分别实现Deserializer 和Serializer 接口 具体代码可以查看 hadoop 项目hadoop-common 的org.apache.hadoop.io.serializer.JavaSerializationl 类。WritableSerialization 和AvroReflectSerialization 也有类似的实现。

展开阅读全文
打赏
1
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
1
分享
返回顶部
顶部