文档章节

从零开始最短路径学习Hadoop之05----MapReduce应用开发

brian_2017
 brian_2017
发布于 2017/01/17 08:59
字数 3005
阅读 7
收藏 0
1. MapReduce程序编写流程:写map函数和reduce函数和它们的单元测试;写驱动程序并用本地数据集进行测试;在集群上运行并测试。Hadoop提供了一些在集群上进行诊断的辅助工具,如IsolationRunner。程序运行正确,需要进行调优。需要执行一切标准检查,然后做task profiling。Hadoop提供钩子hook辅助这个过程。

2. 配置API
    2.1 我们此前用过的Configuration类的实例代表配置属性及其取值的一个集合。每个属性由一个String来命名,值类型可以是多种类型之一。
    2.2 Configuration从XML文件读取属性值。
    2.3 Hadoop的系统默认属性在core-default.x ml文件,位置相关的覆盖属性在core-site.xml文件。
    2.4 如果Configuration连续读取多个配置xml文件,一般来说,后来xml文件的定义的属性会覆盖之前定义的属性。但前面文件被标记成final属性不能被后面的定义覆盖。
    2.5 一般,管理员将守护进程site file中的属性标记为final,这样可以防止用户在客户端或者作业提交参数中改动这些参数。
    2.6 系统属性的优先级高于源文件中定义的属性。
    2.7 上述属性配置需要用一个演示程序进行展示。
    2.8 例子1 读取简单配置文件configuration-1.xml
        2.8.1 在/home/brian/usr/hadoop/hadoop-1.1.2/new-conf目录,configuration-1.xml放入这个目录中。注意,必须将new-conf目录放必须放在/home/brian/usr/hadoop/hadoop-1.1.2,放在其他目录不可以。configuration-1.xml内容如下:
<?xml version="1.0"?>

<configuration>
  <property>
    <name>color</name>
    <value>yellow</value>
    <description>Color</description>
  </property>

  <property>
    <name>size</name>
    <value>10</value>
    <description>Size</description>
  </property>

  <property>
    <name>weight</name>
    <value>heavy</value>
    <final>true</final>
    <description>Weight</description>
  </property>

  <property>
    <name>size-weight</name>
    <value>${size},${weight}</value>
    <description>Size and weight</description>
  </property>

</configuration>

        2.8.2 源代码ReadConf.java
package com.cere;

import org.apache.hadoop.conf.Configuration;

public class ReadConf{
    public static void main(String[] args){
        Configuration conf = new Configuration();
        conf.addResource("./new-conf/configuration-1.xml");
        System.out.printf("color = %s\n", conf.get("color"));
        System.out.printf("size = %d\n", conf.getInt("size", 0));
        System.out.printf("weight = %s\n", conf.get("weight"));
        System.out.printf("wide = %s\n", conf.get("breadth", "wide"));
    }
}

        2.8.3 编译:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java
        2.8.4 打包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p1$ jar -cvf rc.jar -C ./classes/ .
        2.8.5 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar rc.jar com.cere.ReadConf
        2.8.6 输出结果:
color = yellow
size = 10
weight = heavy
wide = wide
        2.8.6 xml文件没有保存类型信息,属性在被读取的时候,可以被解释为指定的类型。get方法运行为xml文件中没有定义的属性指定默认值,如上述代码的最后一行的breadth属性的wide属性是没有赋值的,但可以打印出wide=wide。
    
    2.9 例子2. 读多个配置文件,以及值的覆盖问题。configuration-2.xml跟configuration-1.xml在同一个目录。
        2.9.1 configuration-2.xml内容如下:
<?xml version="1.0"?>

<configuration>
  <property>
    <name>size</name>
    <value>12</value>
    <description>Size</description>
  </property>

  <property>
    <name>weight</name>
    <value>light</value>
    <description>Weight</description>
  </property>

</configuration>

        2.9.2 ReadMulConf.java的内容如下:

package com.cere;

import org.apache.hadoop.conf.Configuration;

public class ReadMulConf{
    public static void main(String[] args){
        Configuration conf = new Configuration();
        conf.addResource("./new-conf/configuration-1.xml");
        conf.addResource("./new-conf/configuration-2.xml");
        System.out.printf("color = %s\n", conf.get("color"));
        System.out.printf("size = %d\n", conf.getInt("size", 0));
        System.out.printf("weight = %s\n", conf.get("weight"));
        System.out.printf("wide = %s\n", conf.get("breadth", "wide"));
    }
}

        2.9.3 编译:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java
        2.9.4 打包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p2$ jar -cvf rmc.jar -C ./classes/ .
        2.9.5 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar rmc.jar com.cere.ReadMulConf
              输出结果:
color = yellow
size = 12
weight = heavy
wide = wide        
        2.9.6 结果分析:size属性值原先是10,在configuration-2.xml中被改成12;weight在configuration-1.xml是heavy,尽管configuration-2.xml的weight属性是light,但在configuration-1.xml中weight的属性是final,不可以被更改,于是仍然是heavy。

    2.10 例子3 可变的扩展

        2.10.1 源代码是ReadMulConfD.java

package com.cere;

import org.apache.hadoop.conf.Configuration;

public class ReadMulConfD{
    public static void main(String[] args){
        Configuration conf = new Configuration();
        conf.addResource("./new-conf/configuration-1.xml");
        conf.addResource("./new-conf/configuration-2.xml");
        System.out.printf("color = %s\n", conf.get("color"));
        System.out.printf("size = %d\n", conf.getInt("size", 0));
        System.out.printf("weight = %s\n", conf.get("weight"));
        System.out.printf("wide = %s\n", conf.get("breadth", "wide"));
        System.out.printf("size-weight = %s\n", conf.get("size-weight"));
        System.out.printf("------------------------\nreset size\n");
        System.setProperty("size", "14");
        System.out.printf("size-weight = %s\n", conf.get("size-weight"));
    }
} 

      2.10.2 编译:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java

        2.10.3 打包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p3$ jar -cvf rmcd.jar -C ./classes/ .
        2.10.4 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar rmcd.jar com.cere.ReadMulConfD 
               输出结果:
color = yellow
size = 12
weight = heavy
wide = wide
size-weight = 12,heavy
------------------------
reset size
size-weight = 14,heavy
        2.10.5 解释:在读取了配置之后,再执行系统属性的配置将size设置成14,于是再次读取size-weight的时候,size就变成14。

3. 配置管理
    3.1 开发Hadoop时候,需要在本地运行和集群之间进行切换。可以在几个集群上工作,或者在本地伪分布式集群上测试。伪分布式集群是其守护进程运行在本地的集群。
    3.2 hadoop-1.1.2的子目录conf放上3个配置文件,hadoop-local.xml,hadoop-localhost.xml和hadoop-cluster.xml。hadoop在三种配置之间切换运行。
    3.3 在客户端执行不同的配置: 
        "hadoop fs -conf conf/hadoop-localhost.xml -ls ."
        这样列出来的就是本地集群的文件。
    3.4 hadoop-local.xml文件:
<?xml version="1.0"?>

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>file:///</value>
  </property>

  <property>
    <name>mapred.job.tracker</name>
    <value>local</value>
  </property>

</configuration>


    3.5 hadoop-localhost.xml文件:
<?xml version="1.0"?>

<configuration>

  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>

  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>

</configuration>

    3.6 hadoop-cluster.xml文件:
<?xml version="1.0"?>

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://namenode/</value>
  </property>

  <property>
    <name>mapred.job.tracker</name>
    <value>jobtracker:8021</value>
  </property>

</configuration>

    3.7 执行1:brian@brian-laptop:~/usr/hadoop/hadoop-1.1.2$ ./bin/hadoop fs -conf conf/hadoop-localhost.xml -ls .
        这个会显示hdfs上的当前用户的目录下的文件。
    3.8 执行2:brian@brian-laptop:~/usr/hadoop/hadoop-1.1.2$ ./bin/hadoop fs -conf conf/hadoop-local.xml -ls .
        这个会显示/home/brian/usr/hadoop/hadoop-1.1.2目录下的文件。

4. 辅助类 GenericOptionsParser, Tool和ToolRunner
    4.1 GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,并根据需要,为Configuration对象设置相应的取值。
    4.2 通常不直接使用GenericOptionsParser,更方便的方式是实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser。
    4.3 Configurable是一个接口。Configured类是Configurable的一个实现。ConfigurationPrinter是Configured的一个子类。
        Tool是一个接口,它继承了Cofigurable接口。
    4.3 Tool实现示例,用于打印一个Configuration对象的属性

        4.3.1 代码ConfigurationPrinter.java

package com.cere;

import java.util.Map;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfigurationPrinter extends Configured implements Tool{
    static {
        Configuration.addDefaultResource("./conf/hdfs-default.xml");
        Configuration.addDefaultResource("./conf/hdfs-site.xml");
        Configuration.addDefaultResource("./conf/mapred-default.xml");
        Configuration.addDefaultResource("./conf/mapred-site.xml");
    }

    @Override
    public int run(String[] args) throws Exception{
        Configuration conf = getConf();
        for(Map.Entry<String, String> entry: conf){
            System.out.printf("%s = %s\n", entry.getKey(), entry.getValue());
        }
        return 0;
    }

    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
        System.exit(exitCode);
    }
}

        4.3.2 编译:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p4$javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java
        4.3.3 打包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p4$ jar -cvf cp.jar -C ./classes/ .
        4.3.4 运行: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p4$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar cp.jar com.cere.ConfigurationPrinter
        4.3.5 解释:ConfigutaionPrinter的main方法不直接调用自身的run方法,而是调用ToolRunner的静态run方法。因为ToolRunner的run方法,会先位Tool建立一个Configuration对方,然后再调用ConfigurationPrinter的run方法。ToolRunner还使用了GenericOptionParser获取命令行中指定的任何标准选项,然后,在Configuration实例上进行设置。
        4.3.6 GenericOptionsParser允许设置个别属性,在命令行加一个"-D"在加上属性, -D的优先级高过配置文件的其他优先级,例如:
                     brian@brian-laptop:~/work/ifi/devsvn/hdp-train/class-5/p4$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar cp.jar com.cere.ConfigurationPrinter -D color=yellow | grep color                 

    4.4 运行 brian@brian-laptop:~/usr/hadoop/hadoop-1.1.2$ ./bin/hadoop jar ~/work/ifi/devsvn/hdp-train/class-5/p4/cp.jar com.cere.ConfigurationPrinter -conf conf/hadoop-localhost.xml  | grep mapred.job.tracker
           输出:mapred.job.tracker = localhost:9001

    4.5 注意:不是所有的属性都能在命令行设置。有些设置只能在特定的配置文件里才能生效。有时间需要阅读源代码才能确定。

5. 编写单元测试
    5.1 一个Junit的例子。 单元测试用Junit来做。安装Junit参考这里 http://blog.csdn.net/lizhe_dashuju/article/details/9627589
    5.2 Mockito可以模拟各种类,安装Mockito
        5.2.1 Mockito主页 https://code.google.com/p/mockito/
        5.2.2 在这里下载mockito-all-1.9.5.jar,在这里 https://code.google.com/p/mockito/downloads/detail?name=mockito-all-1.9.5.jar&can=2&q=
        5.2.3 配置/etc/environment,在这里将修改CLASSPATH,成为:CLASSPATH=.:/usr/lib/jvm/java-6-sum/lib:/usr/local/lib/junit/hamcrest-core-1.3.jar:/usr/local/lib/junit/junit-4.11.jar:/usr/local/lib/java-lib/mockito-all-1.9.5.jar
        5.2.4 测试Mockito
           5.2.4.1 例子在此 http://blog.iamzsx.me/show.html?id=118001
           5.2.4.2 这个例子仅仅用于说明mockito。
           5.2.4.3 源代码RandomUtil.java
package com.cere;

import java.util.Random;

public class RandomUtil{
    private final static Random DEFAULT_RANDOM = new Random();

    public static int randomSelect(int[] weights){
        return randomSelect(DEFAULT_RANDOM, weights);
    }

    public static int randomSelect(Random r, int[] weights){
        if (weights == null || weights.length == 0){
            throw new IllegalArgumentException("weights must not be an empty arrary");
        }

        int s = 0;
        for(int i:weights){
            s += i;
        }
        int target = r.nextInt(s);
        System.out.printf("target = %d\n", target);
        int accumulator = 0;
        for(int i = 0; i < weights.length; i++){
            System.out.printf("\ni = %d\n", i);
            accumulator += weights[i];
            System.out.printf("accumulator = %d\n", accumulator);
            if (accumulator > target){
                System.out.printf("return i = %d\n", i);
                return i;
            }
        }
        System.out.printf("return 0\n");
        return 0;
    }

    public static void main(String[] args){
        int[] expected = new int[] {0, 0, 1, 2, 4, 4};
        randomSelect(expected);
    }
}

            5.2.4.4 源代码TestRandomUtil.java
package com.cere;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Random;
import org.junit.Assert;
import org.junit.Test;

public class TestRandomUtil{
    @Test
    public void testSelect(){
        int[] weights = {2,1,3,0,4};
        int s = 0;
        for(int i:weights){s += i;}
        int[] expected = new int[] {0, 0, 1, 2, 2, 2, 4, 4, 4, 4};
        for(int i = 0; i < s; i++){
            Random r = mock(Random.class);
            when(r.nextInt(s)).thenReturn(i);
            Assert.assertEquals(expected[i], RandomUtil.randomSelect(r, weights));
        }
    }
}
            5.2.4.5 编译: brian@brian-laptop:~/work/ifi/devsvn/hdp-train/class-5/p7$ javac -d ./classes/ src/*.java
            5.2.4.6 执行1: cd classes; 
                                  brian@brian-laptop:~/work/ifi/devsvn/hdp-train/class-5/p7/classes$ java com.cere.RandomUtil
            5.2.4.7 执行2: 
                                  cd classes; 
                                  brian@brian-laptop:~/work/ifi/devsvn/hdp-train/class-5/p7/classes$ java org.junit.runner.JUnitCore  com.cere.TestRandomUtil
                      这个是执行测试的过程。

    5.3 大气数据处理的单元测试
        5.3.1 被测试的代码是"从零开始最短路径学习Hadoop之02----处理气象数据的第一个MapReduce程序" http://blog.csdn.net/lizhe_dashuju/article/details/9624191的第3节的代码。
        5.3.2  在上述项目的src目录下,写单元测试文件TestMaxTemperatureMapper.java
package com.ifis;

import java.io.IOException;
import static org.mockito.Mockito.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.OutputCollector;
import org.junit.*;

public class TestMaxTemperatureMapper{
    @Test
    public void processValidRecord() throws IOException, InterruptedException{
        MaxTemperatureMapper mapper = new MaxTemperatureMapper();
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                              "99999V0203201N00261220001CN9999999N9-00111+99999999999");
        OutputCollector<Text, IntWritable> output = mock(OutputCollector.class);
        mapper.map(null, value, output, null);
        verify(output).collect(new Text("1950"), new IntWritable(-11));
    }
}
        5.3.3 修改/etc/envirnment里的CLASSPATH成为"CLASSPATH=.:/usr/lib/jvm/java-6-sum/lib:/usr/local/lib/junit/hamcrest-core-1.3.jar:/usr/local/lib/junit/junit-4.11.jar:/usr/local/lib/java-lib/mockito-all-1.9.5.jar:/home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar:/home/brian/usr/hadoop/hadoop-1.1.2/lib/commons-cli-1.2.jar:/home/brian/usr/hadoop/hadoop-1.1.2/lib/commons-logging-1.1.1.jar"
        5.3.4 编译:brian@brian-laptop:~/work/ifi/devsvn/hdp-train/class-5/p1$ javac -d ./classes/ src/*.java -Xlint:unchecked
                      这个编译会提示OutputCollector有警告。不要紧,忽略它。
        5.3.5 执行: cd classes; 
                                brian@brian-laptop:~/work/ifi/devsvn/hdp-train/class-5/p1/classes$ java org.junit.runner.JUnitCore com.ifis.TestMaxTemperatureMapper
                      可以看到测试是通过的。
        5.3.6 作者的网站www.hadoopbook.com里,是使用MRUnit定制功能模块,有兴趣可以参考一下。
        5.3.7 如果将value改动一下,让这个值缺失温度部分,再次重新编译执行测试,会报错。
        5.3.8 用同样的方式写Reducer的测试单元TestMaxTemperatureReducer.java,编译和执行方式跟5.4.5类似。
package com.ifis;

import java.io.IOException;
import java.util.Iterator;
import java.util.Arrays;
import static org.mockito.Mockito.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.OutputCollector;
import org.junit.*;

public class TestMaxTemperatureReducer{
    @Test
    public void processValidRecord() throws IOException, InterruptedException{
        MaxTemperatureReducer reducer = new MaxTemperatureReducer();
        Text key = new Text("1950");
        Iterator<IntWritable> values = Arrays.asList(new IntWritable(10), new IntWritable(5)).iterator();
        OutputCollector<Text, IntWritable> output = mock(OutputCollector.class);
        reducer.reduce(key, values, output, null);
        verify(output).collect(key, new IntWritable(10));
    }
}

6. MapReduce的工作流
    如果处理过程复杂,一般是有更多的MapReduce作业,而不是更复杂的map和reduce函数。也就说,不要扩展map和reduce函数的复杂度,而是增加更多的作业。 对更复杂的问题,可以考虑比MapReduce更高级的语言,如Pig, hive, Cascading。有了它之后,用不着处理MapReduce作业的转换,将工作集中在分析正在执行的任务。 将问题分解成MapReduce作业, 当MapReduce工作流的作业不止一个时,根据线性作业链,或者有向无环图DAG--directed acyclic graph处理。 线性链表,一个接一个地运行作业。 DAG运行:JobControl类。

© 著作权归作者所有

brian_2017
粉丝 3
博文 61
码字总数 145216
作品 0
私信 提问
大数据经典学习路线(及供参考)之 一

1.Linux基础和分布式集群技术 学完此阶段可掌握的核心能力: 熟练使用Linux,熟练安装Linux上的软件,了解熟悉负载均衡、高可靠等集群相关概念,搭建互联网高并发、高可靠的服务架构; 学完此...

柯西带你学编程
2018/05/22
0
0
Hadoop实战之 MapReduce

私塾在线 整体课程概览 第一部分:开始云计算之旅 第二部分:初识Hadoop 第三部分:Hadoop 环境安装部署 第四部分:Hadoop Shell 基本操作介绍 第五部分:Hadoop 分布式文件系统1 第五部分:...

linni
2014/01/08
736
0
Hadoop的辉煌还能延续多久?

Hadoop技术已经无处不在。不管是好是坏,Hadoop已经成为大数据的代名词。短短几年间,Hadoop从一种边缘技术成为事实上的标准。看来,不仅现在Hadoop是企业大数据的标准,而且在未来,它的地位...

一枚Sir
2014/08/05
240
0
什么是hadoop大数据?我又为什么要写这篇文章?

点击链接 https://my.oschina.net/ijj/blog 关注我的博客。学习更多hadoop知识。 这些天,有很多人咨询我大数据相关的一些信息,觉得大数据再未来会是一个朝阳行业,希望能尽早学会、入行,借...

隐姓埋名啊
2017/03/16
531
1
2014-11-12--Hadoop的基础学习(三)--Hadoop中MapReduce框架入门

1.MapReduce的简单概念 百度百科:MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",和他们的主要思想,都是从函数式编程语言里借来的...

查封炉台
2014/11/16
8.3K
8

没有更多内容

加载失败,请刷新页面

加载更多

Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
57分钟前
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
今天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
今天
7
0
轻量级 HTTP(s) 代理 TinyProxy

CentOS 下安装 TinyProxy yum install -y tinyproxy 启动、停止、重启 # 启动service tinyproxy start# 停止service tinyproxy stop# 重启service tinyproxy restart 相关配置 默认...

Anoyi
今天
2
0
Linux创建yum仓库

第一步、搞定自己的光盘 #创建文件夹 mkdir -p /media/cdrom #挂载光盘 mount /dev/cdrom /media/cdrom #编辑配置文件使其永久生效 vim /etc/fstab 第二步,编辑yun源 vim /ect yum.repos.d...

究极小怪兽zzz
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部