Hadoop 中的I/O (2) 压缩/解压缩

原创
2017/03/01 00:20
阅读数 517

压缩

文件压缩有两大好处:减少存储文件所需空间,加速数据在网络和磁盘上的传输。

在Hadoop中可以使用很多种文件压缩格式,工具和算法,它们各有所长。

压缩格式总结表1

所有压缩算法都需要权衡空间/时间:压缩和解压的速度快就意味着能够节省的空间少。表1所列出的所有压缩工具都提供9个不同的e级别来控制压缩时必须考虑的时间/空间均衡:选项 -1 为优化压缩速度,选项 -9 为优化压缩空间。例如我们可以通过最节省空间的方法创建一个名为 compress.gz 的压缩文件。

    gzip -9 compress

不同的压缩工具具有不同的压缩特性。gzip是一个通用的压缩工具,在空间/时间性能的权衡中,居于其它两个算法之间。bzip2的压缩能力强于gzip,但是压缩速度更慢一些。尽管bzip2解压的速度比起压缩的速度快,但仍然比其他压缩工具解压慢。而LZO,LZ4和Snappy三种压缩方式都是更注重压缩速度,它们的压缩速度比gzip快出一个层级,但文件的压缩率都不如gzip。另外Snappy和LZ4的解压缩速度比LZO高出很多。

1. codec

codec实现了一种压缩-解压缩算法。在Hadoop中,一个实现 CompressionCodec 的类就代表一个 codec 。Hadoop 所封装的压缩解压缩算法全在 org.apache.hadoop.io.compress包下,命名规则是:以压缩格式名开始,以Codec结尾。如 GzipCodec 类和 Lz4Codec 类。

1. 通过 CompressionCodec 对数据流进行压缩和解压缩

CompressionCodec 包含两个函数,可以轻松用于压缩和解压缩数据。如果要对写入输出数据流的数据进行压缩,可用 createOutputStream (OutputStream out)方法在底层的数据流中对需要以压缩的格式写入且在此前尚未压缩的数据新建一个 CompressionOutputStream 对象。同理对输入流中需要解压缩但尚未解压缩的时候则通过 createInputStream 获取 CompressionInputStream ,可通过该方法从底层输入流中读取解压缩的数据。

CompressionOutputStream 和 CompressionInputStream ,拥有能够重置其在底层的压缩或解压缩的方法以及将部分数据流压缩为单独数据块等特性。

例1: 将输入从输入流读入,压缩后从输出流输出

    package com.weiwei.WHadoop.io;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;
    
    import java.io.IOException;
    
    /**
     * [@author](https://my.oschina.net/arthor) WangWeiwei
     * [@version](https://my.oschina.net/u/931210) 1.0
     * [@sine](https://my.oschina.net/mysine) 17-2-25
     * 从标准输入流获取数据源,压缩后输出到标准输出流
     */
    public class StreamCompressor {
        public static void main(String[] args) throws ClassNotFoundException, IOException {
            String codecClassName = args[0];
            Class<?> codeClass = Class.forName(codecClassName);
            Configuration configuration = new Configuration();
            CompressionCodec compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codeClass,configuration);
    
            CompressionOutputStream compressionOutputStream = compressionCodec.createOutputStream(System.out);
            IOUtils.copyBytes(System.in,System.out,4096,false);
            compressionOutputStream.finish();
        }
    }
2. 通过CompressionCodecFactory 判断 COmpressionCodec

在我们常见的压缩文件中,通常可以通过直接通过文件的扩展名来判断文件压缩时所使用的算法。例如文件后缀名为 .gz ,那么就可以使用 GzipCodec 来进行解压缩读取。CompressionCodecFactory 提供了 getCodec() 方法可以根据文件的扩展名映射到文件。

/**
   * Find the relevant compression codec for the given file based on its
   * filename suffix.
   * [@param](https://my.oschina.net/u/2303379) file the filename to check
   * [@return](https://my.oschina.net/u/556800) the codec object
   */
  public CompressionCodec getCodec(Path file) {
    CompressionCodec result = null;
    if (codecs != null) {
      String filename = file.getName();
      String reversedFilename =
          new StringBuilder(filename).reverse().toString();
      SortedMap<String, CompressionCodec> subMap = 
        codecs.headMap(reversedFilename);
      if (!subMap.isEmpty()) {
        String potentialSuffix = subMap.lastKey();
        if (reversedFilename.startsWith(potentialSuffix)) {
          result = codecs.get(potentialSuffix);
        }
      }
    }
    return result;
  }

例2 : 通过文件的扩展名自动选择合适的 codec 以解压文件

    package com.weiwei.WHadoop.io;
    
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    
    
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-28
     */
    public class FileDecompressorTest extends MiniClusterSuperTest {
        @Test
        public void testFileDecompressor() throws IOException {
            Path path = new Path("/temp/a.gz");
            CompressionCodec compressionCodec= ReflectionUtils.newInstance(GzipCodec.class,conf);
            OutputStream outputStream = fs.create(path);
            CompressionOutputStream compressionOutputStream = compressionCodec.createOutputStream(outputStream);
            compressionOutputStream.write("abcdefghigh".getBytes());
            compressionOutputStream.finish();
            compressionOutputStream.close();
    
            System.out.println("压缩后的文件大小是 : " + fs.getFileStatus(path).getLen());
    
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodec(path);
            if (codec == null){
                System.out.println("未找到文件压缩的方法");
            }else {
                InputStream in = null;
                OutputStream out = null;
                try {
                    Path target = new Path(factory.removeSuffix(path.toString(),codec.getDefaultExtension()));
                    in = codec.createInputStream(fs.open(path));
                    out = fs.create(target);
                    IOUtils.copyBytes(in,out,conf);
    
                    FileStatus fileStatus = fs.getFileStatus(target);
                    System.out.println("解压缩后的文件大小是" + fileStatus.getLen());
                }finally {
                    IOUtils.closeStream(in);
                    IOUtils.closeStream(out);
                }
            }
        }
    }
3. CodecPool 压缩与解压缩池

CodecPool 可以反复使用大量的压缩和解压缩操作,就像数据库连接池一样,可以减少很多创建对象所需的开销。但压缩/解压缩池一般用于需要非常多的压缩解压缩场景,少量的压缩/解压缩操作则完全无必要使用。

例3:使用压缩池对读取自标准输入的数据进行压缩,然后将其写到标准输出。

    package com.weiwei.WHadoop.io;
    
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.*;
    import org.junit.Test;
    
    import java.io.IOException;
    
    /**
     * @author WangWeiwei
     * @since 2017/3/1
     * 使用压缩池对读取自标准输入的数据进行压缩,然后将其写到标准输出。
     */
    public class PooledStreamCompressorTest extends MiniClusterSuperTest{
        
        @Test
        public void testPooledStreamCompressor() {
            CompressionCodec compressionCodec = new GzipCodec();
            Compressor compressor = null;
            try {
                compressor = CodecPool.getCompressor(compressionCodec,conf);
                CompressionOutputStream outputStream = compressionCodec.createOutputStream(System.out);
                IOUtils.copyBytes(System.in,outputStream,4096,false);
                outputStream.finish();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                CodecPool.returnCompressor(compressor);
            }
        }
        
    }

2. 压缩和输入分片

对于MapReduce来说,文件压缩是否支持分片是尤其重要的。如果压缩格式是如Gzip类的格式,,因为无法进行分片,gzip不支持随机位置读取,要想解压缩文件只能将多个文件合成一个完整的文件后再进行解压缩。而MapReduce讲究的是数据本地特性,gzip这种压缩方式在MapReduce中只能重新合并成一个完整的压缩文件后再进行解压。虽然gzip压缩率较高,由上面可以看出其并不适合MapReduce。 有两种办法可以实现压缩MapReduce的输出: 一、可以在mapReduce的配置中将 mapred.output.compress 属性设为true然后再将 mapred.output.compression.codec 属性设置为要使用的压缩 codec 的类型全路径。 二、可以使用 FileOutputFormat 类的两个静态方法来完成实现压缩

      /**
       * Set whether the output of the job is compressed.
       * @param job the job to modify
       * @param compress should the output of the job be compressed?
       */
      public static void setCompressOutput(Job job, boolean compress) {
        job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);
      }
      /**
       * Set the {@link CompressionCodec} to be used to compress job outputs.
       * @param job the job to modify
       * @param codecClass the {@link CompressionCodec} to be used to
       *                   compress the job outputs
       */
      public static void 
      setOutputCompressorClass(Job job, 
                               Class<? extends CompressionCodec> codecClass) {
        setCompressOutput(job, true);
        job.getConfiguration().setClass(FileOutputFormat.COMPRESS_CODEC, 
                                        codecClass, 
                                        CompressionCodec.class);
      }

如果要MapReduce 输出成顺序文件(sequence file), 可以通过 配置 mapred.output.compression.type 属性来控制具体的压缩类型,该属性默认值为 RECORD ,意味针对每条记录进行压缩。推荐将type的属性值设为BLOCK ,这样就能实现对一组记录进行压缩,以获取更高的压缩效率。同时,也可以通过 SequenceFileOutputFormat 类的一个静态方法 putCompressionType() 来设置压缩的属性。

虽然 MapReduce 能直接处理的是原数据,但如果能够对map阶段产生的中间结果进行压缩,仍然能够让程序运行的效率提升。因为map任务产生的中间结果需要写到磁盘上,然后通过网络传输到reducer节点。如果在数据传输之前,使用LZO,LZ4或Snappy这些快速压缩方式,就可以减少网络中传输的数据量,从而能达到提升性能的目的。如我们如果要让map任务的输出结果压缩成 gzip 的格式,可以通过如下代码实现。

    Configuration config = new Configuration();
    conf.setBoolean("mapred.compress.map.output",true);
    conf.setClass("mapred.map.output.compression.codec",GzipCodec.class,CompressionCodec.class);
    Job job = new Job(config);
展开阅读全文
打赏
0
6 收藏
分享
加载中
更多评论
打赏
0 评论
6 收藏
0
分享
返回顶部
顶部