学习MapReduce(一)

原创
2017/03/09 17:40
阅读数 13

2017.3.9号,学习进度来到了MapReduce。通过查看文档,观看视频,说说自己对MapReduce的理解。

1.概念:

    MapReduce是Hadoop的计算框架。它和HDFS一样,都是Hadoop中不可缺少的一部分。它分为两个阶段,一个Map阶段,一个Reduce阶段。这两个阶段都是以键值对形式来进行参数传入的。

2.运行机制:

    一个数据文件按照需求,切割成数块,放在HDFS上。当一个客户端启动一个MapReduce的jar包时,ResouceManager会根据需求启动相应的MRappMaster,MRappMaster会根据MRjar包的设定启动一定数量的MRappTask先来执行map()方法,得到一堆经过处理的K,V结果,这些结果会在shuffle中进行重新洗牌,将一样的key的 K,V结果放入同一个结果集中,(当然shuffle也有别的用途),map()方法结束后,你会得到一堆不同的结果集,而这些结果集的每个结果集中的key都一样,只是value不同。这时,进入Reduce阶段,Reduce阶段将以每个结果集为输入进行计算。最终得到一个业务需求的结果。最后,这个结果会汇总在一起,放入HDFS上。

3.代码阶段:

写一个wordcount的代码。

一个mapreduce的jar包,需要写一个Mapper的子类,一个Reducer的子类和一个Driver类:

Mapper子类:
public class MapReduceMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    /**
     * 重写map方法 ,实现自己想要完成的内容
     */
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        //将整个Text转化为一个String对象
        String line = value.toString();
        //通过hadoop的StringUtils包来对String进行切片
        String[] words =StringUtils.split(line,' ');
        //遍历
        for (String str : words) {
            context.write(new Text(str),new LongWritable(1));
        }
    }
}

Reducer子类:

public class MRreducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context)
            throws IOException, InterruptedException {
        
        long count =  0;
        
        for (LongWritable value : values) {
            //因为我们要实现key的个数累加,所以我们使用计数器便利values,得到总数
            count += value.get();
        }
        
        context.write(key, new LongWritable(count));
    }
}

Diver类:

public class MapReduceRun {
        
    
    public static void main(String[] args) throws Exception {
        //获得job实例
        Configuration conf = new Configuration();
        
        Job job = Job.getInstance(conf);
        //在运行之前,需要将jar包所在地给hadoop
        job.setJarByClass(MapReduceRun.class);
        
        job.setMapperClass(MapReduceMapper.class);
        job.setReducerClass(MRreducer.class);
        //通过job实例来实现得到自己写的map类中得到的K,V
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        //通过job来得到自己写的reduce类中的K,V
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //通过FileInputFormat和FileOutputFormat来指定原始文档和输出文档所在地
        FileInputFormat.setInputPaths(job,new Path("hdfs://xxx:port"+args[0]));//指定在hdfs上的输入文件
        FileOutputFormat.setOutputPath(job,new Path("hdfs://xxx:port"+args[1]));//制定在hdfs上的输出目录    
        
        job.waitForCompletion(true);
    }
}

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部