文档章节

学习MapReduce(四),分区Partition的使用

静下来想想静静
 静下来想想静静
发布于 2017/03/12 17:16
字数 869
阅读 12
收藏 0

    2017.3.12,MapReduce一共有Map阶段,Reduce阶段两个阶段,但是在这两个阶段中间,有Shuffle阶段,Shuffle阶段分很多阶段:比如Combiner,Partition等等,可以用这样的结构来说:

        Map -->Partition-->Combiner(如果有的话)-->Reduce ---->输出到HDFS上<<<目前就知道这些阶段,可能还有更多更细节的阶段,以后补充>>>    

        其实我们如果不写自己的Partition,Hadoop会根据自己的一个类来进行分区-->HashPartitoner.但是很多案例或者业务需要我们自己来进行分区。这时候我们就要写自己的Partition类,通过继承Partition来完成。

    在本地会先走Map,Map的输出会当作Partition的输入。然后,根据业务需求写自己的逻辑代码。

代码如下:

----------------------->>>>自己写的序列化类

 

package lcPhoneFolw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PhoneFlow implements WritableComparable<PhoneFlow>{
	private long upload;
	private long download;
	private long sumload;
	
	public PhoneFlow(){}
	
	@Override
	public String toString() {
		
		return upload+"\t"+download+"\t"+sumload;
	}

	public PhoneFlow(long upload, long download) {
		this.upload = upload;
		this.download = download;
		this.sumload = upload+download;
	}

	public long getUpload() {
		return upload;
	}

	public void setUpload(long upload) {
		this.upload = upload;
	}

	public long getDownload() {
		return download;
	}

	public void setDownload(long download) {
		this.download = download;
	}

	public long getSumload() {
		return sumload;
	}

	public void setSumload(long sumload) {
		this.sumload = sumload;
	}
	//序列化
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeLong(upload);
		out.writeLong(download);
		out.writeLong(sumload);
	}
	//反序列化(应该是和序列化的顺序一样,如果不一样应该是会出问题。没有研究过)
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		upload = in.readLong();
		download = in.readLong();
		sumload = in.readLong();
	}


	//自定义比较器,会在map输出结果 PhoneFlow为K时,自动按照这个比较器传给reduce
	public int compareTo(PhoneFlow o) {

		return this.sumload>o.getSumload() ? -1 : 1 ;
	}



}

--------->>>>>>自己写的Partition类

 

package lcPhoneFolw;

import java.util.*;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 按照业务需求,我们需要针对手机号进行分区。这里传入的k,v值是map阶段的输出k,v
 * @author admin
 *
 */
public class PhonePartitioner extends Partitioner<Text, PhoneFlow>{
	//测试,所以只写了一个分区规则,就是匹配1380的手机号 分成一个区
	public static Map<String,Integer> map = new HashMap<String,Integer>();
	static{
		map.put("1380",0);
	}
	@Override
	public int getPartition(Text key, PhoneFlow values, int numPartitions) {
		String number = key.toString().substring(0,4);
		Integer partition = map.get(number);
		return partition==null?1:partition;
	}

}

------------------------->>>>>测试类(这个类中我是通过内部类来写的Mapper类和Reducer类)

package lcPhoneFolw;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 这个类是为了测试Partition的
 * @author admin
 *
 */
public class PhoneAllPartitionDemo {
	static class PhoneMapper3 extends Mapper<LongWritable,Text,Text,PhoneFlow>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] strs = line.split("\t");
			Text phoneNumber = new Text(strs[0]);
			long upload =  Long.valueOf(strs[1]);
			long download =  Long.valueOf(strs[2]);
			PhoneFlow pf = new PhoneFlow(upload,download);
			//将手机号和PhoneFlow传给PhonePartitioner来进行分区
			context.write(phoneNumber, pf);
		}
	}
	
	static class PhoneReducer3 extends Reducer<Text, PhoneFlow, Text, PhoneFlow>{
		
		@Override
		protected void reduce(Text key, Iterable<PhoneFlow> values,Context context)
				throws IOException, InterruptedException {
			context.write(key,values.iterator().next());
		}
	}
	/**
	 * main方法里有些变化,得定义Partition类和开几个任务
	 * @param args
	 */
	public static void main(String[] args)throws Exception {
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(PhoneAllPartitionDemo.class);
		
		//设定分区类和reduce开启几个任务
		job.setPartitionerClass(PhonePartitioner.class);
		job.setNumReduceTasks(2);
		
		//下面就是mapper和Reducer的设定
		job.setMapperClass(PhoneMapper3.class);
		job.setReducerClass(PhoneReducer3.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(PhoneFlow.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(PhoneFlow.class);
		
		FileInputFormat.setInputPaths(job,new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));
		
		job.waitForCompletion(true);

	}
}

运行测试类,得到的文件有:

    文件数量从默认的1个变成了两个了。这是因为在Partition类中定义了两个分区,一个1380号段,一个非1380号段的。打开这两个文件发现果然是这样子滴。

    如上就是Partition的使用,谢谢观看

 

© 著作权归作者所有

静下来想想静静
粉丝 2
博文 14
码字总数 12348
作品 0
焦作
私信 提问
Shuffle对MapReduce性能调优

Shuffle对MapReduce性能调优: Shuffle和排序 MapReduce确保每一个reduce的输出都按键排序,系统执行排序的过程---------将map输出作为输入传给reduce--------称为shuffle Shuffle过程是Map...

片刻
2015/11/09
163
0
Hadoop学习笔记:数据分析引擎Hive

概述 Hive是一个构建在Hadoop之上的数据仓库,和传统的数据仓库一样主要用来访问和管理数据,提供了类SQL查询语言;和传统数据仓库不一样的是可以处理超大规模的数据,可扩展性和容错性非常强...

GaryBigPig
01/16
0
0
《Hadoop权威指南》书摘-MapReduce概述

转载请注明出处:http://wangnan.tech 简书:http://www.jianshu.com/u/244399b1d776 MapReduce是一种可用于数据处理的编程模型,MapReduce程序本质上是并行运行的,因此可以将大规模数据分析...

GhostStories
2018/07/23
0
0
大数据教程(8.6)yarn客户端提交job的流程梳理和总结&自定义partition编程

上一篇博客博主分享了mapreduce的并行原理,本篇博客将继续分享yarn客户端提交job的流程和自定义partition编程。 一、yarn客户端提交job的流程 二、自定义partition编程 FlowBean(输出结果类...

em_aaron
2018/11/28
69
0
一起学Hadoop——MapReduce原理

一致性Hash算法。 1、求出不同服务器的哈希值,然后映射到一个范围为0—232-1的数值空间的圆环中,即将首(0)和尾(232-1)相接的圆环,如图1。 图1 2、当有一个李四的用户访问时,就会给该用户...

小七奇奇
2018/08/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
5
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
8
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
10
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
6
0
PHP+Ajax微信手机端九宫格抽奖实例

PHP+Ajax结合lottery.js制作的一款微信手机端九宫格抽奖实例,抽奖完成后有收货地址添加表单出现。支持可以设置中奖概率等。 奖品列表 <div class="lottery_list clearfix" id="lottery"> ......

ymkjs1990
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部