文档章节

NLineInputFormat

Zero零_度
 Zero零_度
发布于 2015/01/11 18:34
字数 319
阅读 74
收藏 0

package com.test;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * TextInputFormat处理的数据来自于一个InputSplit。InputSplit根据块大小划分。
 * 由于每条记录有长有短,所以,每个map任务处理的记录数都不一样
 * NLineInputFormat决定每个map处理记录数是相同的
 */
public class WordCountNL extends Configured implements Tool {
 
 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
   String line = value.toString();
   
   StringTokenizer st = new StringTokenizer(line);
   while(st.hasMoreElements()) {
    context.write(new Text(st.nextElement().toString()), new IntWritable(1));
   }
  }
 }
 
 public static class Combiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
   int count = 0;
   Iterator<IntWritable> it = values.iterator();
   while(it.hasNext()) {
    count = count + it.next().get();
   }
   context.write(key, new IntWritable(count));
  }
 }
 
 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
   int count = 0;
   Iterator<IntWritable> it = values.iterator();
   while(it.hasNext()) {
    count = count + it.next().get();
   }
   context.write(key, new IntWritable(count));
  }
 }
 
 public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = this.getConf();
  //设置每个map可以处理多少行数据
  //conf.set("mapreduce.input.lineinputformat.linespermap", "1");
  conf.set(NLineInputFormat.LINES_PER_MAP, "1");

  
  Job job = new Job(conf);
  job.setJobName(WordCountNL.class.getSimpleName());
  job.setJarByClass(WordCountNL.class);
  
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  job.setMapperClass(Map.class);
  job.setCombinerClass(Combiner.class);
  job.setReducerClass(Reduce.class);
  
  job.setInputFormatClass(NLineInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  job.waitForCompletion(true);
  
  return job.isSuccessful()?0:1;
 }
 
 public static void main(String[] args) throws Exception {
  int exit = ToolRunner.run(new WordCount(), args);
  System.exit(exit);
 }
 
}

© 著作权归作者所有

共有 人打赏支持
Zero零_度
粉丝 68
博文 1246
码字总数 252959
作品 0
程序员
mapreduce按行划分MAP,即实现输入文件按行划分,每N行一个MAP

按行划分MAP,即实现输入文件按行划分,每N行一个MAP //设置JOB的格式化输入类为NLineInputFormat job.setInputFormatClass(NLineInputFormat.class) //设置每N行为一个MAP,当然,这个数据最...

闵开慧
2014/07/17
0
0
InputFormat加载数据

InputFormat是一个抽象类,其定义如下: InputFormat会对数据进行两方面的处理: 对输入数据进行逻辑切分,形成一个个split 针对每个split,新建一个RecorReader读取split里面的数据,形成一...

Jason_typ
06/13
0
0
Hadoop输入和输出的处理类(7)

hadoop输入的处理类 InputFormat InputFormat负责处理MR的输入部分。 作用: 1、验证作业的输入是否规范。 2、把输入文件切分成InputSplit。 3、提供RecordReader的实现类,把InputSplit读到...

肖鋭
2014/03/01
0
0
MapReduce多种输入格式

MapReduce多种输入格式 文件是 MapReduce 任务数据的初始存储地。正常情况下,输入文件一般是存储在 HDFS 里面。这些文件的格式可以是任意的:我们可以使用基于行的日志文件,也可以使用二进...

wypersist
05/06
0
0
MapReduce编程二

(1) InputFormat接口 用户需要实现该接口以指定输入文件的内容格式。该接口有两个方法 public interface InputFormat<K, V> { InputSplit[] getSplits(JobConf job, int numSplits) throws......

张欢19933
2016/03/30
38
0

没有更多内容

加载失败,请刷新页面

加载更多

992. Sort Array By Parity II - LeetCode

Question 992. Sort Array By Parity II Solution 题目大意:给一个int数组,一半是奇数一半是偶数,分别对偶数数和奇数数排序并要求这个数本身是偶数要放在偶数位上 思路:把奇数数和偶数数...

yysue
23分钟前
1
0
Snackbar源码分析

目录介绍 1.最简单创造方法 1.1 Snackbar作用 1.2 最简单的创建 1.3 Snackbar消失的几种方式 2.源码分析 2.1 Snackbar的make方法源码分析 2.2 对Snackbar属性进行设置 2.3 Snackbar的show显示...

潇湘剑雨
54分钟前
1
0
分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业数据存储

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业数据存储 摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-storage/ 本文基于 Elastic-Job V2.1.5 版本分享 1. 概述 本文主要分享...

DemonsI
今天
1
0
jmockit demo

1、@Mocked,标识一个指定的class的实例或被测对象的参数被Mock掉。 2、@Capturing,标识一个被Mock的对象,从该对象派生的子类也被Mock了。 3、@Injectable,标识只有一个指定的被测对象的内...

我的老腰啊
今天
1
0
内容换行

用 <textarea>13611112222 这里想换行 13877779999</textarea><textarea>13611112222 13877779999</textarea>...

小黄狗
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部