学习MapReduce(二)

原创
2017/03/10 19:35
阅读数 41

2017.3.10 深入了解MapReduce的各种工作机制,以及Yarn的工作流程,使用自己编写的序列化类》》》》》

今天看了一个小项目,内容如下:

(案例)---->>>对手机上网流量统计结果,按总流量倒序排序<<<----------

因为手里没有数据所以使用了python随机写了1万条数据,刚开始学习python,随便写了一点,见笑:

    

#coding:utf-8
'''使用随机数来生成一张流量表'''

import random
file = open("E:\\1.txt","w")
for i in range(10000):
    phone = random.randint(13800000000,13899999999)

    upload = random.randint(0,1000)

    download = random.randint(0,1000)
    file.write("%d\t%d\t%d\n" %(phone,upload,download))
file.close()
 

最后得到了1万条符合上面要求的文件。

开始分析这个案例,案例要求对数据的总流量进行排序,所以,需要我们自己写一个可序列化的实现类,这个实现类,可以对如上数据进行比较。代码如下:

package lcPhoneFolw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PhoneFlow implements WritableComparable<PhoneFlow>{
    private long upload;
    private long download;
    private long sumload;
    
    public PhoneFlow(){}
    
    @Override
    public String toString() {
        
        return upload+"\t"+download+"\t"+sumload;
    }

    public PhoneFlow(long upload, long download) {
        this.upload = upload;
        this.download = download;
        this.sumload = upload+download;
    }

    public long getUpload() {
        return upload;
    }

    public void setUpload(long upload) {
        this.upload = upload;
    }

    public long getDownload() {
        return download;
    }

    public void setDownload(long download) {
        this.download = download;
    }

    public long getSumload() {
        return sumload;
    }

    public void setSumload(long sumload) {
        this.sumload = sumload;
    }
    //序列化
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeLong(upload);
        out.writeLong(download);
        out.writeLong(sumload);
    }
    //反序列化(应该是和序列化的顺序一样,如果不一样应该是会出问题。没有研究过)
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        upload = in.readLong();
        download = in.readLong();
        sumload = in.readLong();
    }


    //自定义比较方法
    public int compareTo(PhoneFlow o) {
        
        return this.sumload>o.getSumload() ? -1 : 1 ;
    }

}

自己编写的序列化类写完后,开始写Mapper类和Reducer类,写Reducer类时,有个小坑

---------------->>>>Mapper类<<<<-----------------------

package lcPhoneFolw;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 使用自己的序列化类当作map结果的K,
 * @author admin
 *
 */
public class PhoneMapper extends Mapper<LongWritable, Text, PhoneFlow ,Text> {

    
    @Override
    protected void map(LongWritable key, Text value,Context context)
        throws IOException, InterruptedException {
    
            String line = value.toString();
            
            String[] strs = line.split("\t");
            //既然要有一个自己写的序列化类的实例,这里我们要有一个实例
            Text phoneNb  = new Text(strs[0]);
            long upload = Long.parseLong(strs[1]);
            long download = Long.parseLong(strs[2]);
            PhoneFlow pf = new PhoneFlow(upload, download);
            
            context.write(pf,phoneNb);

    }    
    
}
--------------->>>>Reducer类<<<<-----------------

package lcPhoneFolw;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * 这里有个小坑,我一直没搞明白为何在自己的序列化类中只写了比较方法,而没有使用这个方法就可以排序了,其实这和MapReduce的机制有关系。
 * 当我们使用我们自己写的序列化类为map方法的输出结果的K 传给Reducer时,会自动按照我们定义的比较方式排序,然后交给Reducer。
 * Reducer主要是针对value的计算,(也就是说想要排序 最好是将要排序的值当作K传给Ruducer)
 * @author admin
 *
 */
public class PhoneRedcer extends Reducer<PhoneFlow,Text, Text,PhoneFlow> {
    
        @Override
        protected void reduce(PhoneFlow key, Iterable<Text> values,Context context)
                throws IOException, InterruptedException {
            context.write(values.iterator().next(),key);
        }
    }
写驱动类

---------->>>驱动类<<<---------

package lcPhoneFolw;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneDrive {
    public static void main(String[] args) throws Exception {
        Configuration conf =  new Configuration();
//        conf.set("fs.defaultFS", "hdfs://192.168.1.101:8020");
        
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(PhoneDrive.class);
        
        //mapper的信息
        job.setMapperClass(PhoneMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PhoneFlow.class);
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        //reducer的信息
        job.setReducerClass(PhoneRedcer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneFlow.class);
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        
        job.waitForCompletion(true);
        
    }
}
 

测试通过!明天见!

 

 

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部