文档章节

Hadoop 中的I/O (2) 压缩/解压缩

为为02
 为为02
发布于 2017/03/01 00:20
字数 1912
阅读 171
收藏 6

压缩

文件压缩有两大好处:减少存储文件所需空间,加速数据在网络和磁盘上的传输。

在Hadoop中可以使用很多种文件压缩格式,工具和算法,它们各有所长。

压缩格式总结表1

所有压缩算法都需要权衡空间/时间:压缩和解压的速度快就意味着能够节省的空间少。表1所列出的所有压缩工具都提供9个不同的e级别来控制压缩时必须考虑的时间/空间均衡:选项 -1 为优化压缩速度,选项 -9 为优化压缩空间。例如我们可以通过最节省空间的方法创建一个名为 compress.gz 的压缩文件。

    gzip -9 compress

不同的压缩工具具有不同的压缩特性。gzip是一个通用的压缩工具,在空间/时间性能的权衡中,居于其它两个算法之间。bzip2的压缩能力强于gzip,但是压缩速度更慢一些。尽管bzip2解压的速度比起压缩的速度快,但仍然比其他压缩工具解压慢。而LZO,LZ4和Snappy三种压缩方式都是更注重压缩速度,它们的压缩速度比gzip快出一个层级,但文件的压缩率都不如gzip。另外Snappy和LZ4的解压缩速度比LZO高出很多。

1. codec

codec实现了一种压缩-解压缩算法。在Hadoop中,一个实现 CompressionCodec 的类就代表一个 codec 。Hadoop 所封装的压缩解压缩算法全在 org.apache.hadoop.io.compress包下,命名规则是:以压缩格式名开始,以Codec结尾。如 GzipCodec 类和 Lz4Codec 类。

1. 通过 CompressionCodec 对数据流进行压缩和解压缩

CompressionCodec 包含两个函数,可以轻松用于压缩和解压缩数据。如果要对写入输出数据流的数据进行压缩,可用 createOutputStream (OutputStream out)方法在底层的数据流中对需要以压缩的格式写入且在此前尚未压缩的数据新建一个 CompressionOutputStream 对象。同理对输入流中需要解压缩但尚未解压缩的时候则通过 createInputStream 获取 CompressionInputStream ,可通过该方法从底层输入流中读取解压缩的数据。

CompressionOutputStream 和 CompressionInputStream ,拥有能够重置其在底层的压缩或解压缩的方法以及将部分数据流压缩为单独数据块等特性。

例1: 将输入从输入流读入,压缩后从输出流输出

    package com.weiwei.WHadoop.io;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;
    
    import java.io.IOException;
    
    /**
     * [@author](https://my.oschina.net/arthor) WangWeiwei
     * [@version](https://my.oschina.net/u/931210) 1.0
     * [@sine](https://my.oschina.net/mysine) 17-2-25
     * 从标准输入流获取数据源,压缩后输出到标准输出流
     */
    public class StreamCompressor {
        public static void main(String[] args) throws ClassNotFoundException, IOException {
            String codecClassName = args[0];
            Class<?> codeClass = Class.forName(codecClassName);
            Configuration configuration = new Configuration();
            CompressionCodec compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codeClass,configuration);
    
            CompressionOutputStream compressionOutputStream = compressionCodec.createOutputStream(System.out);
            IOUtils.copyBytes(System.in,System.out,4096,false);
            compressionOutputStream.finish();
        }
    }
2. 通过CompressionCodecFactory 判断 COmpressionCodec

在我们常见的压缩文件中,通常可以通过直接通过文件的扩展名来判断文件压缩时所使用的算法。例如文件后缀名为 .gz ,那么就可以使用 GzipCodec 来进行解压缩读取。CompressionCodecFactory 提供了 getCodec() 方法可以根据文件的扩展名映射到文件。

/**
   * Find the relevant compression codec for the given file based on its
   * filename suffix.
   * [@param](https://my.oschina.net/u/2303379) file the filename to check
   * [@return](https://my.oschina.net/u/556800) the codec object
   */
  public CompressionCodec getCodec(Path file) {
    CompressionCodec result = null;
    if (codecs != null) {
      String filename = file.getName();
      String reversedFilename =
          new StringBuilder(filename).reverse().toString();
      SortedMap<String, CompressionCodec> subMap = 
        codecs.headMap(reversedFilename);
      if (!subMap.isEmpty()) {
        String potentialSuffix = subMap.lastKey();
        if (reversedFilename.startsWith(potentialSuffix)) {
          result = codecs.get(potentialSuffix);
        }
      }
    }
    return result;
  }

例2 : 通过文件的扩展名自动选择合适的 codec 以解压文件

    package com.weiwei.WHadoop.io;
    
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    
    
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-28
     */
    public class FileDecompressorTest extends MiniClusterSuperTest {
        @Test
        public void testFileDecompressor() throws IOException {
            Path path = new Path("/temp/a.gz");
            CompressionCodec compressionCodec= ReflectionUtils.newInstance(GzipCodec.class,conf);
            OutputStream outputStream = fs.create(path);
            CompressionOutputStream compressionOutputStream = compressionCodec.createOutputStream(outputStream);
            compressionOutputStream.write("abcdefghigh".getBytes());
            compressionOutputStream.finish();
            compressionOutputStream.close();
    
            System.out.println("压缩后的文件大小是 : " + fs.getFileStatus(path).getLen());
    
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodec(path);
            if (codec == null){
                System.out.println("未找到文件压缩的方法");
            }else {
                InputStream in = null;
                OutputStream out = null;
                try {
                    Path target = new Path(factory.removeSuffix(path.toString(),codec.getDefaultExtension()));
                    in = codec.createInputStream(fs.open(path));
                    out = fs.create(target);
                    IOUtils.copyBytes(in,out,conf);
    
                    FileStatus fileStatus = fs.getFileStatus(target);
                    System.out.println("解压缩后的文件大小是" + fileStatus.getLen());
                }finally {
                    IOUtils.closeStream(in);
                    IOUtils.closeStream(out);
                }
            }
        }
    }
3. CodecPool 压缩与解压缩池

CodecPool 可以反复使用大量的压缩和解压缩操作,就像数据库连接池一样,可以减少很多创建对象所需的开销。但压缩/解压缩池一般用于需要非常多的压缩解压缩场景,少量的压缩/解压缩操作则完全无必要使用。

例3:使用压缩池对读取自标准输入的数据进行压缩,然后将其写到标准输出。

    package com.weiwei.WHadoop.io;
    
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.*;
    import org.junit.Test;
    
    import java.io.IOException;
    
    /**
     * @author WangWeiwei
     * @since 2017/3/1
     * 使用压缩池对读取自标准输入的数据进行压缩,然后将其写到标准输出。
     */
    public class PooledStreamCompressorTest extends MiniClusterSuperTest{
        
        @Test
        public void testPooledStreamCompressor() {
            CompressionCodec compressionCodec = new GzipCodec();
            Compressor compressor = null;
            try {
                compressor = CodecPool.getCompressor(compressionCodec,conf);
                CompressionOutputStream outputStream = compressionCodec.createOutputStream(System.out);
                IOUtils.copyBytes(System.in,outputStream,4096,false);
                outputStream.finish();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                CodecPool.returnCompressor(compressor);
            }
        }
        
    }

2. 压缩和输入分片

对于MapReduce来说,文件压缩是否支持分片是尤其重要的。如果压缩格式是如Gzip类的格式,,因为无法进行分片,gzip不支持随机位置读取,要想解压缩文件只能将多个文件合成一个完整的文件后再进行解压缩。而MapReduce讲究的是数据本地特性,gzip这种压缩方式在MapReduce中只能重新合并成一个完整的压缩文件后再进行解压。虽然gzip压缩率较高,由上面可以看出其并不适合MapReduce。 有两种办法可以实现压缩MapReduce的输出: 一、可以在mapReduce的配置中将 mapred.output.compress 属性设为true然后再将 mapred.output.compression.codec 属性设置为要使用的压缩 codec 的类型全路径。 二、可以使用 FileOutputFormat 类的两个静态方法来完成实现压缩

      /**
       * Set whether the output of the job is compressed.
       * @param job the job to modify
       * @param compress should the output of the job be compressed?
       */
      public static void setCompressOutput(Job job, boolean compress) {
        job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);
      }
      /**
       * Set the {@link CompressionCodec} to be used to compress job outputs.
       * @param job the job to modify
       * @param codecClass the {@link CompressionCodec} to be used to
       *                   compress the job outputs
       */
      public static void 
      setOutputCompressorClass(Job job, 
                               Class<? extends CompressionCodec> codecClass) {
        setCompressOutput(job, true);
        job.getConfiguration().setClass(FileOutputFormat.COMPRESS_CODEC, 
                                        codecClass, 
                                        CompressionCodec.class);
      }

如果要MapReduce 输出成顺序文件(sequence file), 可以通过 配置 mapred.output.compression.type 属性来控制具体的压缩类型,该属性默认值为 RECORD ,意味针对每条记录进行压缩。推荐将type的属性值设为BLOCK ,这样就能实现对一组记录进行压缩,以获取更高的压缩效率。同时,也可以通过 SequenceFileOutputFormat 类的一个静态方法 putCompressionType() 来设置压缩的属性。

虽然 MapReduce 能直接处理的是原数据,但如果能够对map阶段产生的中间结果进行压缩,仍然能够让程序运行的效率提升。因为map任务产生的中间结果需要写到磁盘上,然后通过网络传输到reducer节点。如果在数据传输之前,使用LZO,LZ4或Snappy这些快速压缩方式,就可以减少网络中传输的数据量,从而能达到提升性能的目的。如我们如果要让map任务的输出结果压缩成 gzip 的格式,可以通过如下代码实现。

    Configuration config = new Configuration();
    conf.setBoolean("mapred.compress.map.output",true);
    conf.setClass("mapred.map.output.compression.codec",GzipCodec.class,CompressionCodec.class);
    Job job = new Job(config);

© 著作权归作者所有

为为02
粉丝 51
博文 44
码字总数 99356
作品 0
海淀
程序员
私信 提问
HIVE高级与企业优化

HiveServer2 参考地址:http://blog.csdn.net/czw698/article/details/44394923 1、启动hiveserver2服务 $HIVE_HOME/bin/hive --service hiveserver2 2、测试连接是否以连上 不用写jdbc程序,......

qi49125
2017/11/14
0
0
Hadoop 2.2.0安装和配置lzo

Hadoop经常用于处理大量的数据,如果期间的输出数据、中间数据能压缩存储,对系统的I/O性能会有提升。综合考虑压缩、解压速度、是否支持split,目前lzo是最好的选择。LZO(LZO是Lempel-Ziv-...

蓝狐乐队
2014/04/22
2.9K
2
mac命令行压缩解压rar文件

去 http://www.rarlab.com/download.htm下载 rarosx 在Mac OS X系统中默认不支持 RAR 文件的解压缩。下面演示如何在Mac OS X系统中使用 rar 命令行操作。 1. 首先从rarlab 网站下载 rar/unr...

吕坤
2014/11/20
10.5K
2
搭建 Hadoop2.7.2 + Spark1.6环境

服务器上已经有了 hadoop2.7.2环境,这次只用配置spark1.6就可以。 服务器操作系统为centOS6.5 1、安装Scala 下载地址:http://www.scala-lang.org/download/ 注:我下载的是scala-2.11.8.tg...

小萝卜_
2016/05/16
424
0
Impala 如何使用 Hadoop 文件格式

Impala 如何使用 Hadoop 文件格式 Impala 支持几种熟悉的 Apache Hadoop 中使用的文件格式。Impala 可以加载或查询其他 Hadoop 组件如 Pig 或 MapReduce 产生的数据文件,并且 Impala 产生的...

weiqingbin
2014/01/13
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
今天
6
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
今天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
今天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
今天
17
0
浅谈java过滤器Filter

一、简介 Servlet中的过滤器Filter是实现了javax.servlet.Filter接口的服务器端程序,主要的用途是过滤字符编码、做一些业务逻辑判断如是否有权限访问页面等。其工作原理是,只要你在web.xml...

青衣霓裳
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部