文档章节

Mapreduce RCFile写入和读取API示例

水东流
 水东流
发布于 2016/07/14 18:06
字数 733
阅读 68
收藏 0

RCFile是FaceBook开发的高压缩比、高效读的行列存储结构。通常在Hive中可以直接对一张Text表使用insert-select转换,但有时希望使用Mapreduce进行RCFile的读写。

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.5.0-cdh5.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-serde</artifactId>
            <version>0.13.1-cdh5.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
           <version>0.13.1-cdh5.2.1</version>
       </dependency>

 

读取文本文件,使用mapreduce生成RCFile格式文件

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceInputFormat;

import java.io.IOException;

public class RcFileReaderJob {
    static class RcFileMapper extends Mapper<Object, BytesRefArrayWritable, Text, NullWritable> {
        @Override
        protected void map(Object key, BytesRefArrayWritable value,
                           Context context)
                throws IOException, InterruptedException {
            Text txt = new Text();
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < value.size(); i++) {
                BytesRefWritable v = value.get(i);
                txt.set(v.getData(), v.getStart(), v.getLength());
                if (i == value.size() - 1) {
                    sb.append(txt.toString());
                } else {
                    sb.append(txt.toString() + "\t");
                }
            }
            context.write(new Text(sb.toString()), NullWritable.get());
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);

        }
    }

    static class RcFileReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                              Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static boolean runLoadMapReducue(Configuration conf, Path input, Path output) throws IOException,
            ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(conf);
        job.setJarByClass(RcFileReaderJob.class);
        job.setJobName("RcFileReaderJob");
        job.setNumReduceTasks(1);
        job.setMapperClass(RcFileMapper.class);
        job.setReducerClass(RcFileReduce.class);
        job.setInputFormatClass(RCFileMapReduceInputFormat.class);
//        MultipleInputs.addInputPath(job, input, RCFileInputFormat.class);
        RCFileMapReduceInputFormat.addInputPath(job, input);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: rcfile <in> <out>");
            System.exit(2);
        }
        RcFileReaderJob.runLoadMapReducue(conf, new Path(args[0]), new Path(args[1]));
    }
}  

 

读取RCFile格式文件,使用mapreduce生成Text格式文件

 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceOutputFormat;

import java.io.IOException;

public class RcFileWriterJob extends Configured implements Tool{
    public static class Map extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable>{
        private byte[] fieldData;
        private int numCols;
        private BytesRefArrayWritable bytes;
        
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0);
            bytes = new BytesRefArrayWritable(numCols);
        }
        
        public void map(Object key, Text line, Context context
                ) throws IOException, InterruptedException {
            bytes.clear();
            String[] cols = line.toString().split("\t", -1);
            System.out.println("SIZE : "+cols.length);
            for (int i=0; i<numCols; i++){
                fieldData = cols[i].getBytes("UTF-8");
                BytesRefWritable cu = new BytesRefWritable(fieldData, 0, fieldData.length);
                bytes.set(i, cu);
            }
            context.write(NullWritable.get(), bytes);
        }
    }
    
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length < 2){
            System.out.println("Usage: " +
                    "hadoop jar RCFileLoader.jar <main class> " +
                    "-tableName <tableName> -numCols <numberOfColumns> -input <input path> " +
                    "-output <output path> -rowGroupSize <rowGroupSize> -ioBufferSize <ioBufferSize>");
            System.out.println("For test");
            System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
                    "-tableName test1 -numCols 10 -input RCFileLoaderTest/test1 " +
                    "-output RCFileLoaderTest/RCFile_test1");
            System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
                    "-tableName test2 -numCols 5 -input RCFileLoaderTest/test2 " +
                    "-output RCFileLoaderTest/RCFile_test2");
            return 2;
        }

        String tableName = "";
        int numCols = 0;
        String inputPath = "";
        String outputPath = "";
        int rowGroupSize = 16 *1024*1024;
        int ioBufferSize = 128*1024;
        for (int i=0; i<otherArgs.length - 1; i++){
            if("-tableName".equals(otherArgs[i])){
                tableName = otherArgs[i+1];
            }else if ("-numCols".equals(otherArgs[i])){
                numCols = Integer.parseInt(otherArgs[i+1]);
            }else if ("-input".equals(otherArgs[i])){
                inputPath = otherArgs[i+1];
            }else if("-output".equals(otherArgs[i])){
                outputPath = otherArgs[i+1];
            }else if("-rowGroupSize".equals(otherArgs[i])){
                rowGroupSize = Integer.parseInt(otherArgs[i+1]);
            }else if("-ioBufferSize".equals(otherArgs[i])){
                ioBufferSize = Integer.parseInt(otherArgs[i+1]);
            }
            
        }
        
        conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize);
        conf.setInt("io.file.buffer.size", ioBufferSize);

        Job job = Job.getInstance(conf);
        job.setJobName("RcFileWriterJob");
        job.setJarByClass(RcFileWriterJob.class);
        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BytesRefArrayWritable.class);
//        job.setNumReduceTasks(0);
        
        FileInputFormat.addInputPath(job, new Path(inputPath));
        
        job.setOutputFormatClass(RCFileMapReduceOutputFormat.class);
        RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols);
        RCFileMapReduceOutputFormat.setOutputPath(job, new Path(outputPath));
        RCFileMapReduceOutputFormat.setCompressOutput(job, false);

        System.out.println("Loading table " + tableName + " from " + inputPath + " to RCFile located at " + outputPath);
        System.out.println("number of columns:" + job.getConfiguration().get("hive.io.rcfile.column.number.conf"));
        System.out.println("RCFile row group size:" + job.getConfiguration().get("hive.io.rcfile.record.buffer.size"));
        System.out.println("io bufer size:" + job.getConfiguration().get("io.file.buffer.size"));
        
        return (job.waitForCompletion(true) ? 0 : 1);
    }
    
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new RcFileWriterJob(), args);
        System.exit(res);
    }

}

本文转载自:http://blog.csdn.net/opensure/article/details/46228095

上一篇: HttpInput
水东流
粉丝 4
博文 51
码字总数 23858
作品 0
海淀
程序员
私信 提问
基于 Hive 的文件格式:RCFile 简介及其应用

Hadoop 作为MR 的开源实现,一直以动态运行解析文件格式并获得比MPP数据库快上几倍的装载速度为优势。不过,MPP数据库社区也一直批评Hadoop由于文件格式并非为特定目的而建,因此序列化和反序...

大数据之路
2014/06/18
2.8K
1
一文详解大规模数据计算处理原理及操作重点

作者介绍 李智慧,《大型网站技术架构:核心原理与案例分析》作者。曾供职于阿里巴巴与英特尔亚太研发中心,从事大型网站与大数据方面的研发工作,目前在做企业级区块链方面的开发工作。 大数...

DBAplus社群
2018/08/07
0
0
深入学习Hive应用场景及架构原理

col1 ARRAY< INT>, col2 MAP< STRING,INT>, col3 STRUCT< a:STRING,b:INT,c:DOUBLE> ) (三)类型转化 Hive 的原子数据类型是可以进行隐式转换的,类似于 Java 的类型转换,例如某表达式使用......

py_123456
2018/05/12
0
0
004.hive命令的3种调用方式 | ApacheCN(apache中文网)

ApacheCN apache中文网 hive命令的3种调用方式 官网地址:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli(可参考) 方式1:hive –f /root/shell/hive-script.sql......

片刻
2014/01/13
29.8K
0
7个实例全面掌握Hadoop MapReduce

作者介绍 杜亦舒,创业中,技术合伙人,喜欢研究分享技术。个人订阅号:性能与架构。 本文旨在帮您快速了解 MapReduce 的工作机制和开发方法,解决以下几个问题: 文章中提供了程序实例中涉及...

杜亦舒
2017/06/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

3_数组

3_数组

行者终成事
今天
7
0
经典系统设计面试题解析:如何设计TinyURL(二)

原文链接:https://www.educative.io/courses/grokking-the-system-design-interview/m2ygV4E81AR 编者注:本文以一道经典的系统设计面试题:《如何设计TinyURL》的参考答案和解析为例,帮助...

APEMESH
今天
7
0
使用logstash同步MySQL数据到ES

概述   在生成业务常有将MySQL数据同步到ES的需求,如果需要很高的定制化,往往需要开发同步程序用于处理数据。但没有特殊业务需求,官方提供的logstash就很有优势了。   在使用logstas...

zxiaofan666
今天
10
0
X-MSG-IM-分布式信令跟踪能力

经过一周多的鏖战, X-MSG-IM的分布式信令跟踪能力已基本具备, 特点是: 实时. 只有要RX/TX就会实时产生信令跟踪事件, 先入kafka, 再入influxdb待查. 同时提供实时sub/pub接口. 完备. 可以完整...

dev5
今天
7
0
OpenJDK之CyclicBarrier

OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下 图1 CyclicBarrier是Doug Lea在JDK1.5中引入的,作用就不详细描述了,主要有如下俩个方法使用: await()方法,如果当前线...

克虏伯
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部