HADOOP 在 JAVA 运行

原创
2017/04/14 22:01
阅读数 46

1. Mapper

2. Reducer

 

    HADOOP 的 Maven依赖包:

        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
                <version>2.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.0</version>
            </dependency>
            <dependency>
                <groupId>jdk.tools</groupId>
                <artifactId>jdk.tools</artifactId>
                <version>1.7</version>
                <scope>system</scope>
                <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
            </dependency>
        </dependencies>

MaxTemperatureMapper类:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	
	//缺失
	private static final int MISSING = 9999;
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		//转换text类型为String类型
		String line = value.toString();
		//提取出年份
		String year = line.substring(15, 19);
		//气温变量
		int airTemperature;
		//判断符号
		if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
			airTemperature = Integer.parseInt(line.substring(88, 92));
		} else {
			airTemperature = Integer.parseInt(line.substring(87, 92));
		}
		//质量
		String quality = line.substring(92, 93);
		//将有效数据写到map的上下文中,注意类型务必要和泛型声明一致
		if (airTemperature != MISSING && quality.matches("[01459]")) {
			context.write(new Text(year), new IntWritable(airTemperature));
		}
	}
}


    MaxTemperatureReducer类:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		//定义最大值变量
		int maxValue = Integer.MIN_VALUE;
		for (IntWritable value : values) {
			maxValue = Math.max(maxValue, value.get());
		}
		//将reduce的输出写入到context中
		context.write(key, new IntWritable(maxValue));
	}
}

MaxTemperature类:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: MaxTemperature <input path> <output path>");
			System.exit(-1);
		}
		/*FileSystem fs = FileSystem.get(null);
		fs.open(null);*/
		//作业,每一次 map + reduce 过程就是一次作业
		Job job = new Job();
		job.setJarByClass(MaxTemperature.class);
		//设置作业名称,便于调试
		job.setJobName("Max temperature");
		//添加输入路径,可以添加多个路径,可以是文件目录(不递归)或具体文件
		FileInputFormat.addInputPath(job, new Path(args[0]));
		//设置输出路径,只有一个,而且不能存在
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.setMapperClass(MaxTemperatureMapper.class);
		job.setReducerClass(MaxTemperatureReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//等待作业的完成
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}


    程序导出成jar包:HadoopDemo.jar
    查看jar文件
    在本地文件系统上执行程序
        $>set HADOOP_CLASSPATH=F:\Downloads\hadoop\HadoopDemo.jar //win7
        $>hadoop -Xmx1000m com.wgy.hadoop.MaxTemperature file:///F:\Downloads\hadoop\ncda_data\19*.gz F:\Downloads\hadoop\output
        $>export HADOOP_CLASSPATH=hadoop-xxxxx.jar //Linux
        $>hadoop com.wgy.hadoop.MaxTemperature input/ncdc/sample.txt output
    
    在HDFS集群上执行程序
        $>通过共享文件夹将jar文件发送到ubuntu主机中
        $>将天气数据上传到hdfs文件系统
        $>hadoop jar xxx.jar /aa /xxx

        hadoop fs -mkdir /usr/wgy/ncda_data
        hadoop fs -put /mnt/hgfs/F/Downloads/hadoop/ncda_data/19*.gz /usr/wgy/ncda_data
        cp /mnt/hgfs/F/Downloads/hadoop/HadoopDemo.jar /Downloads
        hadoop jar /Downloads/HadoopDemo.jar /usr/wgy/ncda_data /usr/wgy/out
        hadoop fs -cat /usr/wgy/out/part-r-00000
            1901 317 
            1920 244

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部