文档章节

Avro 序列化操作原理与应用

为为02
 为为02
发布于 2017/04/12 23:57
字数 2586
阅读 442
收藏 6

Avro 序列化操作原理与应用

内存中的序列化与反序列化

Avro 提供了序列化与反序列化API,通过这些API我们可以很方便的将 Avro 集成到现有的系统. Avro的序列化不同于json,其更重视数据在网络上传输的性能,传输过程中忽略了数据的可读性。当然Avro也支持像json一样的key value传输。Avro提供了两种方式进行数据的序列化与反序列化,根据不同的使用场景可以任由我们选择。下面通过Avro使用通用和特殊两种API进行数据的序列化与反序列化的两个例子来探究其原理与应用。

通用 API

 使用通用API我们可以不必创建相应的java对象,就像操作Map一样,只需要知道需要操作的数据的key和value.甚至可以对已有的schema进行分析动态的得出我们所需要的数据结构.

首先以一个简单的Avro模式 StringPire.avsc为例

    {
      "type": "record",
      "name": "StringPair",
      "doc": "A pair of strings.",
      "fields": [
        {"name": "left", "type": "string"},
        {"name": "right", "type": "string"}
      ]
    }

接着再创建一个 SerializeString.java 文件,用于执行指定模式的Avro的序列化与反序列化

    package cn.weiwei.WHadoop.Avro;
    
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.*;
    import org.apache.avro.util.Utf8;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    
    /**
     * <pre>字符序列化与反序列化</pre>
     * [@author](https://my.oschina.net/arthor) Wang Weiwei <weiwei02@vip.qq.com / weiwei.wang@100credit.com>
     * [@version](https://my.oschina.net/u/931210) 1.0
     * [@sine](https://my.oschina.net/mysine) 17-4-6
     */
    public class SerializeString {
        /**
         * 使用通用API序列化字符串
         * */
        public ByteArrayOutputStream serialize(String path) throws IOException {
            Schema schema = buildSchema(path);
    
    
            //创建通用Avro记录
            GenericRecord genericRecord = new GenericData.Record(schema);
            genericRecord.put("left",new Utf8("LEFT"));
            genericRecord.put("right",new Utf8("RIGHT"));
    
            //将记录序列化到输出流中
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            //datumWriter对象将数据翻译成Encoder对象可以理解的类型然后写入到输出流
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
            //由于encord无需重用 binaryEncoder()的第二个参数为null
            Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream,null);
            datumWriter.write(genericRecord, encoder);
            encoder.flush();
            byteArrayOutputStream.close();
            System.out.println(byteArrayOutputStream.toString("utf-8"));
            return byteArrayOutputStream;
        }
    
        /**
         * 加载要使用的模式
         * */
        private Schema buildSchema(String path) throws IOException {
            //加载要使用的模式
            Schema.Parser parser = new Schema.Parser();
            return parser.parse(getClass().getClassLoader().getResourceAsStream(path));
        }
    
        /**使用通用API反序列化数据*/
        public void deserialize(String path) throws IOException {
            Schema schema = buildSchema(path);
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(serialize(path).toByteArray(),null);
            GenericRecord record = datumReader.read(null,decoder);
            System.out.println(record.toString());
        }
        
    }

对于上面的示例,测试用例如下:

    package cn.weiwei.WHadoop.Avro;
    
    import org.junit.Test;
    
    import static org.junit.Assert.*;
    
    /**<pre>测试字符串序列化与反序列化</pre>
     * @author Wang Weiwei <weiwei02@vip.qq.com / weiwei.wang@100credit.com>
     * @version 1.0
     * @sine 17-4-6
     */
    public class SerializeStringTest {
        SerializeString serializeString = new SerializeString();
        @Test
        public void serialize() throws Exception {
            System.out.println(serializeString.serialize("StringPair.avsc").toString("utf-8"));
        }
    
        @Test
        public void deserialize() throws Exception {
            serializeString.deserialize("StringPair.avsc");
        }
    
    }

经过测试,我们得到了以下结果:

    LEFT
    RIGHT
    
    {"left": "LEFT", "right": "RIGHT"}
    
    Process finished with exit code 0

用例全部正常通过,我们写入的两个字符串完全正常的被序列化与读取了. 对于通用API的写入与读取原理我们举例说明:

    /**重载调用write(Schema schema, Object datum, Encoder out)方法*/
    public void write(D datum, Encoder out) throws IOException {
        write(root, datum, out);
      }
      
      /** 写入数据的主方法.*/
      protected void write(Schema schema, Object datum, Encoder out)
        throws IOException {
        try {
          switch (schema.getType()) {
          case RECORD: writeRecord(schema, datum, out); break;
          case ENUM:   writeEnum(schema, datum, out);   break;
          case ARRAY:  writeArray(schema, datum, out);  break;
          case MAP:    writeMap(schema, datum, out);    break;
          case UNION:
            int index = resolveUnion(schema, datum);
            out.writeIndex(index);
            write(schema.getTypes().get(index), datum, out);
            break;
          case FIXED:   writeFixed(schema, datum, out);   break;
          case STRING:  writeString(schema, datum, out);  break;
          case BYTES:   writeBytes(datum, out);           break;
          case INT:     out.writeInt(((Number)datum).intValue()); break;
          case LONG:    out.writeLong((Long)datum);       break;
          case FLOAT:   out.writeFloat((Float)datum);     break;
          case DOUBLE:  out.writeDouble((Double)datum);   break;
          case BOOLEAN: out.writeBoolean((Boolean)datum); break;
          case NULL:    out.writeNull();                  break;
          default: error(schema,datum);
          }
        } catch (NullPointerException e) {
          throw npe(e, " of "+schema.getFullName());
        }
      }

void write(Schema schema, Object datum, Encoder out)方法会根据不同的模式标注的类型作出不同的反映,如果是基本数据类型的话会直接写入到序列化输出流中,否则继续进行其它处理,我们以 schema的type为record为例继续分析:

     /** Called to write a record.  May be overridden for alternate record
       * representations.*/
      protected void writeRecord(Schema schema, Object datum, Encoder out)
        throws IOException {
        Object state = data.getRecordState(datum, schema);
        for (Field f : schema.getFields()) {
          writeField(datum, f, out, state);
        }
      }
      
      /** Called to write a single field of a record. May be overridden for more 
       * efficient or alternate implementations.*/
      protected void writeField(Object datum, Field f, Encoder out, Object state) 
          throws IOException {
        Object value = data.getField(datum, f.name(), f.pos(), state);
        try {
          write(f.schema(), value, out);
        } catch (NullPointerException e) {
          throw npe(e, " in field " + f.name());
        }
      }

在这里我们发现由write() 方法调用 writeRecord()  方法 调用 writeField()再递归调用 write() 方法,这样就能无所遗漏,经由选择-循环-递归的复杂逻辑处理,保证不漏掉,不错过某个字段的数据,以完成整个数据中所有的模式的解析.并且由于 Avro 写入到流中的信息只有数据本身,而没有像 JSON 那样将冗余的数据模式也同时写入到数据中,这样更利于在网络中高效传输数据.利于对于上面所序列化的字符串

    LEFT
    RIGHT

Avro 实际使用了 9 个字节去表示,如果我们使用传统的json去表示区分,这将花费 34 个字节.在实际存储时,每个字段的数据独占一行(用ascii 码 10 作为区分)(--当数据中存在\n换行符时使用-- 用一页  ascii码 12 作为区分),读取时按照预订的模式所规定的数据类型按照模式声明的顺序去读,对于所需要的字段,可以传空值,但不可以不传.通用的反序列化的逻辑应该正好与序列化的逻辑相反,这里不再叙述.

特殊 API

我们可以使用上面通用的API进行序列化与反序列化操作,但这种方式好像不太符合我们的一切事物皆是对象的思想.Avro 提供的API也有对特殊对象进行操作的API,并且还提供了根据特定的模式去生成类的Maven插件.

例:使用maven插件根据 StringPair.avsc 生成 StringPair.java

pom.xml配置如下:

    <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro-maven-plugin</artifactId>
                    <version>${avro.version}</version>
                    <executions>
                        <execution>
                            <id>schemas</id>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>schema</goal>
                            </goals>
                            <configuration>
                                <includes>
                                    <include>StringPair.avsc</include>
                                </includes>
                                <stringType>String</stringType> <!-- Avro 1.6.0 onwards -->
                                <sourceDirectory>src/main/resources</sourceDirectory>
                                <outputDirectory>${basedir}/src/main/java/cn/weiwei/WHadoop/Avro/domain
                                </outputDirectory>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>

上面的配置将 avro:schema 插件绑定到 maven 官方的插件 generate-sources 命令之上.配置完成之后,在shell中运行 mvn generate-sources命令便可生成文件 StringPair.java

    /**
     * Autogenerated by Avro
     * 
     * DO NOT EDIT DIRECTLY
     */
    @SuppressWarnings("all")
    /** A pair of strings. */
    @org.apache.avro.specific.AvroGenerated
    public class StringPair extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
      public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"StringPair\",\"doc\":\"A pair of strings.\",\"fields\":[{\"name\":\"left\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"right\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"num\",\"type\":\"int\"}]}");
      public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
      @Deprecated public java.lang.String left;
      @Deprecated public java.lang.String right;
      @Deprecated public int num;
    
      /**
       * Default constructor.  Note that this does not initialize fields
       * to their default values from the schema.  If that is desired then
       * one should use <code>newBuilder()</code>. 
       */
      public StringPair() {}
    
      /**
       * All-args constructor.
       */
      public StringPair(java.lang.String left, java.lang.String right, java.lang.Integer num) {
        this.left = left;
        this.right = right;
        this.num = num;
      }
    
      public org.apache.avro.Schema getSchema() { return SCHEMA$; }
      // Used by DatumWriter.  Applications should not call. 
      public java.lang.Object get(int field$) {
        switch (field$) {
        case 0: return left;
        case 1: return right;
        case 2: return num;
        default: throw new org.apache.avro.AvroRuntimeException("Bad index");
        }
      }
      // Used by DatumReader.  Applications should not call. 
      @SuppressWarnings(value="unchecked")
      public void put(int field$, java.lang.Object value$) {
        switch (field$) {
        case 0: left = (java.lang.String)value$; break;
        case 1: right = (java.lang.String)value$; break;
        case 2: num = (java.lang.Integer)value$; break;
        default: throw new org.apache.avro.AvroRuntimeException("Bad index");
        }
      }
    
      /**
       * Gets the value of the 'left' field.
       */
      public java.lang.String getLeft() {
        return left;
      }
    
      /**
       * Sets the value of the 'left' field.
       * @param value the value to set.
       */
      public void setLeft(java.lang.String value) {
        this.left = value;
      }
    
      /**
       * Gets the value of the 'right' field.
       */
      public java.lang.String getRight() {
        return right;
      }
    
      /**
       * Sets the value of the 'right' field.
       * @param value the value to set.
       */
      public void setRight(java.lang.String value) {
        this.right = value;
      }
    
      /**
       * Gets the value of the 'num' field.
       */
      public java.lang.Integer getNum() {
        return num;
      }
    
      /**
       * Sets the value of the 'num' field.
       * @param value the value to set.
       */
      public void setNum(java.lang.Integer value) {
        this.num = value;
      }
    
      /** Creates a new StringPair RecordBuilder */
      public static StringPair.Builder newBuilder() {
        return new StringPair.Builder();
      }
      
      /** Creates a new StringPair RecordBuilder by copying an existing Builder */
      public static StringPair.Builder newBuilder(StringPair.Builder other) {
        return new StringPair.Builder(other);
      }
      
      /** Creates a new StringPair RecordBuilder by copying an existing StringPair instance */
      public static StringPair.Builder newBuilder(StringPair other) {
        return new StringPair.Builder(other);
      }
      
      /**
       * RecordBuilder for StringPair instances.
       */
      public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<StringPair>
        implements org.apache.avro.data.RecordBuilder<StringPair> {
    
        private java.lang.String left;
        private java.lang.String right;
        private int num;
    
        /** Creates a new Builder */
        private Builder() {
          super(StringPair.SCHEMA$);
        }
        
        /** Creates a Builder by copying an existing Builder */
        private Builder(StringPair.Builder other) {
          super(other);
          if (isValidValue(fields()[0], other.left)) {
            this.left = data().deepCopy(fields()[0].schema(), other.left);
            fieldSetFlags()[0] = true;
          }
          if (isValidValue(fields()[1], other.right)) {
            this.right = data().deepCopy(fields()[1].schema(), other.right);
            fieldSetFlags()[1] = true;
          }
          if (isValidValue(fields()[2], other.num)) {
            this.num = data().deepCopy(fields()[2].schema(), other.num);
            fieldSetFlags()[2] = true;
          }
        }
        
        /** Creates a Builder by copying an existing StringPair instance */
        private Builder(StringPair other) {
                super(StringPair.SCHEMA$);
          if (isValidValue(fields()[0], other.left)) {
            this.left = data().deepCopy(fields()[0].schema(), other.left);
            fieldSetFlags()[0] = true;
          }
          if (isValidValue(fields()[1], other.right)) {
            this.right = data().deepCopy(fields()[1].schema(), other.right);
            fieldSetFlags()[1] = true;
          }
          if (isValidValue(fields()[2], other.num)) {
            this.num = data().deepCopy(fields()[2].schema(), other.num);
            fieldSetFlags()[2] = true;
          }
        }
    
        /** Gets the value of the 'left' field */
        public java.lang.String getLeft() {
          return left;
        }
        
        /** Sets the value of the 'left' field */
        public StringPair.Builder setLeft(java.lang.String value) {
          validate(fields()[0], value);
          this.left = value;
          fieldSetFlags()[0] = true;
          return this; 
        }
        
        /** Checks whether the 'left' field has been set */
        public boolean hasLeft() {
          return fieldSetFlags()[0];
        }
        
        /** Clears the value of the 'left' field */
        public StringPair.Builder clearLeft() {
          left = null;
          fieldSetFlags()[0] = false;
          return this;
        }
    
        /** Gets the value of the 'right' field */
        public java.lang.String getRight() {
          return right;
        }
        
        /** Sets the value of the 'right' field */
        public StringPair.Builder setRight(java.lang.String value) {
          validate(fields()[1], value);
          this.right = value;
          fieldSetFlags()[1] = true;
          return this; 
        }
        
        /** Checks whether the 'right' field has been set */
        public boolean hasRight() {
          return fieldSetFlags()[1];
        }
        
        /** Clears the value of the 'right' field */
        public StringPair.Builder clearRight() {
          right = null;
          fieldSetFlags()[1] = false;
          return this;
        }
    
        /** Gets the value of the 'num' field */
        public java.lang.Integer getNum() {
          return num;
        }
        
        /** Sets the value of the 'num' field */
        public StringPair.Builder setNum(int value) {
          validate(fields()[2], value);
          this.num = value;
          fieldSetFlags()[2] = true;
          return this; 
        }
        
        /** Checks whether the 'num' field has been set */
        public boolean hasNum() {
          return fieldSetFlags()[2];
        }
        
        /** Clears the value of the 'num' field */
        public StringPair.Builder clearNum() {
          fieldSetFlags()[2] = false;
          return this;
        }
    
        @Override
        public StringPair build() {
          try {
            StringPair record = new StringPair();
            record.left = fieldSetFlags()[0] ? this.left : (java.lang.String) defaultValue(fields()[0]);
            record.right = fieldSetFlags()[1] ? this.right : (java.lang.String) defaultValue(fields()[1]);
            record.num = fieldSetFlags()[2] ? this.num : (java.lang.Integer) defaultValue(fields()[2]);
            return record;
          } catch (Exception e) {
            throw new org.apache.avro.AvroRuntimeException(e);
          }
        }
      }
    }

然后创建一个序列化StringPair对象的类 SerializeStringPair.java

    package cn.weiwei.WHadoop.Avro;
    
    
    import cn.weiwei.WHadoop.Avro.domain.StringPair;
    import org.apache.avro.io.*;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificDatumWriter;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    
    /**
     * <pre>序列化与反序列化 StringPair 工具类</pre>
     * @author Wang Weiwei <email>weiwei02@vip.qq.com / weiwei.wang@100credit.com</email>
     * @version 1.0
     * @sine 2017/4/12
     */
    public class SerializeStringPair {
        public StringPair stringPair;
        
        public SerializeStringPair(){
            stringPair = new StringPair();
            stringPair.setLeft("L");
            stringPair.setRight("R");
        }
        
        /**序列化*/
        public byte[] serialize() throws IOException {
            ByteArrayOutputStream stream = new ByteArrayOutputStream();
            DatumWriter<StringPair> stringPairDatumWriter = new SpecificDatumWriter<StringPair>(StringPair.class);
            Encoder encoder = EncoderFactory.get().binaryEncoder(stream,null);
            stringPairDatumWriter.write(stringPair,encoder);
            encoder.flush();
            stream.close();
            return stream.toByteArray();
        }
        
        /**
         * 反序列化
         * */
        public void deSerialize(byte[] data) throws IOException {
            DatumReader<StringPair> stringPairDatumReader = new SpecificDatumReader<StringPair>(StringPair.class);
            Decoder decoder = DecoderFactory.get().binaryDecoder(data,null);
            StringPair stringPair = stringPairDatumReader.read(null,decoder);
            System.out.println(stringPair);
        }
        
        
    }

通过实例可以发现,特定的api不再需要指定模式等内容,让编程变得更加简单。而其内部实现原理也只是使用反射进行映射,相对都变得更为简单。并且与通用API相同,其内部的字节组织都专门为网络传输做过优化。

© 著作权归作者所有

为为02
粉丝 51
博文 44
码字总数 99356
作品 0
海淀
程序员
私信 提问
Apache Avro 1.8.1 发布

Apache Avro 1.8.1 发布了,Avro(读音类似于[ævrə])是Hadoop的一个子项目,由Hadoop的 创始人Doug Cutting(也是Lucene,Nutch等项目的创始人)牵头开发。Avro是一个数据序列化系统,设计...

oschina
2016/05/24
3.8K
0
Apache Avro 1.4.1 发布

Avro(读音类似于[ævrə])是Hadoop的一个子项目,由Hadoop的 创始人Doug Cutting(也是Lucene,Nutch等项目的创始人)牵头开发。Avro是一个数据序列化系统,设计用于支持大 批量数据交换的...

红薯
2010/10/18
1K
0
【译】Apache Spark 2.4内置数据源Apache Avro

原文链接: Apache Avro as a Built-in Data Source in Apache Spark 2.4 Apache Avro 是一种流行的数据序列化格式。它广泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Ka...

开源大数据
2018/12/05
0
0
Apache Spark 2.4 内置的 Avro 数据源介绍

Apache Avro 是一种流行的数据序列化格式。它广泛用于 Apache Spark 和 Apache Hadoop 生态系统,尤其适用于基于 Kafka 的数据管道。从 Apache Spark 2.4 版本开始,Spark 为读取和写入 Avro...

Spark
2018/12/11
0
0
0016-Avro序列化&反序列化和Spark读取Avro数据

1.简介 本篇文章主要讲如何使用java生成Avro格式数据以及如何通过spark将Avro数据文件转换成DataSet和DataFrame进行操作。 1.1Apache Arvo是什么? Apache Avro 是一个数据序列化系统,Avro提...

Hadoop实操
2018/11/18
41
0

没有更多内容

加载失败,请刷新页面

加载更多

mysql概览

学习知识,首先要有一个总体的认识。以下为mysql概览 1-架构图 2-Detail csdn |简书 | 头条 | SegmentFault 思否 | 掘金 | 开源中国 |

程序员深夜写bug
40分钟前
2
0
golang微服务框架go-micro 入门笔记2.2 micro工具之微应用利器micro web

micro web micro 功能非常强大,本文将详细阐述micro web 命令行的功能 阅读本文前你可能需要进行如下知识储备 golang分布式微服务框架go-micro 入门笔记1:搭建go-micro环境, golang微服务框架...

非正式解决方案
今天
3
0
前端——使用base64编码在页面嵌入图片

因为页面中插入一个图片都要写明图片的路径——相对路径或者绝对路径。而除了具体的网站图片的图片地址,如果是在自己电脑文件夹里的图片,当我们的HTML文件在别人电脑上打开的时候图片则由于...

被毒打的程序猿
今天
2
0
Flutter 系列之Dart语言概述

Dart语言与其他语言究竟有什么不同呢?在已有的编程语言经验的基础上,我们该如何快速上手呢?本篇文章从编程语言中最重要的组成部分,也就是基础语法与类型变量出发,一起来学习Dart吧 一、...

過愙
今天
2
0
rime设置为默认简体

转载 https://github.com/ModerRAS/ModerRAS.github.io/blob/master/_posts/2018-11-07-rime%E8%AE%BE%E7%BD%AE%E4%B8%BA%E9%BB%98%E8%AE%A4%E7%AE%80%E4%BD%93.md 写在开始 我的Arch Linux上......

zhenruyan
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部