文档章节

MapReduce项目应用之 处理手机通信流量统计

simpler
 simpler
发布于 2014/04/22 20:42
字数 882
阅读 223
收藏 2

模拟元数据如下 HTTP_20130313143750.dat

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200

1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200

1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200

1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200

1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200

1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200

1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200

1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200

1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200

1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200

1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200

1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200

1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200

1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200

1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200

1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200

1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200

1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200

1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200

1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200

1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200

1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200

上面日志的格式如下

c9b5819681ae327bc8bbed029be30120

MapReduce代码如下

   

package MapReduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class KpiApp {
    static final String INPUT_PATH = "hdfs://hadoop:9000/wlan";
    static final String OUT_PATH = "hdfs://hadoop:9000/outwlan";
    public static void main(String[] args) throws Exception{
        final Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
        //1.1 指定输入文件路径
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        //指定哪个类用来格式化输入文件
        job.setInputFormatClass(TextInputFormat.class);
        
        //1.2指定自定义的Mapper类
        job.setMapperClass(MyMapper.class);
        //指定输出<k2,v2>的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);
        
        //1.3 指定分区类
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        
        //1.4 TODO 排序、分区
        
        //1.5  TODO (可选)合并
        
        //2.2 指定自定义的reduce类
        job.setReducerClass(MyReducer.class);
        //指定输出<k3,v3>的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);
        
        //2.3 指定输出到哪里
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //设定输出文件的格式化类
        job.setOutputFormatClass(TextOutputFormat.class);
        
        //把代码提交给JobTracker执行
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{
        protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
            final String[] splited = value.toString().split("\t");
            final String msisdn = splited[1];
            final Text k2 = new Text(msisdn);
            final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]);
            context.write(k2, v2);
        };
    }
    
    static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{
        /**
         * @param    k2    表示整个文件中不同的手机号码    
         * @param    v2s    表示该手机号在不同时段的流量的集合
         */
        protected void reduce(Text k2, java.lang.Iterable<KpiWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,KpiWritable,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            
            for (KpiWritable kpiWritable : v2s) {
                upPackNum += kpiWritable.upPackNum;
                downPackNum += kpiWritable.downPackNum;
                upPayLoad += kpiWritable.upPayLoad;
                downPayLoad += kpiWritable.downPayLoad;
            }
            
            final KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+"");
            context.write(k2, v3);
        };
    }
}

class KpiWritable implements Writable{
    long upPackNum;
    long downPackNum;
    long upPayLoad;
    long downPayLoad;
    
    public KpiWritable(){}
    
    public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad){
        this.upPackNum = Long.parseLong(upPackNum);
        this.downPackNum = Long.parseLong(downPackNum);
        this.upPayLoad = Long.parseLong(upPayLoad);
        this.downPayLoad = Long.parseLong(downPayLoad);
    }
    
    
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }
    
    @Override
    public String toString() {
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
    }
}

将HTTP_20130313143750.dat上传至hadoop HDFS文件系统中

bf81947f9a3a71970c687a87ea1369b0_3

运行MapReduce代码,查看输出的/outwlan/part-*文件下的内容

02f6240d-16ec-4cc1-a05d-bd9b378c7bd8

© 著作权归作者所有

simpler
粉丝 25
博文 44
码字总数 40338
作品 0
成都
程序员
私信 提问
7个实例全面掌握Hadoop MapReduce

作者介绍 杜亦舒,创业中,技术合伙人,喜欢研究分享技术。个人订阅号:性能与架构。 本文旨在帮您快速了解 MapReduce 的工作机制和开发方法,解决以下几个问题: 文章中提供了程序实例中涉及...

杜亦舒
2017/06/08
0
0
大数据教程(9.1)流量汇总排序的mr实现

上一章我们有讲到一个mapreduce案例——移动流量排序,如果我们要将最后的输出结果按总流量大小逆序输出,该怎么实现呢?本节博主将分享这个实现的过程。 一、分析 首先,要实现这个功能,我...

em_aaron
2018/12/01
94
2
大数据(hadoop-Mapreduce原理架构)

课程目标: 1:MapReduce的应用场景 2:MapReduce编程模型 3:MapReduce的架构 4:常见MapReduce应用场景 5:总结 MapReduce的定义 源自于Google的MapReduce论文 发表于2004年12月 Hadoop M...

这很耳东先生
2019/04/30
82
0
一文详解大规模数据计算处理原理及操作重点

作者介绍 李智慧,《大型网站技术架构:核心原理与案例分析》作者。曾供职于阿里巴巴与英特尔亚太研发中心,从事大型网站与大数据方面的研发工作,目前在做企业级区块链方面的开发工作。 大数...

DBAplus社群
2018/08/07
0
0
大数据教程(8.4)移动流量分析案例

前面分享了使用mapreduce做wordcount单词统计的实现与原理。本篇博主将继续分享一个移动流量分析的经典案例,来帮助在实际工作中理解和使用hadoop平台。 一、需求 以下是一个移动流量的日志,...

em_aaron
2018/11/24
43
0

没有更多内容

加载失败,请刷新页面

加载更多

深度残差收缩网络

深度残差网络ResNet获得了2016年CVPR会议的最佳论文奖,截至目前,在谷歌学术上的引用量已经达到了38295次。 深度残差收缩网络是深度残差网络的一种新颖的改进版本,其实是深度残差网络、注意...

深度研究员
23分钟前
72
0
聊聊artemis的scheduledDeliveryTime

序 本文主要研究一下artemis的scheduledDeliveryTime HDR_SCHEDULED_DELIVERY_TIME activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/......

go4it
29分钟前
19
0
Guide to the Synchronized Keyword in Java

1. Overview This quick article will be an intro to using the synchronized block in Java. Simply put, in a multi-threaded environment, a race condition occurs when two or more th......

Ciet
39分钟前
38
0
Spring AOP-00-开篇

AOP联盟为增强定义了org.aopalliance.aop.Advice接口,Spring支持5种类型的增强。本章3.3节中使用到的@Before、@After等注解是基于AspectJ实现的增强类型。其实Spring也支持很多增强类型,S...

moon888
45分钟前
51
0
Vue造轮子-Tabs测试(下)

1. 如果g-tabs里面不是g-tabs-head,g-tabs-body期望会报错。 // 目前没有报错,所以先改 // tabs.vue if(this.$children.length===0){ // 这个$children是看子组件,不是子元...

ories
57分钟前
31
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部