1. Mapper
2. Reducer
HADOOP 的 Maven依赖包:
<!-- Hadoop client-side dependencies. All org.apache.hadoop artifacts must use
     the SAME version (2.8.0 here) to avoid classpath conflicts at runtime. -->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.8.0</version>
</dependency>
<!-- tools.jar from the local JDK; needed by Hadoop 2.x annotation processing
     on JDK 7/8. system scope means it is NOT bundled into the built jar. -->
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.7</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
</dependencies>
MaxTemperatureMapper类:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for the NCDC max-temperature job.
 *
 * <p>Input: (byte offset, raw fixed-width NCDC record line).
 * Output: (year, air temperature in tenths of a degree Celsius) for every
 * record with a valid, non-missing reading.
 */
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Sentinel value the NCDC format uses for a missing temperature reading.
    private static final int MISSING = 9999;
    // Reusable output objects: a mapper may process millions of records, so
    // avoid allocating a fresh Text/IntWritable on every call.
    private final Text year = new Text();
    private final IntWritable temperature = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Guard: skip records too short to hold the fixed-width fields below
        // (offsets up to 93), instead of failing the whole task with a
        // StringIndexOutOfBoundsException on one malformed line.
        if (line.length() < 93) {
            return;
        }
        // Year occupies columns 15-18 of the fixed-width record.
        year.set(line.substring(15, 19));
        int airTemperature;
        // Sign character sits at column 87; the original parseInt note said it
        // rejects a leading plus, so strip it before parsing.
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        // Quality code at column 92; [01459] are the "trustworthy" codes.
        String quality = line.substring(92, 93);
        // Emit only valid readings; types must match the generic declaration.
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            temperature.set(airTemperature);
            context.write(year, temperature);
        }
    }
}
MaxTemperatureReducer类:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer for the NCDC max-temperature job: collapses all temperature
 * readings for a year into that year's maximum.
 *
 * <p>Input: (year, readings in tenths of a degree Celsius).
 * Output: (year, maximum reading).
 */
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Fold the group of readings into a running maximum.
        int max = Integer.MIN_VALUE;
        for (IntWritable reading : values) {
            int current = reading.get();
            if (current > max) {
                max = current;
            }
        }
        // Write the final (year, max temperature) pair to the job output.
        context.write(key, new IntWritable(max));
    }
}
MaxTemperature类:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the NCDC max-temperature MapReduce job.
 *
 * <p>Usage: {@code MaxTemperature <input path> <output path>}. The output
 * path must not already exist; Hadoop refuses to overwrite it.
 */
public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // Job.getInstance() replaces the constructor new Job(), which has been
        // deprecated since Hadoop 2.x.
        Job job = Job.getInstance();
        // Lets Hadoop locate the jar containing this class on the cluster.
        job.setJarByClass(MaxTemperature.class);
        // A human-readable name, shown in the web UI / job history.
        job.setJobName("Max temperature");
        // Input paths may be repeated; a path can be a file or a directory
        // (directories are not scanned recursively).
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Exactly one output path, and it must not exist yet.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        // Max is associative and commutative, so the reducer doubles as a
        // combiner, cutting the data shuffled between map and reduce.
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Block until the job finishes; exit 0 on success, 1 on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
程序导出成jar包:HadoopDemo.jar
查看jar文件
在本地文件系统上执行程序
$>set HADOOP_CLASSPATH=F:\Downloads\hadoop\HadoopDemo.jar //win7
$>set HADOOP_CLIENT_OPTS=-Xmx1000m //JVM heap options go via HADOOP_CLIENT_OPTS, not as a hadoop argument
$>hadoop com.wgy.hadoop.MaxTemperature file:///F:\Downloads\hadoop\ncda_data\19*.gz F:\Downloads\hadoop\output
$>export HADOOP_CLASSPATH=hadoop-xxxxx.jar //Linux
$>hadoop com.wgy.hadoop.MaxTemperature input/ncdc/sample.txt output
在HDFS集群上执行程序
$>通过共享文件夹将jar文件发送到ubuntu主机中
$>将天气数据上传到hdfs文件系统
$>hadoop jar xxx.jar /aa /xxx
hadoop fs -mkdir /usr/wgy/ncda_data
hadoop fs -put /mnt/hgfs/F/Downloads/hadoop/ncda_data/19*.gz /usr/wgy/ncda_data
cp /mnt/hgfs/F/Downloads/hadoop/HadoopDemo.jar /Downloads
hadoop jar /Downloads/HadoopDemo.jar /usr/wgy/ncda_data /usr/wgy/out
hadoop fs -cat /usr/wgy/out/part-r-00000
1901 317
1920 244