文档章节

从零开始最短路径学习Hadoop之02----处理气象数据的第一个MapReduce程序

brian_2017
 brian_2017
发布于 2017/01/17 08:51
字数 1568
阅读 53
收藏 0
编写一个气象数据挖掘的MapReduce程序

1. 气象数据在哪里?
    NCDC  美国国家气候数据中心
    获取数据的方式在www.hadoopbook.com里给出了,是这里 http://hadoopbook.com/code.html
    两个示例的数据在这里下载 https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all  ,文件名分别是1901,1902
    原始记录是许多 小文件,现在已经按照年份被拼接成上面的1901, 1902这两个单独的文件。

2. 最重要的是第5节,不想看长篇大论的,可以直接看第5节就行。

3. 用Hadoop分析数据
    3.1 分析一条记录:
    0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
    记录的含义:
    0029029070
    99999
    19010101  观察日期
    0600  观察时间
    4+64333  经度
    +023450 纬度
    F M-12
    +0005
    99999
    V020
    270
    1
    N
    0159
    1
    99999
    9
    9
    N
    000000
    1
    N
    9-0078
   1
    + 99999
    102001ADDGF108991999999999999999999

    3.2 源代码文件MaxTemperatureMapper.java
package com.ifis;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{
    private static final int MISSING = 9999;
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{
        String line = value.toString();
        String year = line.substring(15,19);
        int airTemperature;
        if(line.charAt(87) == '+'){
            airTemperature = Integer.parseInt(line.substring(88, 92));
        }else{
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if(airTemperature != MISSING && quality.matches("[01459]")){
            output.collect(new Text(year), new IntWritable(airTemperature));
        }
    }
}

    3.3 源代码MaxTemperatureReducer.java
package com.ifis;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable>{
    public void reduce(Text key, Iterator<IntWritable> values, 
                       OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{
        int maxValue = Integer.MIN_VALUE;
        while(values.hasNext()){
            maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue));
    }
}

    3.4 源代码MaxTemperature.java
package com.ifis;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature{
    public static void main(String[] args) throws IOException{
        if (args.length != 2){
            System.err.println("Usage: MaxTemperature <intput path> <output path>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max Temperature");
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
    }
}

    3.5 编译命令:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar  -d ./classes/ ./src/*.java
    3.6 打jar包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ jar -cvf maxtemp.jar -C ./classes/ .
    3.7 用put命令将1901文件放到hdfs。
    3.8 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar  com.ifis.MaxTemperature 1901 o4
    3.9 查看结果:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o4/part-00000

4. 一个新版api的MapReducer。
    4.1 说明
    这个例子书里没有给出import的类和接口的细节。如果有些类或者接口不知道import,到这里找http://hadoop.apache.org/docs/current/api/
    从这里找到类名,然后点击进入,可以看到包的信息。
    如果是java的类和接口,在这里找http://www.javaweb.cc/JavaAPI1.6/
    注意,这个新的api里,各种类要import org.apache.hadoop.mapreduce包,而不是org.apache.hadoop.mapred包,如Context,Job等等。

    4.2 源代码
package com.ifis;

import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class NewMaxTemperature{
    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        private static final int MISSING = 9999;
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            if (line.charAt(87) == '+'){
                airTemperature = Integer.parseInt(line.substring(88, 92));
            }else{
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            if (airTemperature != MISSING && quality.matches("[01459]")){
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }

    static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
            int maxValue = Integer.MIN_VALUE;
            for(IntWritable v : values){
                maxValue = Math.max(maxValue, v.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 2){
            System.err.println("Usage NewMaxTemerapture <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(NewMaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

    4.3 编译: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes ./src/*.java
    4.4 打包: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ jar -cvf newmaxtemp.jar -C ./classes/ .
    4.5 将1901和1902两个文件put到hdfs上,分别更名为1901.dat和1902.dat
    4.6 执行,从1901.dat和1902.dat这两个文件里找当年的最高温度: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar newmaxtemp.jar com.ifis.NewMaxTemperature 190*.dat o2
    4.7 查看结果: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o2/part-r-00000

5. 将4的例子改造成mapper,reducer分开写的形式。
    假设项目的目录是p3,此目录下的目录结构和文件结构如下:
    |-- classes
    `-- src
        |-- MaxTemperature.java
        |-- MaxTemperatureMapper.java
        `-- MaxTemperatureReducer.java

    5.1 源代码MaxTemperatureMapper.java
package com.ifis;

import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private static final int MISSING = 9999;
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+'){
            airTemperature = Integer.parseInt(line.substring(88, 92));
        }else{
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")){
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}


    5.2 源代码MaxTemperatureReducer.java
package com.ifis;

import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
                       throws IOException, InterruptedException{
        int maxValue = Integer.MIN_VALUE;
        for(IntWritable v : values){
            maxValue = Math.max(maxValue, v.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

    5.3 源代码MaxTemperature.java
package com.ifis;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class MaxTemperature{
    public static void main(String[] args) throws Exception{
        if (args.length != 2){
            System.err.println("Usage NewMaxTemerapture <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true)?0:1);
    }
}

    5.4 编译: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
    5.5 打包: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ jar -cvf maxtemp.jar -C ./classes/ . 
    5.6 要处理的数据文件1901.dat和1902.dat在上一步已经put到hdfs,这次不需要再做这个动作了。
    5.7 执行: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar  com.ifis.MaxTemperature 190*.dat o3
    5.8 查看执行结果: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o3/part-r-00000 

6. Combiner可以提高map阶段的效率。注意,求最大值和最小值可以通过combiner提升效率,但求平均值就不行了。

7. Hadoop的streaming,通过unix标准流的方式,允许用其他语言写MapReduce程序。具体细节在这里不讲了,知道有这个功能就行。我们主要关注java语言。

8. 请熟悉第5节的源代码形式,熟悉到可以手工默写出来,这是MapReducer的主要形式,对本文来说,能掌握这个就足够了。

© 著作权归作者所有

brian_2017
粉丝 3
博文 61
码字总数 145216
作品 0
私信 提问
Hadoop大数据入门到实战(第七节)- Mapreduce的使用

MapReduce是Hadoop的核心功能之一,我们首先需要弄明白MapReduce到底是个啥,是干啥子用滴才行。 什么是MapReduce MapReduce是一种可用于数据处理的编程模型,我们现在设想一个场景,你接到一...

MasterXiao
2018/08/21
0
0
Hadoop的辉煌还能延续多久?

Hadoop技术已经无处不在。不管是好是坏,Hadoop已经成为大数据的代名词。短短几年间,Hadoop从一种边缘技术成为事实上的标准。看来,不仅现在Hadoop是企业大数据的标准,而且在未来,它的地位...

一枚Sir
2014/08/05
240
0
【hadoop】16.MapReduce-简介

简介 本章节我们先来了解一些关于MapReduce的理论知识。从本章节您可以学习到:MapReduce的相关知识。 1、概念 Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析...

Areya
01/12
19
0
Hadoop中的MapReduce(5)

在MapReduce中,它也是主从结构,主节点:JobTracker,从节点:TaskTracker。主节点只有一个从节点有很多个,主节点在主机上,从节点分布到其他机器上。 JobTracker: 作用: 1、负责接收用户...

肖鋭
2014/02/23
107
0
MapReduce 实验 (一) 原理

官网 http://hadoop.apache.org/ hadoop三大组件 HDFS:分布式存储系统 https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html MapReduce:分布式计算......

pcdog
2018/04/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring Cloud 笔记之Spring cloud config client

观察者模式它的数据的变化是被动的。 观察者模式在java中的实现: package com.hxq.springcloud.springcloudconfigclient;import org.springframework.context.ApplicationListener;i...

xiaoxiao_go
昨天
4
0
CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
4
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
7
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
7
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部