文档章节

hadoop 多文件输出-新旧API

坏坏一笑
 坏坏一笑
发布于 2014/06/06 16:39
字数 1265
阅读 100
收藏 0

       换了份工作,linux经验极少的我竟然搞起hadoop了,最近有个需求:根据“]@\n”字符提取日志,日志有多种类型,不同的类型需要清洗到不同的路径下或者产生不同类型的文件,在这里就不说清洗了,说说多文件输出吧,至于根据“]@\n”字符提取日志属于自定义InputFormat的范畴,下篇再说。

        一般来说Map/Reduce都是输出一组文件,但是有些情况下需要我们输出多组文件,比如我上面提到的需求,接下来我用新旧API分别说明如何实现多文件输出

旧API:

        MultipleTextOutputFormat 这个类很重要,我们其实只要写个类继承MultipleTextOutputFormat,并且重写generateFileNameForKeyValue(Object key, Object value, String name)方法就好了。因为MultipleTextOutputFormat中有个write方法,即将记录写到hdfs上,在这个方法中,会调用generateFileNameForKeyValue。废话不多说,上代码:

public class MultiFileOutputFormat extends MultipleTextOutputFormat<Object, Object>{
 @Override
 protected String generateFileNameForKeyValue(Object key, Object value,
   String name) {
  if(key instanceof OutputFileName){
   return ((OutputFileName) key).getPath()+"/"+name;
  }else{
   return super.generateFileNameForKeyValue(key, value, name);
  }
 }
}

    其中OutputFileName是我自己定义的枚举类,便于管理而已,这里也可以return一个路径,以下是OutputFileName的代码

public enum OutputFileName {
    ERRORLOG("errorlog","logtype=errorlog"),
    APIREQUEST("apiRequest","logtype=apiRequest"),
    FIRSTINTOTIME("firstIntoTime","logtype=firstIntoTime"),
    TABFLUSHTIME("tabFlushTime","logtype=tabFlushTime"),
    PERFORMANCE("performance","logtype=performance"),
    FILEREQUEST("fileRequest","logtype=fileRequest"); 
    private String name;
    private String path;
    private String tempPath;
    private OutputFileName(String name,String path){
        this.name = name;
        this.path = path;
    }
    public String getName(){
        return this.name;
    }
    public String getPath(){
        if(!StringUtil.isEmpty(tempPath)){
            String temp = this.tempPath;     
            this.tempPath = null;     
            return temp;
        }else{
            return this.path;
        }
    } 
}

         如何使用MultiFileOutputFormat这个自己写的类呢?就这么用

//job所在类的main方法中   
JobConf conf = new JobConf(config,XXX.class);
conf.setOutputFormat(MultiFileOutputFormat.class); 
//map函数中 
collector.collect(OutputFileName.ERRORLOG, new Text(log));

        此示例做了以上 的操作就可以将数据写到logtype=errorlog目录下了,当然可以根据不同的日志去设置输出目录了

新API:

        对于新的API,我没发现MultipleTextOutputFormat这个类,很头疼,我甚至看了源码,仿照旧API自己写了MultipleTextOutputFormat,这就需要做很多事情,必须写个集成RecordWriter的类,重写里面的方法,当时确实可以做到将数据写到不同的路径下,但是也有bug,数据很多的时候,路径下的数据只有一部分保留,做了一下测试,确实把所有的记录都写了,但却只是把最后写的一部分保留在设定好的路径下了,至今都没发现原因,这里就不给代码了,只能保留60多万行的记录

        当然我还是有办法的,经过百般折磨,终于在网上找到相关资料,使用这个类MultipleOutputs,查查API,还真有,只不过是在org.apache.hadoop.mapreduce.lib.output包下,这个类相当于把旧的API东西又重新整理了一遍,我们不用再去写其他的类集成MultipleTextOutputFormat。具体使用方法看代码吧

public static class MapperClass extends Mapper<Object, Text, Text, NullWritable> { 
    private Text outkey = new Text("");
    private MultipleOutputs<Text, NullWritable> mos;
    public void map(Object key, Text value, Context context) throws IOException,InterruptedExceptio{        String log = value.toString();
        outkey.set(log); 
        int begin = log.indexOf("@[#(");
        if(begin != -1){
            String logForSplit = log.substring(begin+"@".length()); 
            String [] split = logForSplit.split("#");
            if(split != null && split.length >0){ 
                String cType = split[0]; 
                if(!StringUtil.isEmpty(cType)){ 
                    if("apiRequest".equals(cType)){
                        mos.write("apiRequest", outkey, NullWritable.get());
                    }else if("errlog".equals(cType)){
                        mos.write("errorlog", outkey, NullWritable.get());
                    } 
                } 
            }  
        }     
    } 
     @Override
      protected void cleanup(Context context) throws IOException,
        InterruptedException {
           mos.close();
           super.cleanup(context);
      }    
      @Override
      protected void setup(Context context) throws IOException,
        InterruptedException {
           mos = new MultipleOutputs<Text, NullWritable>(context);
           super.setup(context);
      }
}
public class TestJob { 
     
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = new Configuration();
  
  
  
  Job job = new Job(conf, "ss");
  job.setInputFormatClass(TrackInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setJarByClass(TestJob.class);
  job.setMapperClass(TestJob.MapperClass.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);
  if(inputPaths.length > 0){
   Path[] paths = new Path[inputPaths.length];
   for(int i = 0 ; i < inputPaths.length ; i++){
    paths[i] = new Path(inputPaths[i]);
   }
   FileInputFormat.setInputPaths(job, paths);
  }else{
   FileInputFormat.setInputPaths(job, new Path(args[0]));
  }
  
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  MultipleOutputs.addNamedOutput(job, "errorlog",
    TextOutputFormat.class, Text.class, NullWritable.class);
  MultipleOutputs.addNamedOutput(job, "apiRequest",
    TextOutputFormat.class, Text.class, NullWritable.class);
   } 
}

    OK,这就可以了,总结一下需要注意的问题,首先在我们的map类中一定要定义MultipleOutputs的对象,并且重写cleanup和setup方法,分别用来关闭和创建MultipleOutputs对象,最重要的是在job所在的类中注册我们的文件名,比如errorlog,apiRequest等

 

        上述的两个例子有点区别,第一个是将数据写到不同的目录下,而第二个是写到同一个目录下,但是会分成不同类型的文件,如我截取的记录

 -rw-r--r--   2 hadoop supergroup   10569073 2014-06-06 11:50 /test/aa/fileRequest-m-00063.lzo
-rw-r--r--   2 hadoop supergroup   10512656 2014-06-06 11:50 /test/aa/fileRequest-m-00064.lzo
-rw-r--r--   2 hadoop supergroup      68780 2014-06-06 11:51 /test/aa/firstIntoTime-m-00000.lzo
-rw-r--r--   2 hadoop supergroup      67901 2014-06-06 11:51 /test/aa/firstIntoTime-m-00001.lzo

    至于怎么样输出到不同的目录下,有待研究,这种方式有个不好的地方, 会产生很多的

-rw-r--r--   2 hadoop supergroup         42 2014-06-06 11:50 /test/aa/part-m-00035.lzo    空文件

不过不影响使用,上述第二种方式,参照了http://my.oschina.net/leejun2005/blog/133424 这位仁兄的博客,很感谢他

    下一篇我将介绍自定义InputFormat,为了满足不按照行进行读取日志,而是按照指定的特殊字符读取的需求

 

 

 

 

 

        

© 著作权归作者所有

坏坏一笑
粉丝 10
博文 54
码字总数 29772
作品 0
昌平
程序员
私信 提问
MapReduce中的自定义多目录/文件名输出HDFS

最近考虑到这样一个需求: 需要把原始的日志文件用hadoop做清洗后,按业务线输出到不同的目录下去,以供不同的部门业务线使用。 这个需求需要用到MultipleOutputFormat和MultipleOutputs来实...

大数据之路
2012/12/08
14.2K
0
面试问题

358、你们的集群规模? 开发集群:10台(8台可用)8核cpu 359、你们的数据是用什么导入到数据库的?导入到什么数据库? 处理之前的导入:通过hadoop命令导入到hdfs文件系统 处理完成之后的导...

HIVE
2016/07/05
1K
2
MR IOException: Type mismatch in key from map的处理

一、错误描述 执行MR出现如下错误,如执行命令是: hadoop jar /opt/cloudera/parcels/CDH-5.0.1-1.cdh5.0.1.p0.47/lib/hadoop-mapreduce/hadoop-streaming-2.3.0-cdh5.0.1.jar -input /tes......

cloud-coder
2014/06/09
2.3K
2
Hadoop编写调试MapReduce程序详解

编程学习,最好的方法还是自己动手,所以这里简单介绍在Hadoop上编写调试一个MapReduce程序。 先说一下我的开发环境,我的操作系统是Centos6.0,Hadoop版本是0.20.2,开发环境是eclipse。在H...

miaosu
2013/03/20
12.1K
3
大数据(hadoop-mapreduce代码及编程模型讲解)

MapReduce编程模型 MapReduce将整个运行过程分为两个阶段: Map 阶段和Reduce阶段 Map阶段由一定数量的Map Task组成 输入数据格式解析: InputFormat 输入数据处理: Mapper 数据分组: Part...

这很耳东先生
05/28
12
0

没有更多内容

加载失败,请刷新页面

加载更多

Netty整合Protobuffer

现在我们都知道,rpc的三要素:IO模型,线程模型,然后就是数据交互模型,即我们说的序列化和反序列化,现在我们来看一下压缩比率最大的二进制序列化方式——Protobuffer,而且该方式是可以跨...

算法之名
21分钟前
11
0
如何用C++实现栈

栈的定义 栈(stack)又名堆栈,它是一种运算受限的线性表。限定仅在表尾进行插入和删除操作的线性表。这一端被称为栈顶,相对地,把另一端称为栈底。向一个栈插入新元素又称作进栈、入栈或压...

BWH_Steven
39分钟前
4
0
编程作业20190210900169

1编写一个程序,提示用户输入名和姓,然后以“名,姓”的格式打印出来。 #include <stdio.h>#include <stdlib.h> int main(){ char firstName[20]; char lastName[20]; print......

1李嘉焘1
51分钟前
10
0
补码的优点及原理分析

只讨论整数 1.计算机内部为什么没有减法器? 减法运算本身其实就是加法,如x - y即x +(-y),所以只需要将负数成功表示出来并可以参加加法运算,那加法器就可同时实现“+”和“-”的运算。这...

清自以敬
今天
76
0
Docker 可视化管理 portainer

官网安装指南: https://portainer.readthedocs.io/en/latest/deployment.html docker-compose.yml 位置,下载地址:https://downloads.portainer.io/docker-compose.yml...

Moks角木
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部