文档章节

hadoop计数器、分区、序列化等

Zero零_度
 Zero零_度
发布于 2015/01/07 09:57
字数 936
阅读 122
收藏 0
点赞 1
评论 1
package com.test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*
 * 手机号码  流量[类型1、类型2、类型3]
 * 13500001234 12,56,78
 * 18600001235 32,21,80
 * 15800001235 16,33,56
 * 13500001234 19,92,73
 * 18600001235 53,55,29
 * 18600001239 27,77,68
 * 
 * 计算得出
 * 手机号 类型1汇总 类型2汇总 类型3汇总
 */
public class WordCount extends Configured implements Tool {
 
 public static class Map extends Mapper<LongWritable, Text, Text, StreamWritable> {
  //避免每调用一次map就创建一次对象
  private final Text phoneNum = new Text();
  private final StreamWritable streamWritable = new StreamWritable();
  
  private String firstLine = "#_#";
  private String lastLine;
  
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
   String line = value.toString();
   
   //获得map输入的第一条记录
   if("#_#".equals(firstLine)) {
    firstLine = key.toString() + "\t" + line;
   }
   
   //获得map输入的最后一条记录
   lastLine = key.toString() + "\t" + line;
   
   //13500001234手机号码总共在多少行出现【自定义计数器】
   Counter helloCounter = (Counter) context.getCounter("Words", "13500001234");
   if(line.contains("13500001234")) {
    helloCounter.increment(1L);
   }
   
   String[] strs = line.split("\t");
   //手机号码
   phoneNum.set(strs[0]);
   
   //流量
   String[] stream = strs[1].split(",");
   streamWritable.set(Long.parseLong(stream[0]), Long.parseLong(stream[1]), Long.parseLong(stream[2]));
   
   context.write(phoneNum, streamWritable);
  }
  
  protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,StreamWritable>.Context context) throws IOException ,InterruptedException {
   //获得map输入的第一条记录
   System.out.println(firstLine);
   
   //获得map输出的最后一条记录
   System.out.println(lastLine);
  };
 }
 
 public static class Reduce extends Reducer<Text, StreamWritable, Text, StreamWritable> {
  //避免每调用一次reduce就创建一次对象
  private StreamWritable streamWritable = new StreamWritable();
  
  /*
   * map函数执行结束后,map输出的<k, v>一共有4个,分别是<hello, 1><you, 1>,<hello, 1>,<me, 1>
   * 分区,默认只有一个分区  job.setPartitionerClass
   * 排序 <hello, 1>,<hello, 1>,<me, 1><you, 1>
   * 分组 把相同key的value放到一个集合中 <hello, {1,1}><me, {1}><you, {1}>,每一组调用一次reduce函数
   * 归约(可选) job.setCombinerClass
   */
  public void reduce(Text key, Iterable<StreamWritable> values, Context context) throws IOException, InterruptedException {
   long stream1 = 0;
   long stream2 = 0;
   long stream3 = 0;
   
   Iterator<StreamWritable> it = values.iterator();
   while(it.hasNext()) {
    streamWritable = it.next();
    stream1 = stream1 + streamWritable.getStream1();
    stream2 = stream2 + streamWritable.getStream2();
    stream3 = stream3 + streamWritable.getStream3();
   }
   
   streamWritable.set(stream1, stream2, stream3);
   context.write(key, streamWritable);
  }
 }
 
 public int run(String[] args) throws Exception {
  Configuration conf = this.getConf();
  Job job = new Job(conf);
  job.setJarByClass(WordCount.class);
  job.setJobName(WordCount.class.getSimpleName());
  
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  //如果没有配置,默认值是1
  job.setNumReduceTasks(1);
  
  //指定map产生的数据按照什么规则分配到不同的reduce中,如果没有配置,默认是HashPartitioner.class
  job.setPartitionerClass(MyPartitioner.class);
  
  //FileInputFormat.getSplits决定map任务数量,XxxInputFormat.RecordReader处理每一个split,得到map输入的key、value
  //默认是TextInputFormat
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  
  job.setMapperClass(Map.class);
  job.setCombinerClass(Reduce.class);
  job.setReducerClass(Reduce.class);
  
  //当reduce输出类型与map输出类型一致时,map的输出类型可以不设置
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(StreamWritable.class);
  
  //reduce的输出类型一定要设置
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(StreamWritable.class);
  
  job.waitForCompletion(true);
  
  return job.isSuccessful()?0:1;
 }
 
 public static void main(String[] args) throws Exception {
  int exit = ToolRunner.run(new WordCount(), args);
  System.exit(exit);
 }
 
}
//自定义Partitioner
class MyPartitioner extends Partitioner<Text, StreamWritable> {
 @Override
 //返回值表示,分配到第几个reduce任务中
 public int getPartition(Text key, StreamWritable value, int numPartitions) {
  //13500001234手机号码分到第1个reduce,其余的分到第二个reduce
  if("13500001234".equals(key.toString())) {
   return 0;
  } else {
   return 1;
  }
 }
}
//自定义序列化类[处理手机流量]
//Serializable:Java序列化的信息非常臃肿,比如存在层层类继承的时候,继承关系序列化出去,还需要序列化回来。
//hadoop的Writable轻量很多
class StreamWritable implements Writable {
 private long stream1;
 
 private long stream2;
 
 private long stream3;
 
 public long getStream1() {
  return stream1;
 }
 public void setStream1(long stream1) {
  this.stream1 = stream1;
 }
 public long getStream2() {
  return stream2;
 }
 public void setStream2(long stream2) {
  this.stream2 = stream2;
 }
 public long getStream3() {
  return stream3;
 }
 public void setStream3(long stream3) {
  this.stream3 = stream3;
 }
 public StreamWritable() {
  
 }
 
 public StreamWritable(long stream1, long stream2, long stream3) {
  this.set(stream1, stream2, stream3);
 }
 
 public void set(long stream1, long stream2, long stream3) {
  this.stream1 = stream1;
  this.stream2 = stream2;
  this.stream3 = stream3;
 }
 
 @Override
 public void write(DataOutput out) throws IOException {
  out.writeLong(stream1);//写出顺序和读入顺序一一对应
  out.writeLong(stream2);
  out.writeLong(stream3);
 }
 @Override
 public void readFields(DataInput in) throws IOException {
  this.stream1 = in.readLong();//写出顺序和读入顺序一一对应
  this.stream2 = in.readLong();
  this.stream3 = in.readLong();
 }
 
 //输出的时候会调用toString方法
 @Override
 public String toString() {
  return this.stream1+"\t"+this.stream2+"\t"+this.stream3;
 }
}

 

© 著作权归作者所有

共有 人打赏支持
Zero零_度
粉丝 66
博文 734
码字总数 252800
作品 0
程序员
加载中

评论(1)

猪兜兜
猪兜兜
13好东西
Apache Spark 官方文档 翻译 - 编程指南

最近用Apache Spark 处理一些大数据,学了spark官方英文文档,顺便翻译了方便学习。 spark 版本 2.2.0. 翻译官方文档原地址: http://spark.apache.org/docs/latest/rdd-programming-guide....

___k先生 ⋅ 2017/12/07 ⋅ 0

Spark编程指南V1.4.0(翻译)

Spark编程指南V1.4.0 · 简单介绍 · 接入Spark · Spark初始化 · 使用Shell · 在集群上部署代码 · 弹性分布式数据集 · 并行集合(Parallelized Collections) · 其它数据集 · RDD的操作...

技术mix呢 ⋅ 2017/10/12 ⋅ 0

bboss v4.0.9 发布,大数据抽取工具

bboss 大数据抽取工具 4.0.9 发布 相较上一版本(v4.0.8),v4.0.9增加了以下特性: 按照按日期字段切分任务,抽取db数据到hdfs,可以在第一次切分任务的基础上,将日期范围任务块进一步切分...

bboss ⋅ 2015/08/29 ⋅ 2

bboss 大数据抽取工具 4.0.8 发布

bboss大数据抽取工具4.0.8发布 bboss大数据抽取工具功能特点如下: 实现db到hadoop hdfs数据导入功能,提供高效的分布式并行处理能力,可以采用数据库分区、按字段分区、分页方式并行批处理抽...

bboss ⋅ 2015/08/01 ⋅ 5

Hadoop工作原理学习笔记

应用开发 主要知识点如下: Configuration类(支持overwrite,variable $) 测试(mock单元测试,本地测试,集群测试) Tool, ToolRunner 集群测试(package, 启动job, Job web UI for namen...

蓝狐乐队 ⋅ 2014/02/21 ⋅ 0

HIVE与mysql的关系

Hive是一个基于Hadoop的数据仓库平台。通过hive,我们可以方便地进行ETL的工作。hive定义了一个类似于SQL的查询语言:HQL,能 够将用户编写的QL转化为相应的Mapreduce程序基于Hadoop执行。 ...

脸大的都是胖纸 ⋅ 2015/04/22 ⋅ 0

hive-mysql的关系(Hive集成Mysql作为元数据)

Hive是一个基于Hadoop的数据仓库平台。通过hive,我们可以方便地进行ETL的工作。hive定义了一个类似于SQL的查询语言:HQL,能 够将用户编写的QL转化为相应的Mapreduce程序基于Hadoop执行。 ...

cookqq ⋅ 2014/01/04 ⋅ 0

Spark中的编程模型

Spark中的基本概念 在Spark中,有下面的基本概念。 Application:基于Spark的用户程序,包含了一个driver program和集群中多个executor Driver Program:运行Application的main()函数并创建S...

闪电 ⋅ 2016/07/28 ⋅ 0

MapReduce 计数器简介

1、计数器简介 在许多情况下,一个用户需要了解待分析的数据,尽管这并非所要执行的分析任务 的核心内容。以统计数据集中无效记录数目的任务为例,如果发现无效记录的比例 相当高,那么就需要...

xrzs ⋅ 2014/06/09 ⋅ 3

Spark编程指南《Spark 官方文档》

《Spark 官方文档》Spark编程指南 spark-1.6.0 [原文地址] Spark编程指南 概述 总体上来说,每个Spark应用都包含一个驱动器(driver)程序,驱动器运行用户的main函数,并在集群上执行各种并...

openthings ⋅ 2016/05/29 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

NFS介绍 NFS服务端安装配置 NFS配置选项

NFS介绍 NFS是Network File System的缩写;这个文件系统是基于网路层面,通过网络层面实现数据同步 NFS最早由Sun公司开发,分2,3,4三个版本,2和3由Sun起草开发,4.0开始Netapp公司参与并主导...

lyy549745 ⋅ 22分钟前 ⋅ 0

Spring AOP 源码分析 - 筛选合适的通知器

1.简介 从本篇文章开始,我将会对 Spring AOP 部分的源码进行分析。本文是 Spring AOP 源码分析系列文章的第二篇,本文主要分析 Spring AOP 是如何为目标 bean 筛选出合适的通知器(Advisor...

java高级架构牛人 ⋅ 45分钟前 ⋅ 0

HTML-标签手册

标签 描述 <!--...--> 定义注释。 <!DOCTYPE> 定义文档类型。 <a> 定义锚。超链接 <abbr> 定义缩写。 <acronym> 定义只取首字母的缩写。 <address> 定义文档作者或拥有者的联系信息。 <apple......

ZHAO_JH ⋅ 47分钟前 ⋅ 0

SylixOS在t_main中使用硬浮点方法

问题描述 在某些使用场景中,应用程序不使用动态加载的方式执行,而是跟随BSP在 t_main 线程中启动,此时应用代码是跟随 BSP 进行编译的。由于 BSP 默认使用软浮点,所以会导致应用代码中的浮...

zhywxyy ⋅ 54分钟前 ⋅ 0

JsBridge原理分析

看了这个Github代码 https://github.com/lzyzsd/JsBridge,想起N年前比较火的Hybrid方案,想看看现在跨平台调用实现有什么新的实现方式。代码看下来之后发现确实有点独特之处,这里先把核心的...

Kingguary ⋅ 今天 ⋅ 0

Intellij IDEA神器常用技巧五-真正常用快捷键(收藏级)

如果你觉得前面几篇博文太啰嗦,下面是博主多年使用Intellij IDEA真正常用快捷键,建议收藏!!! sout,System.out.println()快捷键 fori,for循环快捷键 psvm,main方法快捷键 Alt+Home,导...

Mkeeper ⋅ 今天 ⋅ 0

Java 静态代码分析工具简要分析与使用

本文首先介绍了静态代码分析的基本概念及主要技术,随后分别介绍了现有 4 种主流 Java 静态代码分析工具 (Checkstyle,FindBugs,PMD,Jtest),最后从功能、特性等方面对它们进行分析和比较,...

Oo若离oO ⋅ 今天 ⋅ 0

SpringBoot自动配置小记

spring-boot项目的特色就在于它的自动配置,自动配置就是开箱即用的本源。 不过支持一个子项目的自动配置,往往比较复杂,无论是sping自己的项目,还是第三方的,都是如此。刚接触会有点乱乱...

大_于 ⋅ 今天 ⋅ 0

React jsx 中写更优雅、直观的条件运算符

在这篇文字中我学到了很多知识,同时结合工作中的一些经验也在思考一些东西。比如条件运算符 Conditional Operator condition ? expr_if_true : expr_if_false 在jsx中书写条件语句我们经常都...

开源中国最帅没有之一 ⋅ 今天 ⋅ 0

vim编辑模式与命令模式

5.5 进入编辑模式 从编辑模式返回一般模式“Esc” 5.6 vim命令模式 命令 :“nohl”=no high light 无高亮,取消内容中高亮标记 "x":保存退出,和wq的区别是,当进入一个文件未进行编辑时,使...

弓正 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部