Hadoop 2.5.1 Study Notes 5: A Usage Example of the mongo-hadoop Connector

Original
2014/11/13 12:35

package com.dewmobile.task;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class ComputeProfile extends Configured implements Tool {

 @Override
 public int run(String[] arg0) throws Exception {
  // 1 initialization
  Configuration conf = getConf();
  // 2 input
  MongoConfigUtil.setInputURI(conf,
    "mongodb://192.168.56.66:27018/mongo-hadoop-test.apps");
  MongoConfigUtil.setOutputURI(conf,
    "mongodb://192.168.56.66:27018/mongo-hadoop-test.appsout");
  Job job = Job.getInstance(conf, "ComputeProfile"); // Job(Configuration, String) is deprecated in Hadoop 2.x
  job.setJarByClass(ComputeProfile.class);

  // 3 execute
  job.setMapperClass(MapClass.class);
  job.setCombinerClass(Combiner.class);
  job.setReducerClass(Reduce.class);

  // 4 output
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  // conf.set("mongo.output.uri","mongodb://localhost:27017/mongo-hadoop-test.appsout");
  job.setInputFormatClass(MongoInputFormat.class);
  job.setOutputFormatClass(MongoOutputFormat.class);

  // 5 run it and return the exit code, so ToolRunner/main can report it
  return job.waitForCompletion(true) ? 0 : 1;
 }

 public static class MapClass extends
   Mapper<Object, BSONObject, Text, IntWritable> {
  @Override
  public void map(Object key, BSONObject value, Context context)
    throws IOException, InterruptedException {
   if (key == null || value == null) {
    // skip malformed records; calling value.toString() on a null value would
    // throw a NullPointerException
    System.out.println("null key or value, record skipped");
    return;
   }
   System.out.println("key---" + key + " value---" + value.toString());
   // emit a throwaway key (the current timestamp) with count 1, just to exercise the connector
   context.write(new Text("" + System.currentTimeMillis()), new IntWritable(1));
  }

 }
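
 // Alternative mapper sketch: keys on an actual document field instead of a timestamp.
 // The field name "category" is only an assumption about the documents in the apps
 // collection; replace it with a field that really exists. To try it, point
 // job.setMapperClass(...) at this class instead of MapClass.
 public static class FieldCountMapper extends
   Mapper<Object, BSONObject, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);

  @Override
  public void map(Object key, BSONObject value, Context context)
    throws IOException, InterruptedException {
   if (value == null) {
    return;
   }
   Object field = value.get("category"); // hypothetical field name
   if (field != null) {
    context.write(new Text(field.toString()), ONE);
   }
  }
 }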

 public static class Combiner extends
   Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   // pass the first value through unchanged; this demo does not aggregate the counts
   Iterator<IntWritable> ite = values.iterator();
   if (ite.hasNext()) {
    context.write(key, ite.next());
   }

  }

 }

 public static class Reduce extends
   Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   // emit only the first value for each key, mirroring the combiner above
   Iterator<IntWritable> ite = values.iterator();
   if (ite.hasNext()) {
    IntWritable i = ite.next();
    context.write(key, i);
    System.out.println("[output by reduce]---" + key + "  " + i);
   }
  }

 }
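
 // Alternative reducer sketch: sums the counts per key (the usual aggregation pattern)
 // before MongoOutputFormat writes them to the output collection. To try it, set
 // job.setReducerClass(SumReducer.class) and job.setCombinerClass(SumReducer.class).
 public static class SumReducer extends
   Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable v : values) {
    sum += v.get();
   }
   context.write(key, new IntWritable(sum));
  }
 }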

 public static void main(String[] args) throws Exception {
  int res = ToolRunner.run(new Configuration(), new ComputeProfile(),
    args);
  System.exit(res);
 }

}
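
If only part of the collection should reach the mappers, the connector can push a filter down to MongoDB instead of scanning every document. The snippet below is a minimal sketch meant to go inside run(), right after MongoConfigUtil.setInputURI(...); it assumes the MongoConfigUtil.setQuery helper (which backs the mongo.input.query setting) is available in your connector version, and the "platform" field is made up for illustration.

  // hypothetical filter: only map documents whose "platform" field equals "android"
  MongoConfigUtil.setQuery(conf, "{\"platform\": \"android\"}");

Filtering on the server side keeps documents the job will never use from being pulled over the network and deserialized into BSONObject values.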
