文档章节

MapReduce并行计算框架介绍

为为02
 为为02
发布于 2017/02/05 14:24
字数 1110
阅读 488
收藏 11

关于MapReduce

MapReduce是一种可用于数据处理的编程模型。MapReduce程序本质上是并行运行的, 因此可以将大规模的数据分析任务分发给任何一个拥有足够多机器的数据中心,充分利用Hadoop 提供的并行计算的优势。

使用Hadoop来分析数据

MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键/值对作为 输入和输出,其类型由程序员来选择。程序员还需要写两个函数,map函数和reduce函数. 示例取自《Hadoop权威指南-第三版》

1. 创建查找最高气温的Mapper类

 package com.hadoopbook.ch02;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 
 import java.io.IOException;
 
 /**
  * [@author](https://my.oschina.net/arthor) WangWeiwei
  * [@version](https://my.oschina.net/u/931210) 1.0
  * [@sine](https://my.oschina.net/mysine) 17-2-4
  * 查找最高气温的mapper类
  */
 public class MaxTemperatureMapper extends  Mapper<LongWritable,Text,Text,IntWritable> {
     private static final int MISSING = 9999;
 
     @Override
     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         String line = value.toString();
         String year = line.substring(15,19);
         int airTemperature;
         if (line.charAt(87) == '+'){
             // parseInt doesn't like leading plus signs
             airTemperature = Integer.parseInt(line.substring(88, 92));
         }else {
             airTemperature = Integer.parseInt(line.substring(87, 92));
         }
         String quality = line.substring(92, 93);
         if (airTemperature != MISSING && quality.matches("[01459]")) {
             context.write(new Text(year), new IntWritable(airTemperature));
         }
     }
 
     public MaxTemperatureMapper() {
         super();
     }
 }

这个mapper类是一个泛型类型,它有四个形参类型,分别指定map函数的输入键/输入值/输出键和输出值的类型。 Hadoop本身提供了一套可优化网络序列化传输的基本类型,而不是直接使用java内嵌的类型。这些类型都在 org.apache.hadoop.io包里。 map() 方法的输入是一个键和一个值,方法还提供了context实例用于输出内容的写入。

2. 查找最高气温的reducer类

类似于上的方法,使用Reducer来定义reduce函数.

    package com.hadoopbook.ch02;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * [@author](https://my.oschina.net/arthor) WangWeiwei
     * @version 1.0
     * @sine 17-2-4
     * 查找最高气温的Reducer类
     */
    public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        public MaxTemperatureReducer() {
            super();
        }
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values){
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key,new IntWritable(maxValue));
        }
    }

同样,reduce函数也有四个形式参数类型用于指定输入和输出类型。reduce函数的输入类型必须匹配map函数 的输出类型:即TEXT类型和IntWritable类型。

3. MapReduce作业

指定一个作业对象,在这个应用中用来在气象数据集中找出最高气温

    package com.hadoopbook.ch02;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-4
     * MaxTemperature Application to find the maximum temperature in the weather dataset
     */
    public class MaxTemperature {
        public static void main(String[] args) throws Exception{
            if (args.length != 2){
                System.err.println("Usage: MaxTemperature <input path> <output path>");
                System.exit(-1);
            }
    
            Job job = new Job();
            job.setJarByClass(MaxTemperature.class);
            job.setJobName("Max Temperature");
    
            FileInputFormat.addInputPath(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
            job.setMapperClass(MaxTemperatureMapper.class);
            job.setReducerClass(MaxTemperatureReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

Job对象指定作业的执行规范。我们可以使用它来控制整个作业的运行。我们在Hadoop集群上运行这个作业时, 要把它打包成一个JAR文件(Hadoop在集群上发布这个文件)。不必明确指定JAR文件的名称,在Job对象的 setJarByClass方法中传递一个类即可,Hadoop利用这个类来查找包含它的JAR文件,进而找到相关的JAR文件。

构造Job对象之后,需要指定输入和输出数据路径。调用FileInputFormat类的静态方法addInputPath() 来定义输入数据的路径,这个路径可以是单个文件/一个目录(此时目录下的所有文件当作输入)或符合特定文件模式的一系列文件。 由函数名可知,可以多次调用addInputPath()方法。

调用FIleOutFormat类中的静态方法setOutputPath()来指定输出路径,只能有一个输出路径。这个方法指定的是 reduce函数输出文件的写入目录。在运行作业前该目录是不应该存在的,否则Hadoop会报错,并拒绝运行作业。 这种预防措施的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是非常恼人的)

接着通过setMapperClass()和setReducerClass()指定map类型和reduce类型。

setOutputKeyClass() 和 setOutputValueClass() 控制map和reduce函数的输出类型。

在设置定义map和reduce函数的类后,可以开始运行作业。job中的waitForCompletion()方法返回一个布尔值。

© 著作权归作者所有

为为02
粉丝 51
博文 44
码字总数 99356
作品 0
海淀
程序员
私信 提问
好程序员大数据分享MapReduce中job的提交流程

好程序员大数据分享MapReduce中job的提交流程 一、MapReduce的定义 MapReduce是面向大数据并行处理的计算模型、框架和平台。 它的主要思想是:map(映射)和reduce(归约) 1)MapReduce是一...

好程序员IT
06/05
3
0
Storm与Spark、Hadoop框架对比

Storm与Spark、Hadoop三种框架对比 Storm与Spark、Hadoop这三种框架,各有各的优点,每个框架都有自己的最佳应用场景。所以,在不同的应用场景下,应该选择不同的框架。 1.Storm是最佳的流式...

boonya
04/19
9
0
Hadoop简要介绍

本文大部分内容都是从官网Hadoop上来的。其中有一篇介绍HDFS的pdf文档,里面对Hadoop介绍的比较全面了。我的这一个系列的Hadoop学习笔记也是从这里一步一步进行下来的,同时又参考了网上的很...

晨曦之光
2012/03/09
214
0
[HCNA Cloud]FusionInsight架构与原理

大数据是指无法再一定时间内用常规软件工具对其内容进行抓取、管理和处理的数据集合。 Yarn是Hadoop2.0中的资源管理系统,它是一个通用的资源管理模块,可为各类应用程序进行资源管理和调度。...

Grodd
2018/04/25
0
0
使用 Hadoop 和 Mahout 实现推荐引擎

作为我之前博客的延续,在这篇博客中,我将探讨如何使用 Mahout 和 Hadoop 实现一个 推荐引擎 第一部分 介绍 MapReduce 和 为什么为了利用并行计算的优势,一些算法需要而重写 第二部分 我会...

oschina
2013/02/14
6.5K
5

没有更多内容

加载失败,请刷新页面

加载更多

面向对象编程

1、类和对象 类是对象的蓝图和模板,而对象是实例;即对象是具体的实例,类是一个抽象的模板 当我们把一大堆拥有共同特征的对象的静态特征(属性)和动态特征(行为)都抽取出来后,就可以定...

huijue
今天
11
0
redis异常解决 :idea启动本地redis出现 jedis.exceptions.JedisDataException: NOAUTH Authentication required

第一次安装在本地redis服务,试试跑项目,结果却出现nested exception is redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required错误,真是让人头疼 先检查一...

青慕
今天
22
0
Spring 之 IoC 源码分析 (基于注解方式)

一、 IoC 理论 IoC 全称为 Inversion of Control,翻译为 “控制反转”,它还有一个别名为 DI(Dependency Injection),即依赖注入。 二、IoC方式 Spring为IoC提供了2种方式,一种是基于xml...

星爵22
今天
28
0
Docker安装PostgresSql

Docker安装PostgresSql 拉取docker镜像 # docker pull postgres:10.1010.10: Pulling from library/postgres9fc222b64b0a: Pull complete 38296355136d: Pull complete 2809e135bbdb: Pu......

Tree
今天
13
0
内容垂直居中

方法一: 采用上下 padding 形式,将内容放置在垂直居中 .line { padding: 2% 0; text-align: center; height: 5px;} <div class="line"> 内容垂直居中</div> 方法二: 采......

低至一折起
今天
25
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部