文档章节

Hadoop中的分区(13)

肖鋭
 肖鋭
发布于 2014/03/04 21:24
字数 459
阅读 52
收藏 0
package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class KpiApp {
	public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat";
	public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format";
	public static void main(String[] args)throws Exception {
		Configuration conf = new Configuration();
		existsFile(conf);
		Job job = new Job(conf, KpiApp.class.getName());
		//打成Jar在Linux运行
		job.setJarByClass(KpiApp.class);
		
		//1.1
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2
		job.setMapperClass(MyMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);
		
		//1.3 自定义分区
		job.setPartitionerClass(KpiPartition.class);
		job.setNumReduceTasks(2);
		
		//1.4 排序分组
		//1.5 聚合
		
		//2.1
		
		//2.2
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);
		
		//2.3
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.waitForCompletion(true);
	}
	private static void existsFile(Configuration conf) throws IOException,
			URISyntaxException {
		FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf);
		if(fs.exists(new Path(OUTPUT_PATH))){
			fs.delete(new Path(OUTPUT_PATH), true);
		}
	}
	static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String string = value.toString();
			String[] split = string.split("\t");
			String phone = split[1];
			Text key2 = new Text();
			key2.set(phone);
			
			KpiWritable v2= new KpiWritable();
			v2.set(split[6],split[7],split[8],split[9]);
			context.write(key2, v2);
		}
		
	}
	static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{

		@Override
		protected void reduce(Text key2, Iterable<KpiWritable> values,Context context)
				throws IOException, InterruptedException {
				long upPackNum = 0L;
				long downPackNum = 0L;
				long upPayLoad = 0L;
				long downPayLoad = 0L;
				for(KpiWritable writable : values){
					upPackNum += writable.upPackNum;
					downPackNum += writable.downPackNum;
					upPayLoad += writable.upPayLoad;
					downPayLoad += writable.downPayLoad;
				}
				KpiWritable value3 = new KpiWritable();
				value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad));
				context.write(key2, value3);
		}
	}
}
class KpiWritable implements Writable{
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(this.upPackNum);
		out.writeLong(this.downPackNum);
		out.writeLong(this.upPayLoad);
		out.writeLong(this.downPayLoad);
	}

	public void set(String string, String string2, String string3,
			String string4) {
		this.upPackNum = Long.parseLong(string);
		this.downPackNum = Long.parseLong(string2);
		this.upPayLoad = Long.parseLong(string3);
		this.downPayLoad = Long.parseLong(string4);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public String toString() {
		return  upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
	}
}
class KpiPartition extends Partitioner<Text, KpiWritable>{

	@Override
	public int getPartition(Text key, KpiWritable value, int numPartitions) {
		String string = key.toString();
		return string.length()==11?0:1;
	}
}



                                                                    Name:Xr
                                                                    Date:2014-03-04 21:23

  Paritioner是Hashpartitioner的基类,如果需要定制Partitioner也需要继承该类。

  HashPartitioner是MapReduce的默认Partitioner。

© 著作权归作者所有

肖鋭
粉丝 10
博文 62
码字总数 29531
作品 0
朝阳
程序员
私信 提问
Hive从入门到实战视频教程【40讲全】

1、Hive是什么,Hive的体系结构,Hive和Hadoop的关系 2、Hive的元数据存储、Hive的数据存储、Hive和RDBMS的区别 3、JDK安装 4、Hadoop集群搭建-1 5、Hadoop集群搭建-2 6、Hadoop集群搭建-3 ...

tom先生
2014/11/26
58
0
hive性能调优

limit限制调整 --因为使用limit语句时候,是先执行整个查询语句,然后再返回部分结果的 set hive.limit.optimize.enable=true; set hive.limit.row.max.size=10000; set hive.limit.optimiz...

aibati2008
2016/04/20
194
0
Debian7.4 安装及配置

 准备学习Hadoop,想把自己学习的过程,遇到的问题以及问题的解决方案一起共享给大家。学习Hadoop技术,首先要做的就是搭建Hadoop环境,由于手里只有一台古董级笔记本,也只能用这个笔记本...

RickyLau
2014/04/28
337
0
Hive性能调优

1.limit限制调整--因为使用limit语句时候,是先执行整个查询语句,然后再返回部分结果的set hive.limit.optimize.enable=true;set hive.limit.row.max.size=10000;set hive.limit.optimize.l...

qhaiyan
2016/12/03
44
0
hadoop伪分布式环境搭建:linux操作系统安装图解

本篇文章是接上一篇《新手入门篇:虚拟机搭建hadoop环境的详细步骤》,上一篇有人问怎么没写hadoop安装。在文章开头就已经说明了,hadoop安装会在后面写到,因为整个系列的文章涉及到每一步的...

adnb34g
2018/07/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

新海软件邮政市场监管综合信息平台

二、系统功能 该平台包括邮政普遍服务管理、快递管理两大系统,涵盖了地图定位、普服信息、快递信息、GIS管理、网格管理、视频监控、数据分析(BI)、系统设置等八大模块,全面反映了区域邮政...

neocean
昨天
177
0
【微记忆】用户隐私政策与条款

微记忆尊重并保护所有注册用户的个人隐私权。为了给您提供更准确、更贴心的服务,微记忆会按照本隐私权政策的规定储存并使用您的个人信息。微记忆承诺将以高度严格的审慎义务对待这些信息。除...

微记忆
昨天
101
0
两周自制脚本语言-第7天 添加函数功能

第7天 添加函数功能 基本的函数定义与调用执行、引入闭包使Stone语言可以将变量赋值为函数,或将函数作为参数传递给其他函数 有些函数将有返回值的归为函数,没有返回值的归为子程序 7.1 扩充...

果汁分你一半
昨天
105
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部