文档章节

MapReduce例程-求各个部门的总工资

悟空太多啦
 悟空太多啦
发布于 2014/08/31 23:01
字数 1358
阅读 212
收藏 1

数据

     EMPNO       ENAME        JOB                   MGR   HIREDATE                      SAL         COMM      DEPTNO

      7369 SMITH      CLERK           7902 17-12月-80            800                    20
      7499 ALLEN      SALESMAN        7698 20-2月 -81           1600        300         30
      7521 WARD       SALESMAN        7698 22-2月 -81           1250        500         30
      7566 JONES      MANAGER         7839 02-4月 -81           2975                    20
      7654 MARTIN     SALESMAN        7698 28-9月 -81           1250       1400         30
      7698 BLAKE      MANAGER         7839 01-5月 -81           2850                    30
      7782 CLARK      MANAGER         7839 09-6月 -81           2450                    10
      7839 KING       PRESIDENT            17-11月-81           5000                    10
      7844 TURNER     SALESMAN        7698 08-9月 -81           1500          0         30
      7900 JAMES      CLERK           7698 03-12月-81            950                    30
      7902 FORD       ANALYST         7566 03-12月-81           3000                    20
      7934 MILLER     CLERK           7782 23-1月 -82           1300                    10

代码

package cn.kissoft.hadoop.week07;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.kissoft.hadoop.util.HdfsUtil;

/**
 * Homework-01:求各个部门的总工资
 * 
 * @author wukong(jinsong.sun@139.com)
 */
public class TotalSalaryByDeptMR extends Configured implements Tool {

	public static class M extends Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String deptno = line.substring(79).trim();
			String sal = line.substring(57, 68).trim();
			int salary = Integer.valueOf(sal);

			context.write(new Text(deptno), new IntWritable(salary));
		}
	}

	public static class R extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = new Job(conf, "Job-TotalSalaryByDeptMR");
//		job.setJarByClass(this.getClass());
		
		job.setMapperClass(M.class);
		job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
		job.setReducerClass(R.class);
		
		job.setOutputFormatClass(TextOutputFormat.class);
//		job.setOutputKeyClass(NullWritable.class); // 指定输出的KEY的格式
		job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式
		job.setOutputValueClass(IntWritable.class); // 指定输出的VALUE的格式

		
		FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径
		FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径
		
		return job.waitForCompletion(true) ? 0 : 1; 
//		job.waitForCompletion(true);
//		return job.isSuccessful() ? 0 : 1;
	}

	/**
	 * 
	 * @param args hdfs://bd11:9000/user/wukong/w07/emp.txt hdfs://bd11:9000/user/wukong/w07/out01/
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		checkArgs(args);

		HdfsUtil.rm(args[1], true);
		
		Date start = new Date();
		int res = ToolRunner.run(new Configuration(), new TotalSalaryByDeptMR(), args);
		printExcuteTime(start, new Date());

		System.exit(res);
	}

	/**
	 * 判断参数个数是否正确,如果无参数运行则显示以作程序说明。
	 * 
	 * @param args
	 */
	private static void checkArgs(String[] args) {
		if (args.length != 2) {
			System.err.println("");
			System.err.println("Usage: Test_1 < input path > < output path > ");
			System.err
					.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1 hdfs://localhost:9000/home/james/output");
			System.err.println("Counter:");
			System.err.println("\t" + "LINESKIP" + "\t"
					+ "Lines which are too short");
			System.exit(-1);
		}
	}

	/**
	 * 打印程序运行时间
	 * 
	 * @param start
	 * @param end
	 */
	private static void printExcuteTime(Date start, Date end) {
		DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
		System.out.println("任务开始:" + formatter.format(start));
		System.out.println("任务结束:" + formatter.format(end));
		System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");
	}
}

运行结果

10	8750
20	6775
30	9400

控制台

14/08/31 23:01:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/08/31 23:01:01 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/08/31 23:01:01 INFO input.FileInputFormat: Total input paths to process : 1
14/08/31 23:01:02 WARN snappy.LoadSnappy: Snappy native library not loaded
14/08/31 23:01:02 INFO mapred.JobClient: Running job: job_local248108448_0001
14/08/31 23:01:02 INFO mapred.LocalJobRunner: Waiting for map tasks
14/08/31 23:01:02 INFO mapred.LocalJobRunner: Starting task: attempt_local248108448_0001_m_000000_0
14/08/31 23:01:02 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
14/08/31 23:01:02 INFO mapred.MapTask: Processing split: hdfs://bd11:9000/user/wukong/w07/emp.txt:0+1119
14/08/31 23:01:02 INFO mapred.MapTask: io.sort.mb = 100
14/08/31 23:01:02 INFO mapred.MapTask: data buffer = 79691776/99614720
14/08/31 23:01:02 INFO mapred.MapTask: record buffer = 262144/327680
14/08/31 23:01:02 INFO mapred.MapTask: Starting flush of map output
14/08/31 23:01:02 INFO mapred.MapTask: Finished spill 0
14/08/31 23:01:02 INFO mapred.Task: Task:attempt_local248108448_0001_m_000000_0 is done. And is in the process of commiting
14/08/31 23:01:02 INFO mapred.LocalJobRunner: 
14/08/31 23:01:02 INFO mapred.Task: Task 'attempt_local248108448_0001_m_000000_0' done.
14/08/31 23:01:02 INFO mapred.LocalJobRunner: Finishing task: attempt_local248108448_0001_m_000000_0
14/08/31 23:01:02 INFO mapred.LocalJobRunner: Map task executor complete.
14/08/31 23:01:02 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
14/08/31 23:01:02 INFO mapred.LocalJobRunner: 
14/08/31 23:01:02 INFO mapred.Merger: Merging 1 sorted segments
14/08/31 23:01:02 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 110 bytes
14/08/31 23:01:02 INFO mapred.LocalJobRunner: 
14/08/31 23:01:02 INFO mapred.Task: Task:attempt_local248108448_0001_r_000000_0 is done. And is in the process of commiting
14/08/31 23:01:02 INFO mapred.LocalJobRunner: 
14/08/31 23:01:02 INFO mapred.Task: Task attempt_local248108448_0001_r_000000_0 is allowed to commit now
14/08/31 23:01:02 INFO output.FileOutputCommitter: Saved output of task 'attempt_local248108448_0001_r_000000_0' to hdfs://bd11:9000/user/wukong/w07/out01
14/08/31 23:01:02 INFO mapred.LocalJobRunner: reduce > reduce
14/08/31 23:01:02 INFO mapred.Task: Task 'attempt_local248108448_0001_r_000000_0' done.
14/08/31 23:01:03 INFO mapred.JobClient:  map 100% reduce 100%
14/08/31 23:01:03 INFO mapred.JobClient: Job complete: job_local248108448_0001
14/08/31 23:01:03 INFO mapred.JobClient: Counters: 19
14/08/31 23:01:03 INFO mapred.JobClient:   File Output Format Counters 
14/08/31 23:01:03 INFO mapred.JobClient:     Bytes Written=24
14/08/31 23:01:03 INFO mapred.JobClient:   File Input Format Counters 
14/08/31 23:01:03 INFO mapred.JobClient:     Bytes Read=1119
14/08/31 23:01:03 INFO mapred.JobClient:   FileSystemCounters
14/08/31 23:01:03 INFO mapred.JobClient:     FILE_BYTES_READ=426
14/08/31 23:01:03 INFO mapred.JobClient:     HDFS_BYTES_READ=2238
14/08/31 23:01:03 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=138578
14/08/31 23:01:03 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=24
14/08/31 23:01:03 INFO mapred.JobClient:   Map-Reduce Framework
14/08/31 23:01:03 INFO mapred.JobClient:     Reduce input groups=3
14/08/31 23:01:03 INFO mapred.JobClient:     Map output materialized bytes=114
14/08/31 23:01:03 INFO mapred.JobClient:     Combine output records=0
14/08/31 23:01:03 INFO mapred.JobClient:     Map input records=12
14/08/31 23:01:03 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/08/31 23:01:03 INFO mapred.JobClient:     Reduce output records=3
14/08/31 23:01:03 INFO mapred.JobClient:     Spilled Records=24
14/08/31 23:01:03 INFO mapred.JobClient:     Map output bytes=84
14/08/31 23:01:03 INFO mapred.JobClient:     Total committed heap usage (bytes)=326107136
14/08/31 23:01:03 INFO mapred.JobClient:     SPLIT_RAW_BYTES=105
14/08/31 23:01:03 INFO mapred.JobClient:     Map output records=12
14/08/31 23:01:03 INFO mapred.JobClient:     Combine input records=0
14/08/31 23:01:03 INFO mapred.JobClient:     Reduce input records=12
任务开始:2014-08-31 23:01:01
任务结束:2014-08-31 23:01:03
任务耗时:0.024416666 分钟


© 著作权归作者所有

悟空太多啦
粉丝 20
博文 86
码字总数 72074
作品 1
南京
项目经理
私信 提问
雅虎计划重构 Hadoop-MapReduce,解决性能瓶颈

最近雅虎开发者博客发了一篇介绍Hadoop重构计划的文章。因为他们发现当集群的规模达到4000台机器的时候,Hadoop遭遇到扩展性的瓶颈,目前他们正准备开始对Hadoop进行重构。 Mapreduce面临的瓶...

小编辑
2011/02/26
3.6K
3
Hadoop中的MapReduce(5)

在MapReduce中,它也是主从结构,主节点:JobTracker,从节点:TaskTracker。主节点只有一个从节点有很多个,主节点在主机上,从节点分布到其他机器上。 JobTracker: 作用: 1、负责接收用户...

肖鋭
2014/02/23
109
0
10、Mapreduce的一些场景

1、排序并且求 TOPOne 和TOPN 1、在map端的输出中,将需要排序的字段作为key。那么到达reduce时,相同的key会作为一组排在一起的数据。 注意:如果key是自己组装的javabean,那么这个javabea...

刘付kin
2018/10/29
18
0
MapReduce的预处理编程问题

求大神:刚学MapReduce,我想将一个java程序用MapReduce实行并行化,该java程序执行时先有一个读参数的预处理过程, 然后通过绝对路径读取文件进行鉴定,请问该预处理的过程应该放在MapRedu...

Logic09
2015/05/11
232
1
【Hadoop踩雷】无法上传文件?有办法!

正文之前 一鼓作气!肝死它!!!! 正文 前面都已经配置好了。我就准备试试伪分布式了!!结果??!啊哈?!?! 数据节点不见了???WTF? 所以就去找呀找~ 最后找到了两个法子。。 启动...

HustWolf
2018/06/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

java发送html模板的高逼格邮件

最近做了一个监测k8s服务pod水平伸缩发送邮件的功能(当pod的cpu/内存达到指定阈值后会水平扩展出多个pod、或者指定时间内pod数应扩展到指定数量),一开始写了个格式很low的邮件,像下面这样...

码农实战
16分钟前
6
0
php-fpm配置文件详解/MariaDB密码重置、慢查询日志

来源:https://blog.csdn.net/Powerful_Fy php-fpm主配置文件路径:/usr/local/php-fpm/etc/php-fpm.conf #位于安装php安装目录下的etc/目录中,该文件中最后一行将配置文件指向:include=/...

asnfuy
21分钟前
4
0
川普给埃尔多安和内堪尼亚胡的信

任性 https://twitter.com/netanyahu/status/1186647558401253377 https://edition.cnn.com/2019/10/16/politics/trump-erdogan-letter/index.htm...

Iridium
42分钟前
12
0
golang-mysql-原生

db.go package mainimport ("database/sql""time"_ "github.com/go-sql-driver/mysql")var (db *sql.DBdsn = "root:123456@tcp(127.0.0.1:3306)/test?charset=u......

李琼涛
今天
5
0
编程作业20191021092341

1编写一个程序,把用分钟表示的时间转换成用小时和分钟表示的时 间。使用#define或const创建一个表示60的符号常量或const变量。通过while 循环让用户重复输入值,直到用户输入小于或等于0的值...

1李嘉焘1
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部