文档章节

hadoop计数器、分区、序列化等

Zero零_度
 Zero零_度
发布于 2015/01/07 09:57
字数 936
阅读 126
收藏 0
package com.test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*
 * 手机号码  流量[类型1、类型2、类型3]
 * 13500001234 12,56,78
 * 18600001235 32,21,80
 * 15800001235 16,33,56
 * 13500001234 19,92,73
 * 18600001235 53,55,29
 * 18600001239 27,77,68
 * 
 * 计算得出
 * 手机号 类型1汇总 类型2汇总 类型3汇总
 */
public class WordCount extends Configured implements Tool {
 
 public static class Map extends Mapper<LongWritable, Text, Text, StreamWritable> {
  //避免每调用一次map就创建一次对象
  private final Text phoneNum = new Text();
  private final StreamWritable streamWritable = new StreamWritable();
  
  private String firstLine = "#_#";
  private String lastLine;
  
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
   String line = value.toString();
   
   //获得map输入的第一条记录
   if("#_#".equals(firstLine)) {
    firstLine = key.toString() + "\t" + line;
   }
   
   //获得map输入的最后一条记录
   lastLine = key.toString() + "\t" + line;
   
   //13500001234手机号码总共在多少行出现【自定义计数器】
   Counter helloCounter = (Counter) context.getCounter("Words", "13500001234");
   if(line.contains("13500001234")) {
    helloCounter.increment(1L);
   }
   
   String[] strs = line.split("\t");
   //手机号码
   phoneNum.set(strs[0]);
   
   //流量
   String[] stream = strs[1].split(",");
   streamWritable.set(Long.parseLong(stream[0]), Long.parseLong(stream[1]), Long.parseLong(stream[2]));
   
   context.write(phoneNum, streamWritable);
  }
  
  protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,StreamWritable>.Context context) throws IOException ,InterruptedException {
   //获得map输入的第一条记录
   System.out.println(firstLine);
   
   //获得map输出的最后一条记录
   System.out.println(lastLine);
  };
 }
 
 public static class Reduce extends Reducer<Text, StreamWritable, Text, StreamWritable> {
  //避免每调用一次reduce就创建一次对象
  private StreamWritable streamWritable = new StreamWritable();
  
  /*
   * map函数执行结束后,map输出的<k, v>一共有4个,分别是<hello, 1><you, 1>,<hello, 1>,<me, 1>
   * 分区,默认只有一个分区  job.setPartitionerClass
   * 排序 <hello, 1>,<hello, 1>,<me, 1><you, 1>
   * 分组 把相同key的value放到一个集合中 <hello, {1,1}><me, {1}><you, {1}>,每一组调用一次reduce函数
   * 归约(可选) job.setCombinerClass
   */
  public void reduce(Text key, Iterable<StreamWritable> values, Context context) throws IOException, InterruptedException {
   long stream1 = 0;
   long stream2 = 0;
   long stream3 = 0;
   
   Iterator<StreamWritable> it = values.iterator();
   while(it.hasNext()) {
    streamWritable = it.next();
    stream1 = stream1 + streamWritable.getStream1();
    stream2 = stream2 + streamWritable.getStream2();
    stream3 = stream3 + streamWritable.getStream3();
   }
   
   streamWritable.set(stream1, stream2, stream3);
   context.write(key, streamWritable);
  }
 }
 
 public int run(String[] args) throws Exception {
  Configuration conf = this.getConf();
  Job job = new Job(conf);
  job.setJarByClass(WordCount.class);
  job.setJobName(WordCount.class.getSimpleName());
  
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  //如果没有配置,默认值是1
  job.setNumReduceTasks(1);
  
  //指定map产生的数据按照什么规则分配到不同的reduce中,如果没有配置,默认是HashPartitioner.class
  job.setPartitionerClass(MyPartitioner.class);
  
  //FileInputFormat.getSplits决定map任务数量,XxxInputFormat.RecordReader处理每一个split,得到map输入的key、value
  //默认是TextInputFormat
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  
  job.setMapperClass(Map.class);
  job.setCombinerClass(Reduce.class);
  job.setReducerClass(Reduce.class);
  
  //当reduce输出类型与map输出类型一致时,map的输出类型可以不设置
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(StreamWritable.class);
  
  //reduce的输出类型一定要设置
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(StreamWritable.class);
  
  job.waitForCompletion(true);
  
  return job.isSuccessful()?0:1;
 }
 
 public static void main(String[] args) throws Exception {
  int exit = ToolRunner.run(new WordCount(), args);
  System.exit(exit);
 }
 
}
//自定义Partitioner
class MyPartitioner extends Partitioner<Text, StreamWritable> {
 @Override
 //返回值表示,分配到第几个reduce任务中
 public int getPartition(Text key, StreamWritable value, int numPartitions) {
  //13500001234手机号码分到第1个reduce,其余的分到第二个reduce
  if("13500001234".equals(key.toString())) {
   return 0;
  } else {
   return 1;
  }
 }
}
//自定义序列化类[处理手机流量]
//Serializable:Java序列化的信息非常臃肿,比如存在层层类继承的时候,继承关系序列化出去,还需要序列化回来。
//hadoop的Writable轻量很多
class StreamWritable implements Writable {
 private long stream1;
 
 private long stream2;
 
 private long stream3;
 
 public long getStream1() {
  return stream1;
 }
 public void setStream1(long stream1) {
  this.stream1 = stream1;
 }
 public long getStream2() {
  return stream2;
 }
 public void setStream2(long stream2) {
  this.stream2 = stream2;
 }
 public long getStream3() {
  return stream3;
 }
 public void setStream3(long stream3) {
  this.stream3 = stream3;
 }
 public StreamWritable() {
  
 }
 
 public StreamWritable(long stream1, long stream2, long stream3) {
  this.set(stream1, stream2, stream3);
 }
 
 public void set(long stream1, long stream2, long stream3) {
  this.stream1 = stream1;
  this.stream2 = stream2;
  this.stream3 = stream3;
 }
 
 @Override
 public void write(DataOutput out) throws IOException {
  out.writeLong(stream1);//写出顺序和读入顺序一一对应
  out.writeLong(stream2);
  out.writeLong(stream3);
 }
 @Override
 public void readFields(DataInput in) throws IOException {
  this.stream1 = in.readLong();//写出顺序和读入顺序一一对应
  this.stream2 = in.readLong();
  this.stream3 = in.readLong();
 }
 
 //输出的时候会调用toString方法
 @Override
 public String toString() {
  return this.stream1+"\t"+this.stream2+"\t"+this.stream3;
 }
}

 

© 著作权归作者所有

共有 人打赏支持
Zero零_度
粉丝 67
博文 1245
码字总数 252866
作品 0
程序员
加载中

评论(1)

猪兜兜
猪兜兜
13好东西
Apache Spark 官方文档 翻译 - 编程指南

最近用Apache Spark 处理一些大数据,学了spark官方英文文档,顺便翻译了方便学习。 spark 版本 2.2.0. 翻译官方文档原地址: http://spark.apache.org/docs/latest/rdd-programming-guide....

___k先生
2017/12/07
0
0
Spark编程指南V1.4.0(翻译)

Spark编程指南V1.4.0 · 简单介绍 · 接入Spark · Spark初始化 · 使用Shell · 在集群上部署代码 · 弹性分布式数据集 · 并行集合(Parallelized Collections) · 其它数据集 · RDD的操作...

技术mix呢
2017/10/12
0
0
bboss hadoop hdfs大数据抽取工具

bboss大数据抽取工具功能特点如下: 实现db到hadoop hdfs数据导入功能,提供高效的分布式并行处理能力,可以采用数据库分区、按字段分区、分页方式并行批处理抽取db数据到hdfs文件系统中;能...

bboss
2015/08/01
0
0
bboss v4.0.9 发布,大数据抽取工具

bboss 大数据抽取工具 4.0.9 发布 相较上一版本(v4.0.8),v4.0.9增加了以下特性: 按照按日期字段切分任务,抽取db数据到hdfs,可以在第一次切分任务的基础上,将日期范围任务块进一步切分...

bboss
2015/08/29
989
2
bboss 大数据抽取工具 4.0.8 发布

bboss大数据抽取工具4.0.8发布 bboss大数据抽取工具功能特点如下: 实现db到hadoop hdfs数据导入功能,提供高效的分布式并行处理能力,可以采用数据库分区、按字段分区、分页方式并行批处理抽...

bboss
2015/08/01
2.2K
5

没有更多内容

加载失败,请刷新页面

加载更多

下一页

(一)软件测试专题——之Linux常用命令篇01

本文永久更新地址:https://my.oschina.net/bysu/blog/1931063 【若要到岸,请摇船:开源中国 不最醉不龟归】 Linux的历史之类的很多书籍都习惯把它的今生来世,祖宗十八代都扒出来,美其名曰...

不最醉不龟归
20分钟前
3
0
蚂蚁金服Java开发三面

8月20号晚上8点进行了蚂蚁金服Java开发岗的第三面,下面开始: 自我介绍(要求从实践过程以及技术背景角度着重介绍) 实习经历,说说你在公司实习所做的事情,学到了什么 关于你们的交易平台...

edwardGe
27分钟前
7
0
TypeScript基础入门 - 函数 - this(三)

转载 TypeScript基础入门 - 函数 - this(三) 项目实践仓库 https://github.com/durban89/typescript_demo.gittag: 1.2.4 为了保证后面的学习演示需要安装下ts-node,这样后面的每个操作都能...

durban
36分钟前
0
0
Spark core基础

Spark RDD的五大特性 RDD是由一系列的Partition组成的,如果Spark计算的数据是在HDFS上那么partition个数是与block数一致(大多数情况) RDD是有一系列的依赖关系,有利于Spark计算的容错 RDD中每...

张泽立
44分钟前
0
0
如何搭建Keepalived+Nginx+Tomcat高可用负载均衡架构

一.概述 初期的互联网企业由于业务量较小,所以一般单机部署,实现单点访问即可满足业务的需求,这也是最简单的部署方式,但是随着业务的不断扩大,系统的访问量逐渐的上升,单机部署的模式已...

Java大蜗牛
59分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部