文档章节

学习MapReduce(二)

静下来想想静静
 静下来想想静静
发布于 2017/03/10 19:35
字数 817
阅读 30
收藏 0

2017.3.10 深入了解MapReduce的各种工作机制,以及Yarn的工作流程,使用自己编写的序列化类》》》》》

今天看了一个小项目,内容如下:

(案例)---->>>对手机上网流量统计结果,按总流量倒序排序<<<----------

因为手里没有数据所以使用了python随机写了1万条数据,刚开始学习python,随便写了一点,见笑:

    

#coding:utf-8
'''使用随机数来生成一张流量表'''

import random
file = open("E:\\1.txt","w")
for i in range(10000):
    phone = random.randint(13800000000,13899999999)

    upload = random.randint(0,1000)

    download = random.randint(0,1000)
    file.write("%d\t%d\t%d\n" %(phone,upload,download))
file.close()
 

最后得到了1万条符合上面要求的文件。

开始分析这个案例,案例要求对数据的总流量进行排序,所以,需要我们自己写一个可序列化的实现类,这个实现类,可以对如上数据进行比较。代码如下:

package lcPhoneFolw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PhoneFlow implements WritableComparable<PhoneFlow>{
    private long upload;
    private long download;
    private long sumload;
    
    public PhoneFlow(){}
    
    @Override
    public String toString() {
        
        return upload+"\t"+download+"\t"+sumload;
    }

    public PhoneFlow(long upload, long download) {
        this.upload = upload;
        this.download = download;
        this.sumload = upload+download;
    }

    public long getUpload() {
        return upload;
    }

    public void setUpload(long upload) {
        this.upload = upload;
    }

    public long getDownload() {
        return download;
    }

    public void setDownload(long download) {
        this.download = download;
    }

    public long getSumload() {
        return sumload;
    }

    public void setSumload(long sumload) {
        this.sumload = sumload;
    }
    //序列化
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeLong(upload);
        out.writeLong(download);
        out.writeLong(sumload);
    }
    //反序列化(应该是和序列化的顺序一样,如果不一样应该是会出问题。没有研究过)
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        upload = in.readLong();
        download = in.readLong();
        sumload = in.readLong();
    }


    //自定义比较方法
    public int compareTo(PhoneFlow o) {
        
        return this.sumload>o.getSumload() ? -1 : 1 ;
    }

}

自己编写的序列化类写完后,开始写Mapper类和Reducer类,写Reducer类时,有个小坑

---------------->>>>Mapper类<<<<-----------------------

package lcPhoneFolw;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 使用自己的序列化类当作map结果的K,
 * @author admin
 *
 */
public class PhoneMapper extends Mapper<LongWritable, Text, PhoneFlow ,Text> {

    
    @Override
    protected void map(LongWritable key, Text value,Context context)
        throws IOException, InterruptedException {
    
            String line = value.toString();
            
            String[] strs = line.split("\t");
            //既然要有一个自己写的序列化类的实例,这里我们要有一个实例
            Text phoneNb  = new Text(strs[0]);
            long upload = Long.parseLong(strs[1]);
            long download = Long.parseLong(strs[2]);
            PhoneFlow pf = new PhoneFlow(upload, download);
            
            context.write(pf,phoneNb);

    }    
    
}
--------------->>>>Reducer类<<<<-----------------

package lcPhoneFolw;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * 这里有个小坑,我一直没搞明白为何在自己的序列化类中只写了比较方法,而没有使用这个方法就可以排序了,其实这和MapReduce的机制有关系。
 * 当我们使用我们自己写的序列化类为map方法的输出结果的K 传给Reducer时,会自动按照我们定义的比较方式排序,然后交给Reducer。
 * Reducer主要是针对value的计算,(也就是说想要排序 最好是将要排序的值当作K传给Ruducer)
 * @author admin
 *
 */
public class PhoneRedcer extends Reducer<PhoneFlow,Text, Text,PhoneFlow> {
    
        @Override
        protected void reduce(PhoneFlow key, Iterable<Text> values,Context context)
                throws IOException, InterruptedException {
            context.write(values.iterator().next(),key);
        }
    }
写驱动类

---------->>>驱动类<<<---------

package lcPhoneFolw;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneDrive {
    public static void main(String[] args) throws Exception {
        Configuration conf =  new Configuration();
//        conf.set("fs.defaultFS", "hdfs://192.168.1.101:8020");
        
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(PhoneDrive.class);
        
        //mapper的信息
        job.setMapperClass(PhoneMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PhoneFlow.class);
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        //reducer的信息
        job.setReducerClass(PhoneRedcer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneFlow.class);
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        
        job.waitForCompletion(true);
        
    }
}
 

测试通过!明天见!

 

 

 

© 著作权归作者所有

静下来想想静静
粉丝 2
博文 14
码字总数 12348
作品 0
焦作
私信 提问
大数据MapReduce 编程实战

MapReduce 编程实战 一、大数据的起源 1、举例:(1)商品推荐 问题1:大量订单如何存储? 问题2:大量订单如何计算? (2)天气预报: 问题1:大量的天气数据如何存储? 问题2:大量的天气数...

我叫大兄弟
2018/05/06
0
0
好程序员大数据分享MapReduce中job的提交流程

好程序员大数据分享MapReduce中job的提交流程 一、MapReduce的定义 MapReduce是面向大数据并行处理的计算模型、框架和平台。 它的主要思想是:map(映射)和reduce(归约) 1)MapReduce是一...

好程序员IT
06/05
3
0
大数据教程(10.7)Mapreduce的其他补充(计数器、多job串联、参数优化等)

上一篇文章分析了自定义inputFormat(小文件合并)的实现,在此博主将继续Mapreduce的其他补充(计数器、多job串联、参数优化等)内容的分享。 一、计数器应用 在实际生产代码中,常常需要将...

em_aaron
2018/12/30
30
0
如何分布式运行mapreduce程序

如何分布式运行mapreduce程序 一、 首先要知道此前提 若在windows的Eclipse工程中直接启动mapreduc程序,需要先把hadoop集群的配置目录下的xml都拷贝到src目录下,让程序自动读取集群的地址后...

Zero零_度
2015/09/06
245
0
【hadoop】16.MapReduce-简介

简介 本章节我们先来了解一些关于MapReduce的理论知识。从本章节您可以学习到:MapReduce的相关知识。 1、概念 Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析...

Areya
01/12
19
0

没有更多内容

加载失败,请刷新页面

加载更多

Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
昨天
7
0
轻量级 HTTP(s) 代理 TinyProxy

CentOS 下安装 TinyProxy yum install -y tinyproxy 启动、停止、重启 # 启动service tinyproxy start# 停止service tinyproxy stop# 重启service tinyproxy restart 相关配置 默认...

Anoyi
昨天
2
0
Linux创建yum仓库

第一步、搞定自己的光盘 #创建文件夹 mkdir -p /media/cdrom #挂载光盘 mount /dev/cdrom /media/cdrom #编辑配置文件使其永久生效 vim /etc/fstab 第二步,编辑yun源 vim /ect yum.repos.d...

究极小怪兽zzz
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部