Hadoop中的压缩Codec(18)

原创
2014/03/04 22:16
阅读数 1K

作为输入

            当压缩文件作为MapReduce的输入时,MapReduce将自动通过扩展名找到相应的Codec对其解压。

作为输出

            当MapReduce的输出文件需要压缩时,可以更改mapred.output.compress为true,mapred.output.compression.codec为想要使用的codec的类名称,当然你可以可以在代码中指定,通过调用FileOutputFormt的静态方法去设置这两个属性:

package com.hadoop.codecs;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class CodecDemo {
    public static void main(String[] args) throws Exception {
        if (args.length!=2){
            System.exit(-1);
        }
        Job job=new Job();
        job.setJarByClass(CodecDemo.class);
        job.setJobName("CodecDemo");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

	//设置输出压缩开启
        FileOutputFormat.setCompressOutput(job, true);
	//设置压缩类:GzipCodec
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        System.exit(job.waitForCompletion(true)?0:1);

    }
}


使用CompressionCodes解压缩

/*
使用CompressionCodes解压缩CompressionCodec有两个方法可以方便的压缩和解压。
压缩:通过createOutputStream(OutputStream out)方法获得CompressionOutputStream对象
解压:通过createInputStream(InputStream in)方法获得CompressionInputStream对象
从命令行接受一个CompressionCodec实现类的参数,然后通过ReflectionUtils把实例化这个类,调用CompressionCodec的接口方法对标准输出流进行封装,封装成一个压缩流,通过IOUtils类的copyBytes方法把标准输入流拷贝到压缩流中,最后调用CompressionCodec的finish方法,完成压缩。
*/
package com.hadoop.codecs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class Compressors {
    public static void main(String[] args) throws Exception {
        String codecClassName = args[0];
        Class<?> codecClass = Class.forName(codecClassName);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

        CompressionOutputStream out = codec.createOutputStream(System.out);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish();
    }
}


使用CompressionCodecFactory解压缩

/*
    如果你想读取一个被压缩的文件的话,首先你得先通过扩展名判断该用哪种codec,当然有更简便得办法,CompressionCodecFactory已经帮你把这件事做了,通过传入一个Path调用它得getCodec方法,即可获得相应得codec。
    注意看下removeSuffix方法,这是一个静态方法,它可以将文件的后缀去掉,然后我们将这个路径做为解压的输出路径。CompressionCodecFactory能找到的codec也是有限的,默认只有三种org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DefaultCodec,如果想添加其他的codec你需要更改io.compression.codecs属性,并注册codec。


*/
package com.hadoop.codecs;  

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IOUtils;  
import org.apache.hadoop.io.compress.CompressionCodec;  
import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  
import java.io.IOException;  
import java.io.InputStream;  
import java.io.OutputStream;  
import java.net.URI;  
  
public class FileDecompressor {  
	public static void main(String[] args) throws Exception {  
		String uri = args[0];  
		Configuration conf = new Configuration();  
		FileSystem fs = FileSystem.get(URI.create(uri), conf);  
  
		Path inputPath = new Path(uri);  
		CompressionCodecFactory factory = new CompressionCodecFactory(conf);  
		CompressionCodec codec = factory.getCodec(inputPath);  
		if (codec == null) {  
			System.out.println("No codec found:" + uri);  
			System.exit(1);  
		}  
		String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());  
  
		InputStream in = null;  
		OutputStream out = null;  
  
		try {  
			in = codec.createInputStream(fs.open(inputPath));  
			out = fs.create(new Path(outputUri));  
			IOUtils.copyBytes(in,out,conf);  
		} finally {  
			IOUtils.closeStream(in);  
			IOUtils.closeStream(out);  
		}  
	}  
}


                                                                                                                            Name:Xr

                                                                                                                            Date:2014-03-04 22:16

展开阅读全文
打赏
2
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
2
分享
返回顶部
顶部